This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 68b9a2936a [improvement](doe) Step1: Fe generates the DSL and is used
to explain (#9895)
68b9a2936a is described below
commit 68b9a2936aeb749d344495f6eea4afc1d759bd28
Author: Stalary <[email protected]>
AuthorDate: Mon Jul 18 23:20:58 2022 +0800
[improvement](doe) Step1: Fe generates the DSL and is used to explain
(#9895)
For this first step only FE is changed; BE will be changed once the generated
DSL has been verified to be correct.
---
.../java/org/apache/doris/catalog/Catalog.java | 1 -
.../java/org/apache/doris/catalog/EsTable.java | 51 +----
.../doris/external/elasticsearch/EsUrls.java | 37 ++++
.../doris/external/elasticsearch/EsUtil.java | 233 +++++++++++++++++++++
.../external/elasticsearch/PartitionPhase.java | 6 +-
.../external/elasticsearch/QueryBuilders.java | 65 +++++-
.../java/org/apache/doris/planner/EsScanNode.java | 88 +++++---
.../doris/external/elasticsearch/EsUtilTest.java | 163 ++++++++++++--
fe/pom.xml | 2 +-
9 files changed, 547 insertions(+), 99 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index 2b52760400..fbf341c029 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -2967,7 +2967,6 @@ public class Catalog {
if (esTable.getMappingType() != null) {
sb.append("\"type\" =
\"").append(esTable.getMappingType()).append("\",\n");
}
- sb.append("\"transport\" =
\"").append(esTable.getTransport()).append("\",\n");
sb.append("\"enable_docvalue_scan\" =
\"").append(esTable.isDocValueScanEnable()).append("\",\n");
sb.append("\"max_docvalue_fields\" =
\"").append(esTable.maxDocValueFields()).append("\",\n");
sb.append("\"enable_keyword_sniff\" =
\"").append(esTable.isKeywordSniffEnable()).append("\",\n");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java
index b03ce6d0ae..ba39f2d4d2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java
@@ -60,12 +60,15 @@ public class EsTable extends Table {
public static final String VERSION = "version";
public static final String DOC_VALUES_MODE = "doc_values_mode";
- public static final String TRANSPORT_HTTP = "http";
public static final String DOC_VALUE_SCAN = "enable_docvalue_scan";
public static final String KEYWORD_SNIFF = "enable_keyword_sniff";
public static final String MAX_DOCVALUE_FIELDS = "max_docvalue_fields";
public static final String NODES_DISCOVERY = "nodes_discovery";
public static final String HTTP_SSL_ENABLED = "http_ssl_enabled";
+ // Property keys that EsScanNode puts into the ES scan node for BE: the query DSL generated
+ // by FE and the request URLs produced by EsUtil.genEsUrls.
+ public static final String ES_DSL = "es_dsl";
+ public static final String INIT_SCROLL_URL = "init_scroll_url";
+ public static final String NEXT_SCROLL_URL = "next_scroll_url";
+ public static final String SEARCH_URL = "search_url";
private static final Logger LOG = LogManager.getLogger(EsTable.class);
@@ -94,8 +97,6 @@ public class EsTable extends Table {
// which type used for `indexName`
private String mappingType = null;
- // only support http
- private String transport = "http";
// only save the partition definition, save the partition key,
// partition list is got from es cluster dynamically and is saved in
esTableState
private PartitionInfo partitionInfo;
@@ -263,7 +264,6 @@ public class EsTable extends Table {
if (mappingType != null) {
tableContext.put("mappingType", mappingType);
}
- tableContext.put("transport", transport);
if (majorVersion != null) {
tableContext.put("majorVersion", majorVersion.toString());
}
@@ -296,7 +296,6 @@ public class EsTable extends Table {
if (mappingType != null) {
sb.append(mappingType);
}
- sb.append(transport);
} else {
for (Map.Entry<String, String> entry : tableContext.entrySet()) {
sb.append(entry.getKey());
@@ -335,7 +334,6 @@ public class EsTable extends Table {
passwd = tableContext.get("passwd");
indexName = tableContext.get("indexName");
mappingType = tableContext.get("mappingType");
- transport = tableContext.get("transport");
if (tableContext.containsKey("majorVersion")) {
try {
majorVersion =
EsMajorVersion.parse(tableContext.get("majorVersion"));
@@ -402,10 +400,6 @@ public class EsTable extends Table {
return mappingType;
}
- public String getTransport() {
- return transport;
- }
-
public PartitionInfo getPartitionInfo() {
return partitionInfo;
}
@@ -464,7 +458,7 @@ public class EsTable extends Table {
JSONObject field = (JSONObject) mappingProps.get(key);
// Complex types are not currently supported.
if (field.containsKey("type")) {
- Type type = toDorisType(field.get("type").toString());
+ Type type = EsUtil.toDorisType(field.get("type").toString());
if (!type.isInvalid()) {
Column column = new Column();
column.setName(key);
@@ -477,39 +471,4 @@ public class EsTable extends Table {
}
return columns;
}
-
- private Type toDorisType(String esType) {
- // reference
https://www.elastic.co/guide/en/elasticsearch/reference/8.3/sql-data-types.html
- switch (esType) {
- case "null":
- return Type.NULL;
- case "boolean":
- return Type.BOOLEAN;
- case "byte":
- return Type.TINYINT;
- case "short":
- return Type.SMALLINT;
- case "integer":
- return Type.INT;
- case "long":
- case "unsigned_long":
- return Type.BIGINT;
- case "float":
- case "half_float":
- return Type.FLOAT;
- case "double":
- case "scaled_float":
- return Type.DOUBLE;
- case "keyword":
- case "text":
- case "ip":
- case "nested":
- case "object":
- return Type.STRING;
- case "date":
- return Type.DATE;
- default:
- return Type.INVALID;
- }
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUrls.java
b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUrls.java
new file mode 100644
index 0000000000..0df1effa4b
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUrls.java
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.external.elasticsearch;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Request URLs (path plus query string, without host) that BE uses to read from Elasticsearch.
+ * Built by EsUtil#genEsUrls: either searchUrl is set (the query has a positive limit), or
+ * initScrollUrl and nextScrollUrl are set (scroll scan); the unused fields are left null.
+ **/
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class EsUrls {
+
+ // single search request, used when a limit is pushed down
+ private String searchUrl;
+
+ // first request of a scroll scan
+ private String initScrollUrl;
+
+ // follow-up requests of a scroll scan
+ private String nextScrollUrl;
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java
index f6800d1c3c..2b098a669a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsUtil.java
@@ -17,20 +17,41 @@
package org.apache.doris.external.elasticsearch;
+import org.apache.doris.analysis.BinaryPredicate;
+import org.apache.doris.analysis.BoolLiteral;
+import org.apache.doris.analysis.CompoundPredicate;
+import org.apache.doris.analysis.DateLiteral;
+import org.apache.doris.analysis.DecimalLiteral;
import org.apache.doris.analysis.DistributionDesc;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.FloatLiteral;
+import org.apache.doris.analysis.FunctionCallExpr;
+import org.apache.doris.analysis.InPredicate;
+import org.apache.doris.analysis.IntLiteral;
+import org.apache.doris.analysis.IsNullPredicate;
+import org.apache.doris.analysis.LargeIntLiteral;
+import org.apache.doris.analysis.LikePredicate;
+import org.apache.doris.analysis.LikePredicate.Operator;
import org.apache.doris.analysis.PartitionDesc;
import org.apache.doris.analysis.RangePartitionDesc;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.EsTable;
+import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
+import org.apache.doris.external.elasticsearch.QueryBuilders.QueryBuilder;
+import org.apache.doris.thrift.TExprOpcode;
import org.apache.commons.lang3.StringUtils;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
/**
* Util for ES, some static method.
@@ -210,4 +231,216 @@ public class EsUtil {
searchContext.docValueFieldsContext().put(colName, docValueField);
}
}
+
+    /**
+     * Translate an AND / OR / NOT predicate into a bool query.
+     * Returns null when any operand cannot be translated (or for any other operator),
+     * so a partially translated compound predicate is never produced.
+     **/
+    private static QueryBuilder toCompoundEsDsl(Expr expr) {
+        CompoundPredicate compound = (CompoundPredicate) expr;
+        switch (compound.getOp()) {
+            case AND:
+            case OR: {
+                QueryBuilder lhs = toEsDsl(compound.getChild(0));
+                QueryBuilder rhs = toEsDsl(compound.getChild(1));
+                if (lhs == null || rhs == null) {
+                    return null;
+                }
+                if (compound.getOp() == CompoundPredicate.Operator.AND) {
+                    return QueryBuilders.boolQuery().must(lhs).must(rhs);
+                }
+                return QueryBuilders.boolQuery().should(lhs).should(rhs);
+            }
+            case NOT: {
+                QueryBuilder negated = toEsDsl(compound.getChild(0));
+                if (negated == null) {
+                    return null;
+                }
+                return QueryBuilders.boolQuery().mustNot(negated);
+            }
+            default:
+                return null;
+        }
+    }
+
+    /**
+     * Doris expr to es dsl.
+     *
+     * <p>Translates a single conjunct into an Elasticsearch query. Returns null when the
+     * predicate cannot be translated: an unsupported expression kind or operator, a left
+     * operand that is not a plain column reference, or a compared value that is not a
+     * supported literal. Callers treat null as "not translated" and skip the conjunct.
+     *
+     * @param expr the conjunct to translate, may be null
+     * @return the equivalent query, or null if it cannot be expressed
+     **/
+    public static QueryBuilder toEsDsl(Expr expr) {
+        if (expr == null) {
+            return null;
+        }
+        // CompoundPredicate, `between` also converted to CompoundPredicate.
+        if (expr instanceof CompoundPredicate) {
+            return toCompoundEsDsl(expr);
+        }
+        // Every translation below needs the shape `column op ...`. Bail out instead of throwing a
+        // ClassCastException for shapes like `cast(k1 as bigint) = 1` or `k1 + 1 > 2`, which would
+        // otherwise fail query planning / EXPLAIN.
+        if (expr.getChildren().isEmpty() || !(expr.getChild(0) instanceof SlotRef)) {
+            return null;
+        }
+        TExprOpcode opCode = expr.getOpcode();
+        String column = ((SlotRef) expr.getChild(0)).getColumnName();
+        if (expr instanceof BinaryPredicate) {
+            Object value = toDorisLiteral(expr.getChild(1));
+            if (value == null) {
+                // compared against another column, NULL, or an unsupported literal:
+                // emitting a term/range with a null value would be wrong
+                return null;
+            }
+            switch (opCode) {
+                case EQ:
+                case EQ_FOR_NULL:
+                    return QueryBuilders.termQuery(column, value);
+                case NE:
+                    return QueryBuilders.boolQuery().mustNot(QueryBuilders.termQuery(column, value));
+                case GE:
+                    return QueryBuilders.rangeQuery(column).gte(value);
+                case GT:
+                    return QueryBuilders.rangeQuery(column).gt(value);
+                case LE:
+                    return QueryBuilders.rangeQuery(column).lte(value);
+                case LT:
+                    return QueryBuilders.rangeQuery(column).lt(value);
+                default:
+                    return null;
+            }
+        }
+        if (expr instanceof IsNullPredicate) {
+            IsNullPredicate isNullPredicate = (IsNullPredicate) expr;
+            if (isNullPredicate.isNotNull()) {
+                return QueryBuilders.existsQuery(column);
+            }
+            return QueryBuilders.boolQuery().mustNot(QueryBuilders.existsQuery(column));
+        }
+        if (expr instanceof LikePredicate) {
+            LikePredicate likePredicate = (LikePredicate) expr;
+            if (likePredicate.getOp().equals(Operator.LIKE)) {
+                char[] chars = likePredicate.getChild(1).getStringValue().toCharArray();
+                // example of translation :
+                // abc_123 ===> abc?123
+                // abc%ykz ===> abc*ykz
+                // %abc123 ===> *abc123
+                // _abc123 ===> ?abc123
+                // \\_abc1 ===> \\_abc1
+                // abc\\_123 ===> abc\\_123
+                // abc\\%123 ===> abc\\%123
+                // NOTE. user must input sql like 'abc\\_123' or 'abc\\%ykz'
+                for (int i = 0; i < chars.length; i++) {
+                    boolean isSqlWildcard = chars[i] == '_' || chars[i] == '%';
+                    // a wildcard escaped by a preceding backslash is kept literally
+                    if (isSqlWildcard && (i == 0 || chars[i - 1] != '\\')) {
+                        chars[i] = (chars[i] == '_') ? '?' : '*';
+                    }
+                }
+                return QueryBuilders.wildcardQuery(column, new String(chars));
+            } else {
+                // NOTE(review): non-LIKE operators (REGEXP) are sent as a wildcard query, but regex
+                // and wildcard syntax differ (e.g. `.`, `[a-z]`); consider a regexp query instead.
+                return QueryBuilders.wildcardQuery(column, likePredicate.getChild(1).getStringValue());
+            }
+        }
+        if (expr instanceof InPredicate) {
+            InPredicate inPredicate = (InPredicate) expr;
+            List<Object> values = inPredicate.getListChildren().stream().map(EsUtil::toDorisLiteral)
+                    .collect(Collectors.toList());
+            if (values.contains(null)) {
+                // an element is NULL, a column or an unsupported literal: not expressible as ES terms
+                return null;
+            }
+            if (inPredicate.isNotIn()) {
+                return QueryBuilders.boolQuery().mustNot(QueryBuilders.termsQuery(column, values));
+            }
+            return QueryBuilders.termsQuery(column, values);
+        }
+        if (expr instanceof FunctionCallExpr) {
+            FunctionCallExpr functionCallExpr = (FunctionCallExpr) expr;
+            if ("esquery".equals(functionCallExpr.getFnName().getFunction())) {
+                String stringValue = functionCallExpr.getChild(1).getStringValue();
+                return new QueryBuilders.EsQueryBuilder(stringValue);
+            }
+        }
+        return null;
+    }
+
+ /**
+ * Map an Elasticsearch field type name to the Doris column type used for it.
+ *
+ * @param esType the ES mapping type name, e.g. "keyword" or "long"
+ * @return the matching Doris type, or Type.INVALID for unsupported types
+ *         (EsTable skips columns whose type is invalid)
+ **/
+ public static Type toDorisType(String esType) {
+ // reference https://www.elastic.co/guide/en/elasticsearch/reference/8.3/sql-data-types.html
+ switch (esType) {
+ case "null":
+ return Type.NULL;
+ case "boolean":
+ return Type.BOOLEAN;
+ case "byte":
+ return Type.TINYINT;
+ case "short":
+ return Type.SMALLINT;
+ case "integer":
+ return Type.INT;
+ case "long":
+ case "unsigned_long":
+ // NOTE(review): unsigned_long values above Long.MAX_VALUE do not fit BIGINT — confirm.
+ return Type.BIGINT;
+ case "float":
+ case "half_float":
+ return Type.FLOAT;
+ case "double":
+ case "scaled_float":
+ return Type.DOUBLE;
+ case "keyword":
+ case "text":
+ case "ip":
+ case "nested":
+ case "object":
+ return Type.STRING;
+ case "date":
+ // NOTE(review): ES `date` values may carry a time component that DATE drops — confirm intent.
+ return Type.DATE;
+ default:
+ return Type.INVALID;
+ }
+ }
+
+    /**
+     * Convert a literal expression into the Java value placed in an ES query.
+     *
+     * @param expr the expression to convert
+     * @return the literal's value, or null if {@code expr} is not a literal of a supported kind
+     **/
+    private static Object toDorisLiteral(Expr expr) {
+        if (!expr.isLiteral()) {
+            return null;
+        }
+        if (expr instanceof BoolLiteral) {
+            return ((BoolLiteral) expr).getValue();
+        } else if (expr instanceof DateLiteral) {
+            // DateLiteral#getLongValue() is Doris' packed yyyyMMddHHmmss number, which ES would
+            // interpret as epoch millis on a date field. Send the textual form so ES parses it
+            // with the field's date format instead.
+            return ((DateLiteral) expr).getStringValue();
+        } else if (expr instanceof DecimalLiteral) {
+            return ((DecimalLiteral) expr).getValue();
+        } else if (expr instanceof FloatLiteral) {
+            return ((FloatLiteral) expr).getValue();
+        } else if (expr instanceof IntLiteral) {
+            return ((IntLiteral) expr).getValue();
+        } else if (expr instanceof LargeIntLiteral) {
+            // NOTE(review): values outside the long range are truncated by getLongValue().
+            return ((LargeIntLiteral) expr).getLongValue();
+        } else if (expr instanceof StringLiteral) {
+            return ((StringLiteral) expr).getStringValue();
+        }
+        return null;
+    }
+
+    /**
+     * Generate the URLs BE uses to query ES (path and query string only; BE adds the host).
+     *
+     * @param index ES index name
+     * @param type mapping type; omitted from the path when blank
+     * @param docValueMode true to request doc-value {@code fields}, false to request {@code _source}
+     * @param limit query limit; a positive value yields a single search URL, otherwise scroll URLs
+     * @param batchSize value used for {@code terminate_after} on the initial scroll request
+     * @return EsUrls with either searchUrl set, or initScrollUrl and nextScrollUrl set
+     **/
+    public static EsUrls genEsUrls(String index, String type, boolean docValueMode, long limit, long batchSize) {
+        String filterPath;
+        if (docValueMode) {
+            filterPath = "filter_path=_scroll_id,hits.total,hits.hits._score,hits.hits.fields";
+        } else {
+            filterPath = "filter_path=_scroll_id,hits.hits._source,hits.total,hits.hits._id";
+        }
+        // "/index" or "/index/type"
+        String indexPath = StringUtils.isNotBlank(type) ? "/" + index + "/" + type : "/" + index;
+        if (limit > 0) {
+            String searchUrl = indexPath + "/_search?terminate_after=" + limit + "&" + filterPath;
+            return new EsUrls(searchUrl, null, null);
+        }
+        String initScrollUrl = indexPath + "/_search?" + filterPath + "&terminate_after=" + batchSize;
+        String nextScrollUrl = "/_search/scroll?" + filterPath;
+        return new EsUrls(null, initScrollUrl, nextScrollUrl);
+    }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java
b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java
index bb5d416f56..f13db98fb5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/PartitionPhase.java
@@ -17,8 +17,6 @@
package org.apache.doris.external.elasticsearch;
-import org.apache.doris.catalog.EsTable;
-
import java.util.HashMap;
import java.util.Map;
@@ -53,8 +51,6 @@ public class PartitionPhase implements SearchPhase {
@Override
public void postProcess(SearchContext context) throws DorisEsException {
context.partitions(shardPartitions);
- if (EsTable.TRANSPORT_HTTP.equals(context.esTable().getTransport())) {
- context.partitions().addHttpAddress(nodesInfo);
- }
+ context.partitions().addHttpAddress(nodesInfo);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/QueryBuilders.java
b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/QueryBuilders.java
index 92aea914bd..3dd800cb91 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/QueryBuilders.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/QueryBuilders.java
@@ -18,10 +18,17 @@
package org.apache.doris.external.elasticsearch;
import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import java.io.IOException;
+import java.io.StringWriter;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map.Entry;
import java.util.Objects;
@@ -163,7 +170,11 @@ public final class QueryBuilders {
/**
* Base class to build various ES queries
*/
- abstract static class QueryBuilder {
+    public abstract static class QueryBuilder {
+
+        private static final Logger LOG = LogManager.getLogger(QueryBuilder.class);
+
+        // ObjectMapper is costly to create and thread-safe once configured, so all builders share
+        // one instance instead of allocating a new mapper for every query node.
+        private static final ObjectMapper SHARED_MAPPER = new ObjectMapper();
+
+        // kept as an instance field so existing subclasses referencing `mapper` keep working
+        final ObjectMapper mapper = SHARED_MAPPER;
/**
* Convert query to JSON format
@@ -172,19 +183,64 @@ public final class QueryBuilders {
* @throws IOException if IO error occurred
*/
abstract void toJson(JsonGenerator out) throws IOException;
+
+        /**
+         * Convert query to JSON format and catch error.
+         *
+         * @return the JSON text of this query, or null if serialization failed (the error is logged)
+         **/
+        public String toJson() {
+            StringWriter writer = new StringWriter();
+            // try-with-resources closes the generator even when serialization throws
+            try (JsonGenerator gen = mapper.getFactory().createGenerator(writer)) {
+                this.toJson(gen);
+                gen.flush();
+            } catch (IOException e) {
+                LOG.warn("QueryBuilder toJson error", e);
+                return null;
+            }
+            return writer.toString();
+        }
+ }
+
+ /**
+ * Use for esquery, directly save value.
+ **/
+ public static class EsQueryBuilder extends QueryBuilder {
+
+ private final String value;
+
+ public EsQueryBuilder(String value) {
+ this.value = value;
+ }
+
+        /**
+         * Write the user-supplied esquery JSON object field by field into the enclosing query.
+         */
+        @Override
+        void toJson(JsonGenerator out) throws IOException {
+            JsonNode root = mapper.readTree(value);
+            out.writeStartObject();
+            for (Iterator<Entry<String, JsonNode>> it = root.fields(); it.hasNext(); ) {
+                Entry<String, JsonNode> field = it.next();
+                out.writeFieldName(field.getKey());
+                out.writeObject(field.getValue());
+            }
+            out.writeEndObject();
+        }
}
/**
* A Query that matches documents matching boolean combinations of other
queries.
*/
- static class BoolQueryBuilder extends QueryBuilder {
+ public static class BoolQueryBuilder extends QueryBuilder {
private final List<QueryBuilder> mustClauses = new ArrayList<>();
private final List<QueryBuilder> mustNotClauses = new ArrayList<>();
private final List<QueryBuilder> filterClauses = new ArrayList<>();
private final List<QueryBuilder> shouldClauses = new ArrayList<>();
- BoolQueryBuilder must(QueryBuilder queryBuilder) {
+ /**
+ * Use for EsScanNode generate dsl.
+ **/
+ public BoolQueryBuilder must(QueryBuilder queryBuilder) {
Objects.requireNonNull(queryBuilder);
mustClauses.add(queryBuilder);
return this;
@@ -221,8 +277,7 @@ public final class QueryBuilders {
out.writeEndObject();
}
- private void writeJsonArray(String field, List<QueryBuilder> clauses,
JsonGenerator out)
- throws IOException {
+ private void writeJsonArray(String field, List<QueryBuilder> clauses,
JsonGenerator out) throws IOException {
if (clauses.isEmpty()) {
return;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java
index f37704fd9d..fc557e7260 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java
@@ -18,6 +18,7 @@
package org.apache.doris.planner;
import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Catalog;
@@ -30,6 +31,12 @@ import org.apache.doris.common.UserException;
import org.apache.doris.external.elasticsearch.EsShardPartitions;
import org.apache.doris.external.elasticsearch.EsShardRouting;
import org.apache.doris.external.elasticsearch.EsTablePartitions;
+import org.apache.doris.external.elasticsearch.EsUrls;
+import org.apache.doris.external.elasticsearch.EsUtil;
+import org.apache.doris.external.elasticsearch.QueryBuilders;
+import org.apache.doris.external.elasticsearch.QueryBuilders.BoolQueryBuilder;
+import org.apache.doris.external.elasticsearch.QueryBuilders.QueryBuilder;
+import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TEsScanNode;
@@ -47,6 +54,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
+import lombok.SneakyThrows;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -58,6 +66,9 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
+/**
+ * ScanNode for Elasticsearch.
+ **/
public class EsScanNode extends ScanNode {
private static final Logger LOG = LogManager.getLogger(EsScanNode.class);
@@ -68,8 +79,8 @@ public class EsScanNode extends ScanNode {
private EsTablePartitions esTablePartitions;
private List<TScanRangeLocations> shardScanRanges = Lists.newArrayList();
private EsTable table;
-
- boolean isFinalized = false;
+ private QueryBuilder queryBuilder;
+ private boolean isFinalized = false;
public EsScanNode(PlanNodeId id, TupleDescriptor desc, String
planNodeName) {
super(id, desc, planNodeName, StatisticalType.ES_SCAN_NODE);
@@ -114,9 +125,8 @@ public class EsScanNode extends ScanNode {
* return whether can use the doc_values scan
* 0 and 1 are returned to facilitate Doris BE processing
*
- * @param desc the fields needs to read from ES
+ * @param desc the fields needs to read from ES
* @param docValueContext the mapping for docvalues fields from origin
field to doc_value fields
- * @return
*/
private int useDocValueScan(TupleDescriptor desc, Map<String, String>
docValueContext) {
ArrayList<SlotDescriptor> slotDescriptors = desc.getSlots();
@@ -138,13 +148,11 @@ public class EsScanNode extends ScanNode {
return useDocValue ? 1 : 0;
}
+ @SneakyThrows
@Override
protected void toThrift(TPlanNode msg) {
- if (EsTable.TRANSPORT_HTTP.equals(table.getTransport())) {
- msg.node_type = TPlanNodeType.ES_HTTP_SCAN_NODE;
- } else {
- msg.node_type = TPlanNodeType.ES_SCAN_NODE;
- }
+ buildQuery();
+ msg.node_type = TPlanNodeType.ES_HTTP_SCAN_NODE;
Map<String, String> properties = Maps.newHashMap();
properties.put(EsTable.USER, table.getUserName());
properties.put(EsTable.PASSWORD, table.getPasswd());
@@ -155,6 +163,17 @@ public class EsScanNode extends ScanNode {
esScanNode.setDocvalueContext(table.docValueContext());
properties.put(EsTable.DOC_VALUES_MODE,
String.valueOf(useDocValueScan(desc, table.docValueContext())));
}
+        properties.put(EsTable.ES_DSL, queryBuilder.toJson());
+
+        // BE prepends the ES host:port and shard id to these URLs when querying.
+        // Use the doc-value mode actually recorded in DOC_VALUES_MODE above ("1" only when
+        // useDocValueScan() returned 1), so the URL's filter_path matches what BE parses.
+        boolean docValueMode = "1".equals(properties.get(EsTable.DOC_VALUES_MODE));
+        // Argument order must match genEsUrls(index, type, docValueMode, limit, batchSize).
+        EsUrls esUrls = EsUtil.genEsUrls(table.getIndexName(), table.getMappingType(), docValueMode, msg.limit,
+                ConnectContext.get().getSessionVariable().batchSize);
+        if (esUrls.getSearchUrl() != null) {
+            properties.put(EsTable.SEARCH_URL, esUrls.getSearchUrl());
+        } else {
+            properties.put(EsTable.INIT_SCROLL_URL, esUrls.getInitScrollUrl());
+            properties.put(EsTable.NEXT_SCROLL_URL, esUrls.getNextScrollUrl());
+        }
if (table.isKeywordSniffEnable() && table.fieldsContext().size() > 0) {
esScanNode.setFieldsContext(table.fieldsContext());
}
@@ -181,8 +200,8 @@ public class EsScanNode extends ScanNode {
// info is generated from es cluster state dynamically
if (esTablePartitions == null) {
if (table.getLastMetaDataSyncException() != null) {
- throw new UserException("fetch es table [" + table.getName()
- + "] metadata failure: " +
table.getLastMetaDataSyncException().getLocalizedMessage());
+ throw new UserException("fetch es table [" + table.getName() +
"] metadata failure: "
+ +
table.getLastMetaDataSyncException().getLocalizedMessage());
}
throw new UserException("EsTable metadata has not been synced, Try
it later");
}
@@ -202,10 +221,8 @@ public class EsScanNode extends ScanNode {
}
}
if (LOG.isDebugEnabled()) {
- LOG.debug("partition prune finished, unpartitioned index [{}], "
- + "partitioned index [{}]",
- String.join(",", unPartitionedIndices),
- String.join(",", partitionedIndices));
+ LOG.debug("partition prune finished, unpartitioned index [{}], " +
"partitioned index [{}]",
+ String.join(",", unPartitionedIndices), String.join(",",
partitionedIndices));
}
int size = backendList.size();
int beIndex = random.nextInt(size);
@@ -217,8 +234,7 @@ public class EsScanNode extends ScanNode {
int numBe = Math.min(3, size);
List<TNetworkAddress> shardAllocations = new ArrayList<>();
for (EsShardRouting item : shardRouting) {
-
shardAllocations.add(EsTable.TRANSPORT_HTTP.equals(table.getTransport())
- ? item.getHttpAddress() : item.getAddress());
+ shardAllocations.add(item.getHttpAddress());
}
Collections.shuffle(shardAllocations, random);
@@ -279,15 +295,13 @@ public class EsScanNode extends ScanNode {
* with one or more indices some indices could be pruned by using
partition info
* in index settings currently only support range partition setting
*
- * @param partitionInfo
- * @return
- * @throws AnalysisException
+ * @param partitionInfo partitionInfo
*/
private Collection<Long> partitionPrune(PartitionInfo partitionInfo)
throws AnalysisException {
if (partitionInfo == null) {
return null;
}
- PartitionPruner partitionPruner = null;
+ PartitionPruner partitionPruner;
switch (partitionInfo.getType()) {
case RANGE: {
RangePartitionInfo rangePartitionInfo = (RangePartitionInfo)
partitionInfo;
@@ -319,22 +333,40 @@ public class EsScanNode extends ScanNode {
}
if (!conjuncts.isEmpty()) {
- output.append(prefix).append("PREDICATES: ").append(
- getExplainString(conjuncts)).append("\n");
+ output.append(prefix).append("PREDICATES:
").append(getExplainString(conjuncts)).append("\n");
// reserved for later using: LOCAL_PREDICATES is processed by
Doris EsScanNode
output.append(prefix).append("LOCAL_PREDICATES: ").append("
").append("\n");
// reserved for later using: REMOTE_PREDICATES is processed by
remote ES Cluster
output.append(prefix).append("REMOTE_PREDICATES: ").append("
").append("\n");
- // reserved for later using: translate predicates to ES queryDSL
- output.append(prefix).append("ES_QUERY_DSL: ").append("
").append("\n");
+ buildQuery();
+ output.append(prefix).append("ES_QUERY_DSL:
").append(queryBuilder.toJson()).append("\n");
} else {
output.append(prefix).append("ES_QUERY_DSL:
").append("{\"match_all\": {}}").append("\n");
}
String indexName = table.getIndexName();
String typeName = table.getMappingType();
- output.append(prefix)
- .append(String.format("ES index/type: %s/%s", indexName,
typeName))
- .append("\n");
+ output.append(prefix).append(String.format("ES index/type: %s/%s",
indexName, typeName)).append("\n");
return output.toString();
}
+
+    /**
+     * Translate this node's conjuncts into an ES query and store it in {@code queryBuilder}.
+     * Conjuncts that EsUtil.toEsDsl can translate are combined under bool.must; the others are
+     * left out. When nothing can be translated, match_all is used.
+     **/
+    private void buildQuery() {
+        BoolQueryBuilder combined = QueryBuilders.boolQuery();
+        int translatedCount = 0;
+        for (Expr conjunct : conjuncts) {
+            QueryBuilder translated = EsUtil.toEsDsl(conjunct);
+            if (translated == null) {
+                continue;
+            }
+            combined.must(translated);
+            translatedCount++;
+        }
+        if (translatedCount > 0) {
+            queryBuilder = combined;
+        } else {
+            queryBuilder = QueryBuilders.matchAllQuery();
+        }
+    }
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/EsUtilTest.java
b/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/EsUtilTest.java
index ca7608923b..93fc870db4 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/EsUtilTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/external/elasticsearch/EsUtilTest.java
@@ -17,6 +17,17 @@
package org.apache.doris.external.elasticsearch;
+import org.apache.doris.analysis.BinaryPredicate;
+import org.apache.doris.analysis.BinaryPredicate.Operator;
+import org.apache.doris.analysis.CompoundPredicate;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.FunctionCallExpr;
+import org.apache.doris.analysis.InPredicate;
+import org.apache.doris.analysis.IntLiteral;
+import org.apache.doris.analysis.IsNullPredicate;
+import org.apache.doris.analysis.LikePredicate;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.EsTable;
import org.apache.doris.catalog.PrimitiveType;
@@ -29,6 +40,7 @@ import org.json.simple.JSONValue;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
import java.util.ArrayList;
import java.util.List;
@@ -40,22 +52,14 @@ public class EsUtilTest extends EsTestCase {
private List<Column> columns = new ArrayList<>();
- private String jsonStr = "{\"settings\": {\n"
- + " \"index\": {\n"
- + " \"bpack\": {\n"
- + " \"partition\": {\n"
- + " \"upperbound\": \"12\"\n"
- + " }\n"
- + " },\n"
- + " \"number_of_shards\": \"5\",\n"
+ private String jsonStr = "{\"settings\": {\n" + " \"index\":
{\n" + " \"bpack\": {\n"
+ + " \"partition\": {\n" + "
\"upperbound\": \"12\"\n"
+ + " }\n" + " },\n" + "
\"number_of_shards\": \"5\",\n"
+ " \"provided_name\": \"indexa\",\n"
+ " \"creation_date\": \"1539328532060\",\n"
+ " \"number_of_replicas\": \"1\",\n"
- + " \"uuid\": \"plNNtKiiQ9-n6NpNskFzhQ\",\n"
- + " \"version\": {\n"
- + " \"created\": \"5050099\"\n"
- + " }\n"
- + " }\n"
+ + " \"uuid\": \"plNNtKiiQ9-n6NpNskFzhQ\",\n" + "
\"version\": {\n"
+ + " \"created\": \"5050099\"\n" + "
}\n" + " }\n"
+ " }}";
/**
@@ -153,4 +157,137 @@ public class EsUtilTest extends EsTestCase {
EsUtil.getJsonObject(json,
"settings.index.bpack.partition.upperbound", 0);
}
+ @Test
+ public void testBinaryPredicateConvertEsDsl() {
+ SlotRef k1 = new SlotRef(null, "k1");
+ IntLiteral intLiteral = new IntLiteral(3);
+ Expr eqExpr = new BinaryPredicate(Operator.EQ, k1, intLiteral);
+ Expr neExpr = new BinaryPredicate(Operator.NE, k1, intLiteral);
+ Expr leExpr = new BinaryPredicate(Operator.LE, k1, intLiteral);
+ Expr geExpr = new BinaryPredicate(Operator.GE, k1, intLiteral);
+ Expr ltExpr = new BinaryPredicate(Operator.LT, k1, intLiteral);
+ Expr gtExpr = new BinaryPredicate(Operator.GT, k1, intLiteral);
+ Expr efnExpr = new BinaryPredicate(Operator.EQ_FOR_NULL, new
SlotRef(null, "k1"), new IntLiteral(3));
+ Assert.assertEquals("{\"term\":{\"k1\":3}}",
EsUtil.toEsDsl(eqExpr).toJson());
+ Assert.assertEquals("{\"bool\":{\"must_not\":{\"term\":{\"k1\":3}}}}",
EsUtil.toEsDsl(neExpr).toJson());
+ Assert.assertEquals("{\"range\":{\"k1\":{\"lte\":3}}}",
EsUtil.toEsDsl(leExpr).toJson());
+ Assert.assertEquals("{\"range\":{\"k1\":{\"gte\":3}}}",
EsUtil.toEsDsl(geExpr).toJson());
+ Assert.assertEquals("{\"range\":{\"k1\":{\"lt\":3}}}",
EsUtil.toEsDsl(ltExpr).toJson());
+ Assert.assertEquals("{\"range\":{\"k1\":{\"gt\":3}}}",
EsUtil.toEsDsl(gtExpr).toJson());
+ Assert.assertEquals("{\"term\":{\"k1\":3}}",
EsUtil.toEsDsl(efnExpr).toJson());
+ }
+
+ @Test
+ public void testCompoundPredicateConvertEsDsl() {
+ SlotRef k1 = new SlotRef(null, "k1");
+ IntLiteral intLiteral1 = new IntLiteral(3);
+ SlotRef k2 = new SlotRef(null, "k2");
+ IntLiteral intLiteral2 = new IntLiteral(5);
+ BinaryPredicate binaryPredicate1 = new BinaryPredicate(Operator.EQ,
k1, intLiteral1);
+ BinaryPredicate binaryPredicate2 = new BinaryPredicate(Operator.GT,
k2, intLiteral2);
+ CompoundPredicate andPredicate = new
CompoundPredicate(CompoundPredicate.Operator.AND, binaryPredicate1,
+ binaryPredicate2);
+ CompoundPredicate orPredicate = new
CompoundPredicate(CompoundPredicate.Operator.OR, binaryPredicate1,
+ binaryPredicate2);
+ CompoundPredicate notPredicate = new
CompoundPredicate(CompoundPredicate.Operator.NOT, binaryPredicate1, null);
+
Assert.assertEquals("{\"bool\":{\"must\":[{\"term\":{\"k1\":3}},{\"range\":{\"k2\":{\"gt\":5}}}]}}",
+ EsUtil.toEsDsl(andPredicate).toJson());
+
Assert.assertEquals("{\"bool\":{\"should\":[{\"term\":{\"k1\":3}},{\"range\":{\"k2\":{\"gt\":5}}}]}}",
+ EsUtil.toEsDsl(orPredicate).toJson());
+ Assert.assertEquals("{\"bool\":{\"must_not\":{\"term\":{\"k1\":3}}}}",
EsUtil.toEsDsl(notPredicate).toJson());
+ }
+
+ @Test
+ public void testIsNullPredicateConvertEsDsl() {
+ SlotRef k1 = new SlotRef(null, "k1");
+ IsNullPredicate isNullPredicate = new IsNullPredicate(k1, false);
+ IsNullPredicate isNotNullPredicate = new IsNullPredicate(k1, true);
+
Assert.assertEquals("{\"bool\":{\"must_not\":{\"exists\":{\"field\":\"k1\"}}}}",
+ EsUtil.toEsDsl(isNullPredicate).toJson());
+ Assert.assertEquals("{\"exists\":{\"field\":\"k1\"}}",
EsUtil.toEsDsl(isNotNullPredicate).toJson());
+ }
+
+ @Test
+ public void testLikePredicateConvertEsDsl() {
+ SlotRef k1 = new SlotRef(null, "k1");
+ StringLiteral stringLiteral1 = new StringLiteral("%1%");
+ StringLiteral stringLiteral2 = new StringLiteral("*1*");
+ StringLiteral stringLiteral3 = new StringLiteral("1_2");
+ LikePredicate likePredicate1 = new
LikePredicate(LikePredicate.Operator.LIKE, k1, stringLiteral1);
+ LikePredicate regexPredicate = new
LikePredicate(LikePredicate.Operator.REGEXP, k1, stringLiteral2);
+ LikePredicate likePredicate2 = new
LikePredicate(LikePredicate.Operator.LIKE, k1, stringLiteral3);
+ Assert.assertEquals("{\"wildcard\":{\"k1\":\"*1*\"}}",
EsUtil.toEsDsl(likePredicate1).toJson());
+ Assert.assertEquals("{\"wildcard\":{\"k1\":\"*1*\"}}",
EsUtil.toEsDsl(regexPredicate).toJson());
+ Assert.assertEquals("{\"wildcard\":{\"k1\":\"1?2\"}}",
EsUtil.toEsDsl(likePredicate2).toJson());
+ }
+
+ @Test
+ public void testInPredicateConvertEsDsl() {
+ SlotRef k1 = new SlotRef(null, "k1");
+ IntLiteral intLiteral1 = new IntLiteral(3);
+ IntLiteral intLiteral2 = new IntLiteral(5);
+ List<Expr> intLiterals = new ArrayList<>();
+ intLiterals.add(intLiteral1);
+ intLiterals.add(intLiteral2);
+ InPredicate isInPredicate = new InPredicate(k1, intLiterals, false);
+ InPredicate isNotInPredicate = new InPredicate(k1, intLiterals, true);
+ Assert.assertEquals("{\"terms\":{\"k1\":[3,5]}}",
EsUtil.toEsDsl(isInPredicate).toJson());
+
Assert.assertEquals("{\"bool\":{\"must_not\":{\"terms\":{\"k1\":[3,5]}}}}",
+ EsUtil.toEsDsl(isNotInPredicate).toJson());
+ }
+
+ @Test
+ public void testFunctionCallConvertEsDsl() {
+ SlotRef k1 = new SlotRef(null, "k1");
+ String str = "{\"bool\":{\"must_not\":{\"terms\":{\"k1\":[3,5]}}}}";
+ StringLiteral stringLiteral = new StringLiteral(str);
+ List<Expr> exprs = new ArrayList<>();
+ exprs.add(k1);
+ exprs.add(stringLiteral);
+ FunctionCallExpr functionCallExpr = new FunctionCallExpr("esquery",
exprs);
+ Assert.assertEquals(str, EsUtil.toEsDsl(functionCallExpr).toJson());
+
+ SlotRef k2 = new SlotRef(null, "k2");
+ IntLiteral intLiteral = new IntLiteral(5);
+ BinaryPredicate binaryPredicate = new BinaryPredicate(Operator.EQ, k2,
intLiteral);
+ CompoundPredicate compoundPredicate = new
CompoundPredicate(CompoundPredicate.Operator.AND, binaryPredicate,
+ functionCallExpr);
+ Assert.assertEquals(
+
"{\"bool\":{\"must\":[{\"term\":{\"k2\":5}},{\"bool\":{\"must_not\":{\"terms\":{\"k1\":[3,5]}}}}]}}",
+ EsUtil.toEsDsl(compoundPredicate).toJson());
+ }
+
+ @Test
+ public void testGenEsUrls() {
+ EsUrls typeLimit = EsUtil.genEsUrls("test", "_doc", false, 10, 1024);
+ Assertions.assertEquals(
+
"/test/_doc/_search?terminate_after=10&filter_path=_scroll_id,hits.hits._source,hits.total,hits.hits._id",
+ typeLimit.getSearchUrl());
+ Assertions.assertNull(typeLimit.getInitScrollUrl());
+ Assertions.assertNull(typeLimit.getNextScrollUrl());
+
+ Assertions.assertEquals(
+
"/test/_search?terminate_after=10&filter_path=_scroll_id,hits.hits._source,hits.total,hits.hits._id",
+ EsUtil.genEsUrls("test", null, false, 10,
1024).getSearchUrl());
+
+ EsUrls typeNoLimit = EsUtil.genEsUrls("test", "_doc", false, -1, 1024);
+ Assertions.assertEquals(
+
"/test/_doc/_search?filter_path=_scroll_id,hits.hits._source,hits.total,hits.hits._id&terminate_after=1024",
+ typeNoLimit.getInitScrollUrl());
+
Assertions.assertEquals("/_search/scroll?filter_path=_scroll_id,hits.hits._source,hits.total,hits.hits._id",
+ typeNoLimit.getNextScrollUrl());
+ Assertions.assertNull(typeNoLimit.getSearchUrl());
+
+ EsUrls noTypeNoLimit = EsUtil.genEsUrls("test", null, false, -1, 2048);
+ Assertions.assertEquals(
+
"/test/_search?filter_path=_scroll_id,hits.hits._source,hits.total,hits.hits._id&terminate_after=2048",
+ noTypeNoLimit.getInitScrollUrl());
+
Assertions.assertEquals("/_search/scroll?filter_path=_scroll_id,hits.hits._source,hits.total,hits.hits._id",
+ noTypeNoLimit.getNextScrollUrl());
+
+ EsUrls docValueTypeLimit = EsUtil.genEsUrls("test", "_doc", true, 100,
1024);
+ Assertions.assertEquals(
+
"/test/_doc/_search?terminate_after=100&filter_path=_scroll_id,hits.total,hits.hits._score,hits.hits.fields",
+ docValueTypeLimit.getSearchUrl());
+ }
}
diff --git a/fe/pom.xml b/fe/pom.xml
index 0c3be8cca1..cb4abf93ec 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -230,7 +230,7 @@ under the License.
<commons-collections.version>3.2.2</commons-collections.version>
<scala.version>2.12.10</scala.version>
<kryo.version>4.0.2</kryo.version>
- <lombok.version>1.18.16</lombok.version>
+ <lombok.version>1.18.24</lombok.version>
<tree-printer.version>1.2</tree-printer.version>
<hamcrest.version>2.1</hamcrest.version>
<httpclient.version>4.5.13</httpclient.version>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]