This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.1
in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 4565d6466316ca68ff2e50108aacf39f0a8d86ba
Author: tsreaper <[email protected]>
AuthorDate: Sun Apr 20 18:30:40 2025 +0800

    [flink] Fix IllegalArgumentException when setting scan.partitions to max_pt() in older Flink versions (#5491)
---
 paimon-flink/paimon-flink-1.17/pom.xml             |  8 ++
 .../org/apache/paimon/flink/LookupJoinITCase.java  | 87 ++++++++++++++++++++++
 .../paimon/flink/source/FlinkTableSource.java      | 33 ++++----
 3 files changed, 115 insertions(+), 13 deletions(-)

diff --git a/paimon-flink/paimon-flink-1.17/pom.xml b/paimon-flink/paimon-flink-1.17/pom.xml
index cef4579146..073dbf1c1d 100644
--- a/paimon-flink/paimon-flink-1.17/pom.xml
+++ b/paimon-flink/paimon-flink-1.17/pom.xml
@@ -36,6 +36,7 @@ under the License.
     <properties>
         <flink.version>1.17.2</flink.version>
         <iceberg.flink.version>1.17</iceberg.flink.version>
+        <frocksdbjni.version>6.20.3-ververica-2.0</frocksdbjni.version>
     </properties>
 
     <dependencies>
@@ -114,6 +115,13 @@ under the License.
             <version>${iceberg.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>frocksdbjni</artifactId>
+            <version>${frocksdbjni.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/paimon-flink/paimon-flink-1.17/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java b/paimon-flink/paimon-flink-1.17/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
new file mode 100644
index 0000000000..be52f06072
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.17/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.flink.FlinkConnectorOptions.LookupCacheMode;
+import org.apache.paimon.utils.BlockingIterator;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for lookup join for Flink 1.17. */
+public class LookupJoinITCase extends CatalogITCaseBase {
+
+    @Override
+    public List<String> ddl() {
+        return Collections.singletonList("CREATE TABLE T (i INT, `proctime` AS PROCTIME())");
+    }
+
+    @Override
+    protected int defaultParallelism() {
+        return 1;
+    }
+
+    @ParameterizedTest
+    @EnumSource(LookupCacheMode.class)
+    public void testLookupMaxPtPartitionedTable(LookupCacheMode mode) throws Exception {
+        boolean testDynamicBucket = ThreadLocalRandom.current().nextBoolean();
+        String primaryKeys;
+        String bucket;
+        if (testDynamicBucket) {
+            primaryKeys = "k";
+            bucket = "-1";
+        } else {
+            primaryKeys = "pt, k";
+            bucket = "1";
+        }
+        sql(
+                "CREATE TABLE PARTITIONED_DIM (pt STRING, k INT, v INT, PRIMARY KEY (%s) NOT ENFORCED)"
+                        + "PARTITIONED BY (`pt`) WITH ("
+                        + "'bucket' = '%s', "
+                        + "'lookup.dynamic-partition' = 'max_pt()', "
+                        + "'lookup.dynamic-partition.refresh-interval' = '1 ms', "
+                        + "'lookup.cache' = '%s', "
+                        + "'continuous.discovery-interval'='1 ms')",
+                primaryKeys, bucket, mode);
+        String query =
+                "SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM for system_time as of T.proctime AS D ON T.i = D.k";
+        BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());
+
+        sql("INSERT INTO PARTITIONED_DIM VALUES ('1', 1, 2)");
+        Thread.sleep(2000); // wait refresh
+        sql("INSERT INTO T VALUES (1)");
+        List<Row> result = iterator.collect(1);
+        assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2));
+
+        sql("INSERT INTO PARTITIONED_DIM VALUES ('2', 1, 3)");
+        Thread.sleep(2000); // wait refresh
+        sql("INSERT INTO T VALUES (1)");
+        result = iterator.collect(1);
+        assertThat(result).containsExactlyInAnyOrder(Row.of(1, 3));
+
+        iterator.close();
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
index 359ca2a595..0b9c4616de 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
@@ -69,6 +69,7 @@ public abstract class FlinkTableSource
                     "%s%s", PAIMON_PREFIX, FlinkConnectorOptions.INFER_SCAN_PARALLELISM.key());
 
     protected final Table table;
+    protected final Options options;
     @Nullable protected Predicate predicate;
     @Nullable protected int[][] projectFields;
@@ -85,6 +86,7 @@ public abstract class FlinkTableSource
             @Nullable int[][] projectFields,
             @Nullable Long limit) {
         this.table = table;
+        this.options = Options.fromMap(table.options());
         this.predicate = predicate;
         this.projectFields = projectFields;
@@ -130,18 +132,24 @@ public abstract class FlinkTableSource
      * lookup sources are handled in {@link org.apache.paimon.flink.lookup.PartitionLoader}.
      */
     protected Predicate getPredicateWithScanPartitions() {
-        if (table.options().containsKey(FlinkConnectorOptions.SCAN_PARTITIONS.key())) {
-            Predicate partitionPredicate =
-                    PartitionPredicate.createPartitionPredicate(
-                            ParameterUtils.getPartitions(
-                                    table.options()
-                                            .get(FlinkConnectorOptions.SCAN_PARTITIONS.key())
-                                            .split(";")),
-                            table.rowType(),
-                            table.options()
-                                    .getOrDefault(
-                                            CoreOptions.PARTITION_DEFAULT_NAME.key(),
-                                            CoreOptions.PARTITION_DEFAULT_NAME.defaultValue()));
+        if (options.contains(FlinkConnectorOptions.SCAN_PARTITIONS)) {
+            Predicate partitionPredicate;
+            try {
+                partitionPredicate =
+                        PartitionPredicate.createPartitionPredicate(
+                                ParameterUtils.getPartitions(
+                                        options.get(FlinkConnectorOptions.SCAN_PARTITIONS)
+                                                .split(";")),
+                                table.rowType(),
+                                options.get(CoreOptions.PARTITION_DEFAULT_NAME));
+            } catch (IllegalArgumentException e) {
+                // In older versions of Flink, however, lookup sources will first be treated as
+                // normal sources. So this method will also be visited by lookup tables, whose
+                // option value might be max_pt() or max_two_pt(). In this case we ignore the
+                // filters.
+                return predicate;
+            }
+
             if (predicate == null) {
                 return partitionPredicate;
             } else {
@@ -171,7 +179,6 @@ public abstract class FlinkTableSource
 
     @Nullable
     protected Integer inferSourceParallelism(StreamExecutionEnvironment env) {
-        Options options = Options.fromMap(this.table.options());
         Configuration envConfig = (Configuration) env.getConfiguration();
        if (envConfig.containsKey(FLINK_INFER_SCAN_PARALLELISM)) {
             options.set(
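
The try/catch added above is the substance of the fix: max_pt() is not a static key=value partition spec, so ParameterUtils.getPartitions rejects it, and for a lookup table that rejection must not fail planning, because PartitionLoader resolves the dynamic partition later. A minimal, self-contained sketch of the same parse-or-fall-back pattern (parsePartitions and partitionsOrNull are hypothetical stand-ins, not Paimon APIs):

import java.util.Arrays;
import java.util.List;

public class ScanPartitionsFallbackDemo {

    // Mimics the strictness of ParameterUtils.getPartitions: every spec in a
    // ";"-separated list must be key=value, so "max_pt()" is rejected.
    static List<String> parsePartitions(String optionValue) {
        for (String spec : optionValue.split(";")) {
            if (!spec.contains("=")) {
                throw new IllegalArgumentException("not a static partition spec: " + spec);
            }
        }
        return Arrays.asList(optionValue.split(";"));
    }

    // Returns null for dynamic placeholders instead of propagating the error,
    // the same idea as the commit: keep the original predicate and move on.
    static List<String> partitionsOrNull(String optionValue) {
        try {
            return parsePartitions(optionValue);
        } catch (IllegalArgumentException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(partitionsOrNull("dt=20250420")); // [dt=20250420]
        System.out.println(partitionsOrNull("max_pt()"));    // null -> fall back
    }
}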

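The quieter change threaded through the diff is the new typed options field: FlinkTableSource now builds an Options once from table.options() and reads ConfigOption values through it, which is why the manual getOrDefault(key, defaultValue) on the raw map disappears: Options.get(ConfigOption) already falls back to the option's declared default. A small sketch of that access pattern, assuming a Paimon dependency on the classpath (the class name TypedOptionsDemo is illustrative):

import java.util.Collections;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.options.Options;

public class TypedOptionsDemo {
    public static void main(String[] args) {
        // An empty option map, as if the user never set partition.default-name.
        Options options = Options.fromMap(Collections.emptyMap());

        // Typed access: no key string, no manual default; the ConfigOption
        // carries its own default value.
        String partitionDefaultName = options.get(CoreOptions.PARTITION_DEFAULT_NAME);
        System.out.println(partitionDefaultName); // prints the declared default
    }
}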