This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 33c1a8ba70 [flink] Fix IllegalArgumentException when setting scan.partitions to max_pt() in older Flink versions (#5491)
33c1a8ba70 is described below
commit 33c1a8ba70aca49f05bca9bec1fd1de3ca1d5981
Author: tsreaper <[email protected]>
AuthorDate: Sun Apr 20 18:30:40 2025 +0800
[flink] Fix IllegalArgumentException when setting scan.partitions to max_pt() in older Flink versions (#5491)
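For context, a minimal reproduction sketch of the failure this patch addresses (illustrative only, not part of the patch; the table name and path are assumptions). In Flink 1.17 and older, the planner first treats a lookup source as a regular scan source, so the literal option value max_pt() reaches the partition-spec parser, which expects key=value pairs and threw IllegalArgumentException:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Illustrative reproduction sketch, not part of the patch. Before this fix,
// planning a lookup join against a table configured like this could fail in
// Flink 1.17 because "max_pt()" was parsed as a key=value partition spec.
public class MaxPtReproSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // The table name and path are assumptions for this sketch.
        tEnv.executeSql(
                "CREATE TABLE dim (pt STRING, k INT, v INT) PARTITIONED BY (pt) WITH ("
                        + "'connector' = 'paimon', "
                        + "'path' = '/tmp/paimon/dim', "
                        + "'scan.partitions' = 'max_pt()')");
        // Using dim as the build side of `JOIN dim FOR SYSTEM_TIME AS OF ...`
        // then triggered the exception during planning.
    }
}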
---
paimon-flink/paimon-flink-1.17/pom.xml | 8 ++
.../org/apache/paimon/flink/LookupJoinITCase.java | 87 ++++++++++++++++++++++
.../paimon/flink/source/FlinkTableSource.java | 33 ++++----
3 files changed, 115 insertions(+), 13 deletions(-)
diff --git a/paimon-flink/paimon-flink-1.17/pom.xml b/paimon-flink/paimon-flink-1.17/pom.xml
index d1cde4b94d..e65b1db50b 100644
--- a/paimon-flink/paimon-flink-1.17/pom.xml
+++ b/paimon-flink/paimon-flink-1.17/pom.xml
@@ -36,6 +36,7 @@ under the License.
<properties>
<flink.version>1.17.2</flink.version>
<iceberg.flink.version>1.17</iceberg.flink.version>
+ <frocksdbjni.version>6.20.3-ververica-2.0</frocksdbjni.version>
</properties>
<dependencies>
@@ -114,6 +115,13 @@ under the License.
<version>${iceberg.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>com.ververica</groupId>
+ <artifactId>frocksdbjni</artifactId>
+ <version>${frocksdbjni.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/paimon-flink/paimon-flink-1.17/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java b/paimon-flink/paimon-flink-1.17/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
new file mode 100644
index 0000000000..be52f06072
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.17/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.flink.FlinkConnectorOptions.LookupCacheMode;
+import org.apache.paimon.utils.BlockingIterator;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for lookup join for Flink 1.17. */
+public class LookupJoinITCase extends CatalogITCaseBase {
+
+ @Override
+ public List<String> ddl() {
+ return Collections.singletonList("CREATE TABLE T (i INT, `proctime` AS PROCTIME())");
+ }
+
+ @Override
+ protected int defaultParallelism() {
+ return 1;
+ }
+
+ @ParameterizedTest
+ @EnumSource(LookupCacheMode.class)
+ public void testLookupMaxPtPartitionedTable(LookupCacheMode mode) throws Exception {
+ boolean testDynamicBucket = ThreadLocalRandom.current().nextBoolean();
+ String primaryKeys;
+ String bucket;
+ if (testDynamicBucket) {
+ primaryKeys = "k";
+ bucket = "-1";
+ } else {
+ primaryKeys = "pt, k";
+ bucket = "1";
+ }
+ sql(
+ "CREATE TABLE PARTITIONED_DIM (pt STRING, k INT, v INT,
PRIMARY KEY (%s) NOT ENFORCED)"
+ + "PARTITIONED BY (`pt`) WITH ("
+ + "'bucket' = '%s', "
+ + "'lookup.dynamic-partition' = 'max_pt()', "
+ + "'lookup.dynamic-partition.refresh-interval' = '1
ms', "
+ + "'lookup.cache' = '%s', "
+ + "'continuous.discovery-interval'='1 ms')",
+ primaryKeys, bucket, mode);
+ String query =
+ "SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM for
system_time as of T.proctime AS D ON T.i = D.k";
+ BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());
+
+ sql("INSERT INTO PARTITIONED_DIM VALUES ('1', 1, 2)");
+ Thread.sleep(2000); // wait refresh
+ sql("INSERT INTO T VALUES (1)");
+ List<Row> result = iterator.collect(1);
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2));
+
+ sql("INSERT INTO PARTITIONED_DIM VALUES ('2', 1, 3)");
+ Thread.sleep(2000); // wait refresh
+ sql("INSERT INTO T VALUES (1)");
+ result = iterator.collect(1);
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 3));
+
+ iterator.close();
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
index 359ca2a595..0b9c4616de 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
@@ -69,6 +69,7 @@ public abstract class FlinkTableSource
"%s%s", PAIMON_PREFIX,
FlinkConnectorOptions.INFER_SCAN_PARALLELISM.key());
protected final Table table;
+ protected final Options options;
@Nullable protected Predicate predicate;
@Nullable protected int[][] projectFields;
@@ -85,6 +86,7 @@ public abstract class FlinkTableSource
@Nullable int[][] projectFields,
@Nullable Long limit) {
this.table = table;
+ this.options = Options.fromMap(table.options());
this.predicate = predicate;
this.projectFields = projectFields;
@@ -130,18 +132,24 @@ public abstract class FlinkTableSource
* lookup sources are handled in {@link org.apache.paimon.flink.lookup.PartitionLoader}.
*/
protected Predicate getPredicateWithScanPartitions() {
- if (table.options().containsKey(FlinkConnectorOptions.SCAN_PARTITIONS.key())) {
- Predicate partitionPredicate =
- PartitionPredicate.createPartitionPredicate(
- ParameterUtils.getPartitions(
- table.options()
- .get(FlinkConnectorOptions.SCAN_PARTITIONS.key())
- .split(";")),
- table.rowType(),
- table.options()
- .getOrDefault(
- CoreOptions.PARTITION_DEFAULT_NAME.key(),
- CoreOptions.PARTITION_DEFAULT_NAME.defaultValue()));
+ if (options.contains(FlinkConnectorOptions.SCAN_PARTITIONS)) {
+ Predicate partitionPredicate;
+ try {
+ partitionPredicate =
+ PartitionPredicate.createPartitionPredicate(
+ ParameterUtils.getPartitions(
+ options.get(FlinkConnectorOptions.SCAN_PARTITIONS)
+ .split(";")),
+ table.rowType(),
+ options.get(CoreOptions.PARTITION_DEFAULT_NAME));
+ } catch (IllegalArgumentException e) {
+ // In older versions of Flink, however, lookup sources will first be treated as
+ // normal sources. So this method will also be visited by lookup tables, whose
+ // option value might be max_pt() or max_two_pt(). In this case we ignore the
+ // filters.
+ return predicate;
+ }
+
if (predicate == null) {
return partitionPredicate;
} else {
@@ -171,7 +179,6 @@ public abstract class FlinkTableSource
@Nullable
protected Integer inferSourceParallelism(StreamExecutionEnvironment env) {
- Options options = Options.fromMap(this.table.options());
Configuration envConfig = (Configuration) env.getConfiguration();
if (envConfig.containsKey(FLINK_INFER_SCAN_PARALLELISM)) {
options.set(
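
For readers skimming the diff: the shape of the fix in getPredicateWithScanPartitions is summarized by the simplified sketch below. The helper is hypothetical (the real logic is the method patched above), the import paths are assumptions matching the patched module, and PredicateBuilder.and stands in for the conjunction built in the unchanged else-branch. The idea: parse scan.partitions eagerly, and if the value is a dynamic marker such as max_pt() or max_two_pt(), fall back to the pushed-down predicate instead of failing planning.

import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ParameterUtils;

import javax.annotation.Nullable;

class ScanPartitionsSketch {
    // Hypothetical free-standing version of the patched logic.
    @Nullable
    static Predicate withScanPartitions(
            @Nullable Predicate pushedDown, Options options, RowType rowType) {
        if (!options.contains(FlinkConnectorOptions.SCAN_PARTITIONS)) {
            return pushedDown;
        }
        try {
            Predicate partitionPredicate =
                    PartitionPredicate.createPartitionPredicate(
                            ParameterUtils.getPartitions(
                                    options.get(FlinkConnectorOptions.SCAN_PARTITIONS)
                                            .split(";")),
                            rowType,
                            options.get(CoreOptions.PARTITION_DEFAULT_NAME));
            return pushedDown == null
                    ? partitionPredicate
                    : PredicateBuilder.and(pushedDown, partitionPredicate);
        } catch (IllegalArgumentException e) {
            // Dynamic markers like max_pt() are resolved by PartitionLoader for
            // lookup sources, so they are ignored here.
            return pushedDown;
        }
    }
}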