This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 2362500 [Doris On ES] Support create table with wildcard or alias
index (#3968)
2362500 is described below
commit 2362500e77d994b36a132808f613ef3eedc66113
Author: Yunfeng,Wu <[email protected]>
AuthorDate: Wed Jul 1 22:08:06 2020 +0800
[Doris On ES] Support create table with wildcard or alias index (#3968)
---
...aSourceException.java => DorisEsException.java} | 4 +--
.../doris/external/elasticsearch/EsFieldInfos.java | 4 +--
.../doris/external/elasticsearch/EsRestClient.java | 4 +--
.../external/elasticsearch/EsShardPartitions.java | 33 ++++++++++++++--------
.../external/elasticsearch/EsShardRouting.java | 11 +++-----
.../external/elasticsearch/EsTablePartitions.java | 22 ++++++++-------
.../java/org/apache/doris/planner/EsScanNode.java | 8 +++---
.../external/elasticsearch/EsRepositoryTest.java | 4 +--
8 files changed, 49 insertions(+), 41 deletions(-)
diff --git
a/fe/src/main/java/org/apache/doris/external/elasticsearch/ExternalDataSourceException.java
b/fe/src/main/java/org/apache/doris/external/elasticsearch/DorisEsException.java
similarity index 89%
rename from
fe/src/main/java/org/apache/doris/external/elasticsearch/ExternalDataSourceException.java
rename to
fe/src/main/java/org/apache/doris/external/elasticsearch/DorisEsException.java
index abb6744..c1ea1f4 100644
---
a/fe/src/main/java/org/apache/doris/external/elasticsearch/ExternalDataSourceException.java
+++
b/fe/src/main/java/org/apache/doris/external/elasticsearch/DorisEsException.java
@@ -19,11 +19,11 @@ package org.apache.doris.external.elasticsearch;
import org.apache.doris.common.UserException;
-public class ExternalDataSourceException extends UserException {
+public class DorisEsException extends UserException {
private static final long serialVersionUID = 7912833584319374692L;
- public ExternalDataSourceException(String msg) {
+ public DorisEsException(String msg) {
super(msg);
}
}
diff --git
a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsFieldInfos.java
b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsFieldInfos.java
index 59d2511..5edb80b 100644
--- a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsFieldInfos.java
+++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsFieldInfos.java
@@ -64,7 +64,7 @@ public class EsFieldInfos {
* @return fieldsContext and docValueContext
* @throws Exception
*/
- public static EsFieldInfos fromMapping(List<Column> colList, String
indexName, String indexMapping, String docType) throws
ExternalDataSourceException {
+ public static EsFieldInfos fromMapping(List<Column> colList, String
indexName, String indexMapping, String docType) throws DorisEsException {
JSONObject jsonObject = new JSONObject(indexMapping);
// the indexName use alias takes the first mapping
Iterator<String> keys = jsonObject.keys();
@@ -103,7 +103,7 @@ public class EsFieldInfos {
properties = rootSchema.optJSONObject("properties");
}
if (properties == null) {
- throw new ExternalDataSourceException( "index[" + indexName + "]
type[" + docType + "] mapping not found for the Elasticsearch Cluster");
+ throw new DorisEsException( "index[" + indexName + "] type[" +
docType + "] mapping not found for the Elasticsearch Cluster");
}
return parseProperties(colList, properties);
}
diff --git
a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java
b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java
index adfc294..dd7f93f 100644
--- a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java
+++ b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java
@@ -95,7 +95,7 @@ public class EsRestClient {
String path = indexName + "/_mapping";
String indexMapping = execute(path);
if (indexMapping == null) {
- throw new ExternalDataSourceException( "index[" + indexName + "]
not found for the Elasticsearch Cluster");
+ throw new DorisEsException( "index[" + indexName + "] not found
for the Elasticsearch Cluster");
}
return EsFieldInfos.fromMapping(colList, indexName, indexMapping,
docType);
}
@@ -105,7 +105,7 @@ public class EsRestClient {
String path = indexName + "/_search_shards";
String searchShards = execute(path);
if (searchShards == null) {
- throw new ExternalDataSourceException( "index[" + indexName + "]
search_shards not found for the Elasticsearch Cluster");
+ throw new DorisEsException( "index[" + indexName + "]
search_shards not found for the Elasticsearch Cluster");
}
return EsShardPartitions.findShardPartitions(indexName, searchShards);
}
diff --git
a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardPartitions.java
b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardPartitions.java
index 30e12a0..5caa6c0 100644
---
a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardPartitions.java
+++
b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardPartitions.java
@@ -20,12 +20,15 @@ package org.apache.doris.external.elasticsearch;
import org.apache.doris.analysis.SingleRangePartitionDesc;
import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.thrift.TNetworkAddress;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONArray;
import org.json.JSONObject;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -47,17 +50,17 @@ public class EsShardPartitions {
this.partitionDesc = null;
this.partitionKey = null;
}
-
+
/**
* Parse shardRoutings from the json
- * @param indexName indexName(alias or really name)
+ *
+ * @param indexName indexName(alias or really name)
* @param searchShards the return value of _search_shards
* @return shardRoutings is used for searching
*/
- public static EsShardPartitions findShardPartitions(String indexName,
String searchShards) throws ExternalDataSourceException {
+ public static EsShardPartitions findShardPartitions(String indexName,
String searchShards) throws DorisEsException {
EsShardPartitions indexState = new EsShardPartitions(indexName);
JSONObject jsonObject = new JSONObject(searchShards);
- JSONObject nodesMap = jsonObject.getJSONObject("nodes");
JSONArray shards = jsonObject.getJSONArray("shards");
int length = shards.length();
for (int i = 0; i < length; i++) {
@@ -65,14 +68,20 @@ public class EsShardPartitions {
JSONArray shardsArray = shards.getJSONArray(i);
int arrayLength = shardsArray.length();
for (int j = 0; j < arrayLength; j++) {
- JSONObject shard = shardsArray.getJSONObject(j);
- String shardState = shard.getString("state");
+ JSONObject indexShard = shardsArray.getJSONObject(j);
+ String shardState = indexShard.getString("state");
if ("STARTED".equalsIgnoreCase(shardState) ||
"RELOCATING".equalsIgnoreCase(shardState)) {
try {
-
singleShardRouting.add(EsShardRouting.parseShardRouting(shardState,
String.valueOf(i), shard, nodesMap));
+ singleShardRouting.add(
+ EsShardRouting.newSearchShard(
+ indexShard.getString("index"),
+ indexShard.getInt("shard"),
+ indexShard.getBoolean("primary"),
+ indexShard.getString("node"),
+ jsonObject.getJSONObject("nodes")));
} catch (Exception e) {
- throw new ExternalDataSourceException( "index[" +
indexName + "] findShardPartitions error");
- }
+ LOG.error("fetch index [{}] shard partitions failure",
indexName, e);
+ throw new DorisEsException("fetch [" + indexName + "]
shard partitions failure [" + e.getMessage() + "]"); }
}
}
if (singleShardRouting.isEmpty()) {
@@ -142,6 +151,6 @@ public class EsShardPartitions {
@Override
public String toString() {
return "EsIndexState [indexName=" + indexName + ", partitionDesc=" +
partitionDesc + ", partitionKey="
- + partitionKey + "]";
+ + partitionKey + "]";
}
}
diff --git
a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardRouting.java
b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardRouting.java
index 4f3fa2f..980a62d 100644
---
a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardRouting.java
+++
b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsShardRouting.java
@@ -18,10 +18,10 @@
package org.apache.doris.external.elasticsearch;
+import org.apache.doris.thrift.TNetworkAddress;
import org.apache.commons.lang.StringUtils;
import org.json.JSONObject;
-import org.apache.doris.thrift.TNetworkAddress;
public class EsShardRouting {
@@ -41,9 +41,8 @@ public class EsShardRouting {
this.nodeId = nodeId;
}
- public static EsShardRouting parseShardRouting(String indexName, String
shardKey,
- JSONObject shardInfo, JSONObject nodesMap) {
- String nodeId = shardInfo.getString("node");
+ public static EsShardRouting newSearchShard(String indexName, int shardId,
boolean isPrimary,
+ String nodeId, JSONObject nodesMap) {
JSONObject nodeInfo = nodesMap.getJSONObject(nodeId);
String[] transportAddr =
nodeInfo.getString("transport_address").split(":");
// get thrift port from node info
@@ -53,9 +52,7 @@ public class EsShardRouting {
if (!StringUtils.isEmpty(thriftPort)) {
addr = new TNetworkAddress(transportAddr[0],
Integer.parseInt(thriftPort));
}
- boolean isPrimary = shardInfo.getBoolean("primary");
- return new EsShardRouting(indexName, Integer.parseInt(shardKey),
- isPrimary, addr, nodeId);
+ return new EsShardRouting(indexName, shardId, isPrimary, addr, nodeId);
}
public int getShardId() {
diff --git
a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsTablePartitions.java
b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsTablePartitions.java
index 5263086..c1306be 100644
---
a/fe/src/main/java/org/apache/doris/external/elasticsearch/EsTablePartitions.java
+++
b/fe/src/main/java/org/apache/doris/external/elasticsearch/EsTablePartitions.java
@@ -17,12 +17,6 @@
package org.apache.doris.external.elasticsearch;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.EsTable;
import org.apache.doris.catalog.PartitionInfo;
@@ -31,11 +25,19 @@ import org.apache.doris.catalog.RangePartitionInfo;
import org.apache.doris.catalog.SinglePartitionInfo;
import org.apache.doris.common.DdlException;
import org.apache.doris.thrift.TNetworkAddress;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Range;
+
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
/**
* save the dynamic info parsed from es cluster state such as shard routing,
partition info
*/
@@ -56,7 +58,7 @@ public class EsTablePartitions {
}
public static EsTablePartitions fromShardPartitions(EsTable esTable,
EsShardPartitions shardPartitions)
- throws ExternalDataSourceException, DdlException {
+ throws DorisEsException, DdlException {
EsTablePartitions esTablePartitions = new EsTablePartitions();
RangePartitionInfo partitionInfo = null;
if (esTable.getPartitionInfo() != null) {
@@ -82,7 +84,7 @@ public class EsTablePartitions {
LOG.debug("begin to parse es table [{}] state from search
shards, "
+ "with no partition info", esTable.getName());
} else {
- throw new ExternalDataSourceException("es table only support
range partition, "
+ throw new DorisEsException("es table only support range
partition, "
+ "but current partition type is "
+ esTable.getPartitionInfo().getType());
}
diff --git a/fe/src/main/java/org/apache/doris/planner/EsScanNode.java
b/fe/src/main/java/org/apache/doris/planner/EsScanNode.java
index aa04ec6..03cfc70 100644
--- a/fe/src/main/java/org/apache/doris/planner/EsScanNode.java
+++ b/fe/src/main/java/org/apache/doris/planner/EsScanNode.java
@@ -40,6 +40,9 @@ import org.apache.doris.thrift.TScanRange;
import org.apache.doris.thrift.TScanRangeLocation;
import org.apache.doris.thrift.TScanRangeLocations;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -47,9 +50,6 @@ import com.google.common.collect.Multimap;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -214,7 +214,7 @@ public class EsScanNode extends ScanNode {
// Generate on es scan range
TEsScanRange esScanRange = new TEsScanRange();
esScanRange.setEs_hosts(shardAllocations);
- esScanRange.setIndex(indexState.getIndexName());
+ esScanRange.setIndex(shardRouting.get(0).getIndexName());
esScanRange.setType(table.getMappingType());
esScanRange.setShard_id(shardRouting.get(0).getShardId());
// Scan range
diff --git
a/fe/src/test/java/org/apache/doris/external/elasticsearch/EsRepositoryTest.java
b/fe/src/test/java/org/apache/doris/external/elasticsearch/EsRepositoryTest.java
index a7ce961..c367ec0 100644
---
a/fe/src/test/java/org/apache/doris/external/elasticsearch/EsRepositoryTest.java
+++
b/fe/src/test/java/org/apache/doris/external/elasticsearch/EsRepositoryTest.java
@@ -91,7 +91,7 @@ public class EsRepositoryTest {
}
- @Test(expected = ExternalDataSourceException.class)
+ @Test(expected = DorisEsException.class)
public void testSetErrorType() throws Exception {
EsTable esTable = (EsTable) Catalog.getCurrentCatalog()
.getDb(CatalogTestUtil.testDb1)
@@ -101,7 +101,7 @@ public class EsRepositoryTest {
}
@Test
- public void testSetTableState() throws ExternalDataSourceException,
DdlException {
+ public void testSetTableState() throws DorisEsException, DdlException {
EsTable esTable = (EsTable) Catalog.getCurrentCatalog()
.getDb(CatalogTestUtil.testDb1)
.getTable(CatalogTestUtil.testEsTableId1);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]