This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 65c24f114c6 [bugfix](paimon)add support for 'in' and 'not in' (#38390)
65c24f114c6 is described below
commit 65c24f114c696edc0d700f9b6fdc55a466363591
Author: wuwenchi <[email protected]>
AuthorDate: Tue Jul 30 15:57:07 2024 +0800
[bugfix](paimon)add support for 'in' and 'not in' (#38390)
## Proposed changes
Add predicate pushdown support for `in` and `not in` on Paimon tables:
```
select * from tb where partition_column in ('a','b','c');
select * from tb where partition_column not in ('a','b','c');
```
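For context, here is a minimal, self-contained sketch of how such a filter maps onto Paimon's `PredicateBuilder` API (the row type, field names, and literal values below are illustrative assumptions, not taken from this patch):
```
// Illustrative sketch only: builds the Paimon predicates that `in` / `not in` push down to.
import java.util.Arrays;
import java.util.List;

import org.apache.paimon.data.BinaryString;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

public class InPushdownSketch {
    public static void main(String[] args) {
        // Hypothetical schema; the field index is what builder.in()/notIn() expect.
        RowType rowType = RowType.of(
                new DataType[] {DataTypes.INT(), DataTypes.STRING()},
                new String[] {"id", "partition_column"});
        PredicateBuilder builder = new PredicateBuilder(rowType);

        // String literals are converted to Paimon's BinaryString, as the converter does.
        List<Object> values = Arrays.asList(
                BinaryString.fromString("a"),
                BinaryString.fromString("b"),
                BinaryString.fromString("c"));

        int fieldIndex = 1; // index of partition_column in the row type
        Predicate in = builder.in(fieldIndex, values);       // partition_column in ('a','b','c')
        Predicate notIn = builder.notIn(fieldIndex, values); // partition_column not in ('a','b','c')
        System.out.println(in);
        System.out.println(notIn);
    }
}
```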
---
.../paimon/source/PaimonPredicateConverter.java | 39 +++++++
.../datasource/paimon/source/PaimonScanNode.java | 23 +++-
.../paimon/test_paimon_predict.groovy | 127 +++++++++++++++++++++
3 files changed, 183 insertions(+), 6 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonPredicateConverter.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonPredicateConverter.java
index 605bc1b321a..9e46474898d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonPredicateConverter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonPredicateConverter.java
@@ -21,9 +21,11 @@ import org.apache.doris.analysis.CastExpr;
import org.apache.doris.analysis.CompoundPredicate;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.FunctionCallExpr;
+import org.apache.doris.analysis.InPredicate;
import org.apache.doris.analysis.IsNullPredicate;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.Subquery;
import org.apache.doris.thrift.TExprOpcode;
import org.apache.paimon.data.BinaryString;
@@ -85,11 +87,48 @@ public class PaimonPredicateConverter {
default:
return null;
}
+ } else if (dorisExpr instanceof InPredicate) {
+ return doInPredicate((InPredicate) dorisExpr);
} else {
return binaryExprDesc(dorisExpr);
}
}
+ private Predicate doInPredicate(InPredicate predicate) {
+ // InPredicate: only the literal form, e.g. a in (1, 2, 3); subqueries are not pushed down
+ if (predicate.contains(Subquery.class)) {
+ return null;
+ }
+
+ SlotRef slotRef = convertDorisExprToSlotRef(predicate.getChild(0));
+ if (slotRef == null) {
+ return null;
+ }
+ String colName = slotRef.getColumnName();
+ int idx = fieldNames.indexOf(colName);
+ DataType dataType = paimonFieldTypes.get(idx);
+ List<Object> valueList = new ArrayList<>();
+ for (int i = 1; i < predicate.getChildren().size(); i++) {
+ if (!(predicate.getChild(i) instanceof LiteralExpr)) {
+ return null;
+ }
+ LiteralExpr literalExpr = convertDorisExprToLiteralExpr(predicate.getChild(i));
+ Object value = dataType.accept(new PaimonValueConverter(literalExpr));
+ if (value == null) {
+ return null;
+ }
+ valueList.add(value);
+ }
+
+ if (predicate.isNotIn()) {
+ // not in
+ return builder.notIn(idx, valueList);
+ } else {
+ // in
+ return builder.in(idx, valueList);
+ }
+ }
+
private Predicate binaryExprDesc(Expr dorisExpr) {
TExprOpcode opcode = dorisExpr.getOpcode();
// Make sure the col slot is always first
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index aeecbd7eba2..45516fd2841 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -317,15 +317,26 @@ public class PaimonScanNode extends FileQueryScanNode {
@Override
public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
- String result = super.getNodeExplainString(prefix, detailLevel)
- + String.format("%spaimonNativeReadSplits=%d/%d\n",
- prefix, rawFileSplitNum, (paimonSplitNum + rawFileSplitNum));
+ StringBuilder sb = new StringBuilder(super.getNodeExplainString(prefix, detailLevel));
+ sb.append(String.format("%spaimonNativeReadSplits=%d/%d\n",
+ prefix, rawFileSplitNum, (paimonSplitNum + rawFileSplitNum)));
+
+ sb.append(prefix).append("predicatesFromPaimon:");
+ if (predicates.isEmpty()) {
+ sb.append(" NONE\n");
+ } else {
+ sb.append("\n");
+ for (Predicate predicate : predicates) {
+ sb.append(prefix).append(prefix).append(predicate).append("\n");
+ }
+ }
+
if (detailLevel == TExplainLevel.VERBOSE) {
- result += prefix + "PaimonSplitStats: \n";
+ sb.append(prefix).append("PaimonSplitStats: \n");
for (SplitStat splitStat : splitStats) {
- result += String.format("%s %s\n", prefix, splitStat);
+ sb.append(String.format("%s %s\n", prefix, splitStat));
}
}
- return result;
+ return sb.toString();
}
}
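For illustration, with this change the plan output for a pushed-down `in` filter would contain a block shaped roughly like the one below. The exact predicate rendering comes from Paimon's `Predicate.toString()` and the split counts depend on the data, so both are shown as placeholders:
```
paimonNativeReadSplits=3/9
predicatesFromPaimon:
    <paimon predicate, e.g. for dt in ('a')>
PaimonSplitStats: 
    <one line per Paimon split, VERBOSE level only>
```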
diff --git a/regression-test/suites/external_table_p0/paimon/test_paimon_predict.groovy b/regression-test/suites/external_table_p0/paimon/test_paimon_predict.groovy
new file mode 100644
index 00000000000..6f07ae1db8e
--- /dev/null
+++ b/regression-test/suites/external_table_p0/paimon/test_paimon_predict.groovy
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_paimon_predict", "p0,external,doris,external_docker,external_docker_doris") {
+ String enabled = context.config.otherConfigs.get("enablePaimonTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("disable paimon test")
+ return
+ }
+
+ String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+ String catalog_name = "test_paimon_predict"
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+
+ sql """drop catalog if exists ${catalog_name}"""
+ sql """
+ CREATE CATALOG ${catalog_name} PROPERTIES (
+ 'type' = 'paimon',
+ 'warehouse' = 's3://warehouse/wh',
+ 's3.endpoint' = 'http://${externalEnvIp}:${minio_port}',
+ 's3.access_key' = 'admin',
+ 's3.secret_key' = 'password',
+ 's3.path.style.access' = 'true'
+ );
+ """
+ sql """use `${catalog_name}`.`spark_paimon`"""
+
+ explain {
+ sql("select * from predict_for_in")
+ contains("inputSplitNum=9")
+ }
+
+ def explain_one_column = { col_name ->
+
+ explain {
+ sql("select * from predict_for_in where ${col_name} in ('a')")
+ contains("inputSplitNum=3")
+ }
+
+ explain {
+ sql("select * from predict_for_in where ${col_name} in ('b')")
+ contains("inputSplitNum=3")
+ }
+
+ explain {
+ sql("select * from predict_for_in where ${col_name} in ('a','b')")
+ contains("inputSplitNum=6")
+ }
+
+ explain {
+ sql("select * from predict_for_in where ${col_name} in ('a','x')")
+ contains("inputSplitNum=3")
+ }
+
+ explain {
+ sql("select * from predict_for_in where ${col_name} in ('x','y')")
+ contains("inputSplitNum=0")
+ }
+
+ explain {
+ sql("select * from predict_for_in where ${col_name} in
('a','b','c')")
+ contains("inputSplitNum=9")
+ }
+
+ explain {
+ sql("select * from predict_for_in where ${col_name} in
('y','x','a','c')")
+ contains("inputSplitNum=6")
+ }
+
+ explain {
+ sql("select * from predict_for_in where ${col_name} not in
('y','x','a','c')")
+ contains("inputSplitNum=3")
+ }
+
+ explain {
+ sql("select * from predict_for_in where ${col_name} not in ('a')")
+ contains("inputSplitNum=6")
+ }
+
+ explain {
+ sql("select * from predict_for_in where ${col_name} not in ('x')")
+ contains("inputSplitNum=9")
+ }
+ }
+
+ explain_one_column('dt')
+ explain_one_column('hh')
+
+
+ sql """ drop catalog if exists ${catalog_name} """
+}
+
+
+/*
+
+for spark:
+
+create table predict_for_in(id int, dt string, hh string) partitioned by(dt,hh);
+
+insert into predict_for_in values (1, 'a', 'a');
+insert into predict_for_in values (2, 'a', 'b');
+insert into predict_for_in values (3, 'a', 'c');
+
+insert into predict_for_in values (4, 'b', 'a');
+insert into predict_for_in values (5, 'b', 'b');
+insert into predict_for_in values (6, 'b', 'c');
+
+insert into predict_for_in values (7, 'c', 'a');
+insert into predict_for_in values (8, 'c', 'b');
+insert into predict_for_in values (9, 'c', 'c');
+
+*/
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]