This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 9e01c84e76 [Improve][Connector-V2] Support like predicate pushdown in
paimon (#9653)
9e01c84e76 is described below
commit 9e01c84e7675758e090c2db99d78475657664edf
Author: xiaochen <[email protected]>
AuthorDate: Wed Aug 6 21:07:23 2025 +0800
[Improve][Connector-V2] Support like predicate pushdown in paimon (#9653)
---
docs/en/connector-v2/source/Paimon.md | 2 +-
docs/zh/connector-v2/source/Paimon.md | 2 +-
.../converter/SqlToPaimonPredicateConverter.java | 33 ++++++++----
.../source/converter/SqlToPaimonConverterTest.java | 29 ++++++++++-
.../e2e/connector/paimon/PaimonSinkCDCIT.java | 6 +++
.../resources/paimon_to_assert_with_filter10.conf | 60 ++++++++++++++++++++++
.../resources/paimon_to_assert_with_filter9.conf | 60 ++++++++++++++++++++++
7 files changed, 180 insertions(+), 12 deletions(-)
diff --git a/docs/en/connector-v2/source/Paimon.md
b/docs/en/connector-v2/source/Paimon.md
index f3b268044c..325531da56 100644
--- a/docs/en/connector-v2/source/Paimon.md
+++ b/docs/en/connector-v2/source/Paimon.md
@@ -85,7 +85,7 @@ The file path of `hdfs-site.xml`
### query [string]
The filter condition of the table read. For example: `select * from st_test
where id > 100`. If not specified, all rows are read.
-Currently, where conditions only support <, <=, >, >=, =, !=, or, and,is null,
is not null, between...and, in, not in, like(pattern matching with prefix only)
,and others are not supported.
+Currently, where conditions only support <, <=, >, >=, =, !=, or, and, is null,
is not null, between...and, in, not in, like, and others are not supported.
The Having, Group By, Order By clauses are currently unsupported, because
these clauses are not supported by Paimon.
you can also project specific columns, for example: select id, name from
st_test where id > 100.
The limit will be supported in the future.
diff --git a/docs/zh/connector-v2/source/Paimon.md
b/docs/zh/connector-v2/source/Paimon.md
index df868cff63..1c139f5bff 100644
--- a/docs/zh/connector-v2/source/Paimon.md
+++ b/docs/zh/connector-v2/source/Paimon.md
@@ -86,7 +86,7 @@ Paimon 的 catalog uri,仅当 catalog_type 为 hive 时需要
读取表格的筛选条件,例如:`select * from st_test where id > 100`。如果未指定,则将读取所有记录。
-目前,`where` 支持`<, <=, >, >=, =, !=, or, and,is null, is not null,
between...and, in , not in, like(pattern matching with prefix only)`,其他暂不支持。
+目前,`where` 支持`<, <=, >, >=, =, !=, or, and, is null, is not null,
between...and, in, not in, like`,其他暂不支持。
Projection 已支持,你可以选择特定的列,例如:select id, name from st_test where id > 100。
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java
index 83ac14e345..1d881b4abd 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java
@@ -252,16 +252,31 @@ public class SqlToPaimonPredicateConverter {
Object rightVal =
convertValueByPaimonDataType(rowType,
column.getColumnName(), rightPredicate);
- Pattern BEGIN_PATTERN = Pattern.compile("([^%]+)%");
- Matcher matcher = BEGIN_PATTERN.matcher(rightVal.toString());
- if (matcher.matches()) {
- return builder.startsWith(columnIndex,
BinaryString.fromString(matcher.group(1)));
- } else {
- throw new IllegalArgumentException(
- "Unsupported expression type: "
- + expression.getClass().getSimpleName()
- + ", only support like pattern matching with
prefix");
+ Pattern BEGIN_PATTERN = Pattern.compile("([^%]+)%$");
+ Matcher beginMatcher = BEGIN_PATTERN.matcher(rightVal.toString());
+ if (beginMatcher.matches()) {
+ return builder.startsWith(
+ columnIndex,
BinaryString.fromString(beginMatcher.group(1)));
+ }
+
+ Pattern END_PATTERN = Pattern.compile("^%([^%]+)");
+ Matcher endMatcher = END_PATTERN.matcher(rightVal.toString());
+ if (endMatcher.matches()) {
+ return builder.endsWith(columnIndex,
BinaryString.fromString(endMatcher.group(1)));
}
+
+ Pattern CONTAINS_PATTERN = Pattern.compile("^%([^%]+)%$");
+ Matcher containsMatcher =
CONTAINS_PATTERN.matcher(rightVal.toString());
+ if (containsMatcher.matches()) {
+ return builder.contains(
+ columnIndex,
BinaryString.fromString(containsMatcher.group(1)));
+ }
+ throw new IllegalArgumentException(
+ String.format(
+ "Invalid LIKE pattern: '%s'. Supported patterns
are: 'prefix%%', '%%suffix', and '%%substring%%'. "
+ + "Please ensure your pattern matches one
of these formats.",
+ rightVal.toString()));
+
} else if (expression instanceof Parenthesis) {
Parenthesis parenthesis = (Parenthesis) expression;
return parseExpressionToPredicate(builder, rowType,
parenthesis.getExpression());
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonConverterTest.java
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonConverterTest.java
index 519784ee67..812d1da98e 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonConverterTest.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonConverterTest.java
@@ -250,7 +250,7 @@ public class SqlToPaimonConverterTest {
}
@Test
- public void testConvertSqlWhereToPaimonPredicateWithStartWith() {
+ public void testConvertSqlWhereToPaimonLikePredicate() {
String query = "SELECT * FROM table WHERE varchar_col like 'te%'";
PlainSelect plainSelect = convertToPlainSelect(query);
@@ -264,5 +264,32 @@ public class SqlToPaimonConverterTest {
Predicate expectedPredicate =
PredicateBuilder.or(builder.startsWith(1, "te"));
assertEquals(expectedPredicate.toString(), predicate.toString());
+
+ query = "SELECT * FROM table WHERE varchar_col like '%st'";
+
+ plainSelect = convertToPlainSelect(query);
+ predicate =
+ SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(
+ rowType, plainSelect);
+
+ assertNotNull(predicate);
+
+ builder = new PredicateBuilder(rowType);
+ expectedPredicate = PredicateBuilder.or(builder.endsWith(1, "st"));
+
+ assertEquals(expectedPredicate.toString(), predicate.toString());
+
+ query = "SELECT * FROM table WHERE varchar_col like '%es%'";
+ plainSelect = convertToPlainSelect(query);
+ predicate =
+ SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(
+ rowType, plainSelect);
+
+ assertNotNull(predicate);
+
+ builder = new PredicateBuilder(rowType);
+ expectedPredicate = PredicateBuilder.or(builder.contains(1, "es"));
+
+ assertEquals(expectedPredicate.toString(), predicate.toString());
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
index 3def9a3497..48bffcb377 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
@@ -477,6 +477,12 @@ public class PaimonSinkCDCIT extends AbstractPaimonIT
implements TestResource {
Container.ExecResult readResult8 =
container.executeJob("/paimon_to_assert_with_filter8.conf");
Assertions.assertEquals(0, readResult8.getExitCode());
+ Container.ExecResult readResult9 =
+ container.executeJob("/paimon_to_assert_with_filter9.conf");
+ Assertions.assertEquals(0, readResult9.getExitCode());
+ Container.ExecResult readResult10 =
+ container.executeJob("/paimon_to_assert_with_filter10.conf");
+ Assertions.assertEquals(0, readResult10.getExitCode());
}
@TestTemplate
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter10.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter10.conf
new file mode 100644
index 0000000000..326d195d33
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter10.conf
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Paimon {
+ warehouse = "/tmp/seatunnel_mnt/paimon"
+ database = "full_type"
+ table = "st_test"
+ query = "select * from st_test where c_string like '%string%'"
+ plugin_output = paimon_source
+ }
+}
+
+sink {
+ Assert {
+ plugin_input = paimon_source
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 3
+ }
+ {
+ rule_type = MIN_ROW
+ rule_value = 3
+ }
+ ]
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter9.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter9.conf
new file mode 100644
index 0000000000..252bed328d
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter9.conf
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Paimon {
+ warehouse = "/tmp/seatunnel_mnt/paimon"
+ database = "full_type"
+ table = "st_test"
+ query = "select * from st_test where c_string like '%string2'"
+ plugin_output = paimon_source
+ }
+}
+
+sink {
+ Assert {
+ plugin_input = paimon_source
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 1
+ }
+ {
+ rule_type = MIN_ROW
+ rule_value = 1
+ }
+ ]
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}