This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2362500  [Doris On ES] Support create table with wildcard or alias 
index (#3968)
2362500 is described below

commit 2362500e77d994b36a132808f613ef3eedc66113
Author: Yunfeng,Wu <[email protected]>
AuthorDate: Wed Jul 1 22:08:06 2020 +0800

    [Doris On ES] Support create table with wildcard or alias index (#3968)
---
 ...aSourceException.java => DorisEsException.java} |  4 +--
 .../doris/external/elasticsearch/EsFieldInfos.java |  4 +--
 .../doris/external/elasticsearch/EsRestClient.java |  4 +--
 .../external/elasticsearch/EsShardPartitions.java  | 33 ++++++++++++++--------
 .../external/elasticsearch/EsShardRouting.java     | 11 +++-----
 .../external/elasticsearch/EsTablePartitions.java  | 22 ++++++++-------
 .../java/org/apache/doris/planner/EsScanNode.java  |  8 +++---
 .../external/elasticsearch/EsRepositoryTest.java   |  4 +--
 8 files changed, 49 insertions(+), 41 deletions(-)

diff --git 
a/fe/src/main/java/org/apache/doris/external/elasticsearch/ExternalDataSourceException.java
 
b/fe/src/main/java/org/apache/doris/external/elasticsearch/DorisEsException.java
similarity index 89%
rename from 
fe/src/main/java/org/apache/doris/external/elasticsearch/ExternalDataSourceException.java
rename to 
fe/src/main/java/org/apache/doris/external/elasticsearch/DorisEsException.java
index abb6744..c1ea1f4 100644
--- 
a/fe/src/main/java/org/apache/doris/external/elasticsearch/ExternalDataSourceException.java
+++ 
b/fe/src/main/java/org/apache/doris/external/elasticsearch/DorisEsException.java
@@ -19,11 +19,11 @@ package org.apache.doris.external.elasticsearch;
 
 import org.apache.doris.common.UserException;
 
-public class ExternalDataSourceException extends UserException {
+public class DorisEsException extends UserException {
 
     private static final long serialVersionUID = 7912833584319374692L;
 
-    public ExternalDataSourceException(String msg) {
+    public DorisEsException(String msg) {
         super(msg);
     }
 }
diff --git 
a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsFieldInfos.java 
b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsFieldInfos.java
index 59d2511..5edb80b 100644
--- a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsFieldInfos.java
+++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsFieldInfos.java
@@ -64,7 +64,7 @@ public class EsFieldInfos {
      * @return fieldsContext and docValueContext
      * @throws Exception
      */
-    public static EsFieldInfos fromMapping(List<Column> colList, String 
indexName, String indexMapping, String docType) throws 
ExternalDataSourceException {
+    public static EsFieldInfos fromMapping(List<Column> colList, String 
indexName, String indexMapping, String docType) throws DorisEsException {
         JSONObject jsonObject = new JSONObject(indexMapping);
         // the indexName use alias takes the first mapping
         Iterator<String> keys = jsonObject.keys();
@@ -103,7 +103,7 @@ public class EsFieldInfos {
             properties = rootSchema.optJSONObject("properties");
         }
         if (properties == null) {
-            throw new ExternalDataSourceException( "index[" + indexName + "] 
type[" + docType + "] mapping not found for the Elasticsearch Cluster");
+            throw new DorisEsException( "index[" + indexName + "] type[" + 
docType + "] mapping not found for the Elasticsearch Cluster");
         }
         return parseProperties(colList, properties);
     }
diff --git 
a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java 
b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java
index adfc294..dd7f93f 100644
--- a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java
+++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java
@@ -95,7 +95,7 @@ public class EsRestClient {
         String path = indexName + "/_mapping";
         String indexMapping = execute(path);
         if (indexMapping == null) {
-            throw new ExternalDataSourceException( "index[" + indexName + "] 
not found for the Elasticsearch Cluster");
+            throw new DorisEsException( "index[" + indexName + "] not found 
for the Elasticsearch Cluster");
         }
         return EsFieldInfos.fromMapping(colList, indexName, indexMapping, 
docType);
     }
@@ -105,7 +105,7 @@ public class EsRestClient {
         String path = indexName + "/_search_shards";
         String searchShards = execute(path);
         if (searchShards == null) {
-            throw new ExternalDataSourceException( "index[" + indexName + "] 
search_shards not found for the Elasticsearch Cluster");
+            throw new DorisEsException( "index[" + indexName + "] 
search_shards not found for the Elasticsearch Cluster");
         }
         return EsShardPartitions.findShardPartitions(indexName, searchShards);
     }
diff --git 
a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardPartitions.java
 
b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardPartitions.java
index 30e12a0..5caa6c0 100644
--- 
a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardPartitions.java
+++ 
b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardPartitions.java
@@ -20,12 +20,15 @@ package org.apache.doris.external.elasticsearch;
 import org.apache.doris.analysis.SingleRangePartitionDesc;
 import org.apache.doris.catalog.PartitionKey;
 import org.apache.doris.thrift.TNetworkAddress;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.json.JSONArray;
 import org.json.JSONObject;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -47,17 +50,17 @@ public class EsShardPartitions {
         this.partitionDesc = null;
         this.partitionKey = null;
     }
-    
+
     /**
      * Parse shardRoutings from the json
-     * @param indexName indexName(alias or really name)
+     *
+     * @param indexName    indexName(alias or really name)
      * @param searchShards the return value of _search_shards
      * @return shardRoutings is used for searching
      */
-    public static EsShardPartitions findShardPartitions(String indexName, 
String searchShards) throws ExternalDataSourceException {
+    public static EsShardPartitions findShardPartitions(String indexName, 
String searchShards) throws DorisEsException {
         EsShardPartitions indexState = new EsShardPartitions(indexName);
         JSONObject jsonObject = new JSONObject(searchShards);
-        JSONObject nodesMap = jsonObject.getJSONObject("nodes");
         JSONArray shards = jsonObject.getJSONArray("shards");
         int length = shards.length();
         for (int i = 0; i < length; i++) {
@@ -65,14 +68,20 @@ public class EsShardPartitions {
             JSONArray shardsArray = shards.getJSONArray(i);
             int arrayLength = shardsArray.length();
             for (int j = 0; j < arrayLength; j++) {
-                JSONObject shard = shardsArray.getJSONObject(j);
-                String shardState = shard.getString("state");
+                JSONObject indexShard = shardsArray.getJSONObject(j);
+                String shardState = indexShard.getString("state");
                 if ("STARTED".equalsIgnoreCase(shardState) || 
"RELOCATING".equalsIgnoreCase(shardState)) {
                     try {
-                        
singleShardRouting.add(EsShardRouting.parseShardRouting(shardState, 
String.valueOf(i), shard, nodesMap));
+                        singleShardRouting.add(
+                                EsShardRouting.newSearchShard(
+                                        indexShard.getString("index"),
+                                        indexShard.getInt("shard"),
+                                        indexShard.getBoolean("primary"),
+                                        indexShard.getString("node"),
+                                        jsonObject.getJSONObject("nodes")));
                     } catch (Exception e) {
-                        throw new ExternalDataSourceException( "index[" + 
indexName + "] findShardPartitions error");
-                    }
+                        LOG.error("fetch index [{}] shard partitions failure", 
indexName, e);
+                        throw new DorisEsException("fetch [" + indexName + "] 
shard partitions failure [" + e.getMessage() + "]");                   }
                 }
             }
             if (singleShardRouting.isEmpty()) {
@@ -142,6 +151,6 @@ public class EsShardPartitions {
     @Override
     public String toString() {
         return "EsIndexState [indexName=" + indexName + ", partitionDesc=" + 
partitionDesc + ", partitionKey="
-            + partitionKey + "]";
+                + partitionKey + "]";
     }
 }
diff --git 
a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardRouting.java 
b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardRouting.java
index 4f3fa2f..980a62d 100644
--- 
a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardRouting.java
+++ 
b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardRouting.java
@@ -18,10 +18,10 @@
 package org.apache.doris.external.elasticsearch;
 
 
+import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.commons.lang.StringUtils;
 import org.json.JSONObject;
 
-import org.apache.doris.thrift.TNetworkAddress;
 
 public class EsShardRouting {
 
@@ -41,9 +41,8 @@ public class EsShardRouting {
         this.nodeId = nodeId;
     }
     
-    public static EsShardRouting parseShardRouting(String indexName, String 
shardKey,
-            JSONObject shardInfo, JSONObject nodesMap) {
-        String nodeId = shardInfo.getString("node");
+    public static EsShardRouting newSearchShard(String indexName, int shardId, 
boolean isPrimary,
+            String nodeId, JSONObject nodesMap) {
         JSONObject nodeInfo = nodesMap.getJSONObject(nodeId);
         String[] transportAddr = 
nodeInfo.getString("transport_address").split(":");
         // get thrift port from node info
@@ -53,9 +52,7 @@ public class EsShardRouting {
         if (!StringUtils.isEmpty(thriftPort)) {
             addr = new TNetworkAddress(transportAddr[0], 
Integer.parseInt(thriftPort));
         }
-        boolean isPrimary = shardInfo.getBoolean("primary");
-        return new EsShardRouting(indexName, Integer.parseInt(shardKey),
-                isPrimary, addr, nodeId);
+        return new EsShardRouting(indexName, shardId, isPrimary, addr, nodeId);
     }
     
     public int getShardId() {
diff --git 
a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsTablePartitions.java
 
b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsTablePartitions.java
index 5263086..c1306be 100644
--- 
a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsTablePartitions.java
+++ 
b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsTablePartitions.java
@@ -17,12 +17,6 @@
 
 package org.apache.doris.external.elasticsearch;
 
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.EsTable;
 import org.apache.doris.catalog.PartitionInfo;
@@ -31,11 +25,19 @@ import org.apache.doris.catalog.RangePartitionInfo;
 import org.apache.doris.catalog.SinglePartitionInfo;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.thrift.TNetworkAddress;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Range;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
 /**
  * save the dynamic info parsed from es cluster state such as shard routing, 
partition info
  */
@@ -56,7 +58,7 @@ public class EsTablePartitions {
     }
 
     public static EsTablePartitions fromShardPartitions(EsTable esTable, 
EsShardPartitions shardPartitions)
-            throws ExternalDataSourceException, DdlException {
+            throws DorisEsException, DdlException {
         EsTablePartitions esTablePartitions = new EsTablePartitions();
         RangePartitionInfo partitionInfo = null;
         if (esTable.getPartitionInfo() != null) {
@@ -82,7 +84,7 @@ public class EsTablePartitions {
                 LOG.debug("begin to parse es table [{}] state from search 
shards, "
                         + "with no partition info", esTable.getName());
             } else {
-                throw new ExternalDataSourceException("es table only support 
range partition, "
+                throw new DorisEsException("es table only support range 
partition, "
                         + "but current partition type is "
                         + esTable.getPartitionInfo().getType());
             }
diff --git a/fe/src/main/java/org/apache/doris/planner/EsScanNode.java 
b/fe/src/main/java/org/apache/doris/planner/EsScanNode.java
index aa04ec6..03cfc70 100644
--- a/fe/src/main/java/org/apache/doris/planner/EsScanNode.java
+++ b/fe/src/main/java/org/apache/doris/planner/EsScanNode.java
@@ -40,6 +40,9 @@ import org.apache.doris.thrift.TScanRange;
 import org.apache.doris.thrift.TScanRangeLocation;
 import org.apache.doris.thrift.TScanRangeLocations;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -47,9 +50,6 @@ import com.google.common.collect.Multimap;
 import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -214,7 +214,7 @@ public class EsScanNode extends ScanNode {
                 // Generate on es scan range
                 TEsScanRange esScanRange = new TEsScanRange();
                 esScanRange.setEs_hosts(shardAllocations);
-                esScanRange.setIndex(indexState.getIndexName());
+                esScanRange.setIndex(shardRouting.get(0).getIndexName());
                 esScanRange.setType(table.getMappingType());
                 esScanRange.setShard_id(shardRouting.get(0).getShardId());
                 // Scan range
diff --git 
a/fe/src/test/java/org/apache/doris/external/elasticsearch/EsRepositoryTest.java
 
b/fe/src/test/java/org/apache/doris/external/elasticsearch/EsRepositoryTest.java
index a7ce961..c367ec0 100644
--- 
a/fe/src/test/java/org/apache/doris/external/elasticsearch/EsRepositoryTest.java
+++ 
b/fe/src/test/java/org/apache/doris/external/elasticsearch/EsRepositoryTest.java
@@ -91,7 +91,7 @@ public class EsRepositoryTest {
         
     }
     
-    @Test(expected = ExternalDataSourceException.class)
+    @Test(expected = DorisEsException.class)
     public void testSetErrorType() throws Exception {
         EsTable esTable = (EsTable) Catalog.getCurrentCatalog()
                 .getDb(CatalogTestUtil.testDb1)
@@ -101,7 +101,7 @@ public class EsRepositoryTest {
     }
     
     @Test
-    public void testSetTableState() throws ExternalDataSourceException, 
DdlException {
+    public void testSetTableState() throws DorisEsException, DdlException {
         EsTable esTable = (EsTable) Catalog.getCurrentCatalog()
                 .getDb(CatalogTestUtil.testDb1)
                 .getTable(CatalogTestUtil.testEsTableId1);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to