This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a19720ccf6 [Feature][Connector-V2] Support like predicate pushdown in
paimon (#9484)
a19720ccf6 is described below
commit a19720ccf67b2e3ef66a217e841dba12ccadb28e
Author: xiaochen <[email protected]>
AuthorDate: Fri Jul 4 09:33:13 2025 +0800
[Feature][Connector-V2] Support like predicate pushdown in paimon (#9484)
---
docs/en/connector-v2/source/Paimon.md | 4 +-
docs/zh/connector-v2/source/Paimon.md | 2 +-
.../converter/SqlToPaimonPredicateConverter.java | 21 +++++++++
.../source/converter/SqlToPaimonConverterTest.java | 17 ++++++++
.../e2e/connector/paimon/PaimonSinkCDCIT.java | 9 ++--
.../resources/paimon_to_assert_with_filter8.conf | 50 ++++++++++++++++++++++
6 files changed, 97 insertions(+), 6 deletions(-)
diff --git a/docs/en/connector-v2/source/Paimon.md
b/docs/en/connector-v2/source/Paimon.md
index 6ef6d49071..648be40dd2 100644
--- a/docs/en/connector-v2/source/Paimon.md
+++ b/docs/en/connector-v2/source/Paimon.md
@@ -58,7 +58,7 @@ The file path of `hdfs-site.xml`
### query [string]
The filter condition of the table read. For example: `select * from st_test
where id > 100`. If not specified, all rows are read.
-Currently, where conditions only support <, <=, >, >=, =, !=, or, and,is null,
is not null, between...and, in, not in, and others are not supported.
+Currently, where conditions only support <, <=, >, >=, =, !=, or, and, is null,
is not null, between...and, in, not in, and like (prefix matching only, i.e.
patterns of the form `like 'abc%'`); other conditions are not supported.
The Having, Group By, Order By clauses are currently unsupported, because
these clauses are not supported by Paimon.
you can also project specific columns, for example: select id, name from
st_test where id > 100.
The limit will be supported in the future.
@@ -198,7 +198,7 @@ If you want to read the changelog of the Paimon table,
first set the `changelog-
### Note
-Currently, batch reads are always the latest snapshot read, so to read full
changelog data, you need to use stream reads and start stream reads before
writing data to the Piamon table, and to ensure order, the parallelism of the
stream read task should be set to 1.
+Currently, batch reads always read the latest snapshot. To read the full
changelog data, you need to use stream reads and start them before
writing data to the Paimon table. To ensure ordering, the parallelism of the
stream read task should be set to 1.
### Streaming read example
```hocon
diff --git a/docs/zh/connector-v2/source/Paimon.md
b/docs/zh/connector-v2/source/Paimon.md
index 81784cc7c4..8ce84cf533 100644
--- a/docs/zh/connector-v2/source/Paimon.md
+++ b/docs/zh/connector-v2/source/Paimon.md
@@ -59,7 +59,7 @@ Paimon 的 catalog uri,仅当 catalog_type 为 hive 时需要
读取表格的筛选条件,例如:`select * from st_test where id > 100`。如果未指定,则将读取所有记录。
-目前,`where` 支持`<, <=, >, >=, =, !=, or, and,is null, is not null,
between...and, in , not in`,其他暂不支持。
+目前,`where` 支持`<, <=, >, >=, =, !=, or, and, is null, is not null,
between...and, in, not in, like`(like 仅支持前缀匹配,例如 `like 'abc%'`),其他暂不支持。
Projection 已支持,你可以选择特定的列,例如:select id, name from st_test where id > 100。
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java
index a3fa418750..83ac14e345 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java
@@ -50,6 +50,7 @@ import
net.sf.jsqlparser.expression.operators.relational.GreaterThan;
import net.sf.jsqlparser.expression.operators.relational.GreaterThanEquals;
import net.sf.jsqlparser.expression.operators.relational.InExpression;
import net.sf.jsqlparser.expression.operators.relational.IsNullExpression;
+import net.sf.jsqlparser.expression.operators.relational.LikeExpression;
import net.sf.jsqlparser.expression.operators.relational.MinorThan;
import net.sf.jsqlparser.expression.operators.relational.MinorThanEquals;
import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo;
@@ -68,6 +69,8 @@ import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import java.util.stream.IntStream;
public class SqlToPaimonPredicateConverter {
@@ -241,6 +244,24 @@ public class SqlToPaimonPredicateConverter {
Object paimonEndVal =
convertValueByPaimonDataType(rowType,
column.getColumnName(), jsqlEndVal);
return builder.between(columnIndex, paimonStartVal, paimonEndVal);
+ } else if (expression instanceof LikeExpression) {
+ LikeExpression like = (LikeExpression) expression;
+ Column column = (Column) like.getLeftExpression();
+ int columnIndex = getColumnIndex(builder, column);
+ Object rightPredicate =
getJSQLParserDataTypeValue(like.getRightExpression());
+ Object rightVal =
+ convertValueByPaimonDataType(rowType,
column.getColumnName(), rightPredicate);
+
+ Pattern BEGIN_PATTERN = Pattern.compile("([^%]+)%");
+ Matcher matcher = BEGIN_PATTERN.matcher(rightVal.toString());
+ if (matcher.matches()) {
+ return builder.startsWith(columnIndex,
BinaryString.fromString(matcher.group(1)));
+ } else {
+ throw new IllegalArgumentException(
+ "Unsupported expression type: "
+ + expression.getClass().getSimpleName()
+ + ", only support like pattern matching with
prefix");
+ }
} else if (expression instanceof Parenthesis) {
Parenthesis parenthesis = (Parenthesis) expression;
return parseExpressionToPredicate(builder, rowType,
parenthesis.getExpression());
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonConverterTest.java
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonConverterTest.java
index 26eee05477..519784ee67 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonConverterTest.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonConverterTest.java
@@ -248,4 +248,21 @@ public class SqlToPaimonConverterTest {
int[] expectedProjectionIndex = {4, 7, 0, 12, 2};
assertArrayEquals(projectionIndex, expectedProjectionIndex);
}
+
+ @Test
+ public void testConvertSqlWhereToPaimonPredicateWithStartWith() {
+ String query = "SELECT * FROM table WHERE varchar_col like 'te%'";
+
+ PlainSelect plainSelect = convertToPlainSelect(query);
+ Predicate predicate =
+ SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(
+ rowType, plainSelect);
+
+ assertNotNull(predicate);
+
+ PredicateBuilder builder = new PredicateBuilder(rowType);
+ Predicate expectedPredicate =
PredicateBuilder.or(builder.startsWith(1, "te"));
+
+ assertEquals(expectedPredicate.toString(), predicate.toString());
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
index b2926039c3..d4f5bb5f9d 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
@@ -492,6 +492,9 @@ public class PaimonSinkCDCIT extends AbstractPaimonIT
implements TestResource {
Container.ExecResult readResult7 =
container.executeJob("/paimon_to_assert_with_filter7.conf");
Assertions.assertEquals(0, readResult7.getExitCode());
+ Container.ExecResult readResult8 =
+ container.executeJob("/paimon_to_assert_with_filter8.conf");
+ Assertions.assertEquals(0, readResult8.getExitCode());
}
@TestTemplate
@@ -533,11 +536,11 @@ public class PaimonSinkCDCIT extends AbstractPaimonIT
implements TestResource {
@TestTemplate
public void testChangelogLookup(TestContainer container) throws Exception {
- // create Piamon table (changelog-producer=lookup)
+ // create Paimon table (changelog-producer=lookup)
Container.ExecResult writeResult =
container.executeJob("/changelog_fake_cdc_sink_paimon_case1_ddl.conf");
Assertions.assertEquals(0, writeResult.getExitCode());
- TimeUnit.SECONDS.sleep(20);
+ TimeUnit.SECONDS.sleep(120);
String[] jobIds =
new String[] {
String.valueOf(JobIdGenerator.newJobId()),
@@ -583,7 +586,7 @@ public class PaimonSinkCDCIT extends AbstractPaimonIT
implements TestResource {
}
}));
- // stream job running 30 seconds
- TimeUnit.SECONDS.sleep(30);
+ // stream job running 120 seconds
+ TimeUnit.SECONDS.sleep(120);
// cancel stream job
container.cancelJob(jobIds[1]);
container.cancelJob(jobIds[2]);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter8.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter8.conf
new file mode 100644
index 0000000000..1e0947403c
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter8.conf
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Paimon {
+ warehouse = "/tmp/paimon"
+ database = "full_type"
+ table = "st_test"
+ query = "select * from st_test where c_string like 'c_string2%'"
+ plugin_output = paimon_source
+ }
+}
+
+sink {
+ Assert {
+ plugin_input = paimon_source
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 1
+ }
+ {
+ rule_type = MIN_ROW
+ rule_value = 1
+ }
+ ]
+
+ }
+ }
+}