This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b9539f8bc [flink] Fix lookup join with lookup.dynamic-partition and
cross-partition table throws ArrayIndexOutOfBoundsException (#3869)
b9539f8bc is described below
commit b9539f8bc9c9383f6c7b438839285d10172227fb
Author: yuzelin <[email protected]>
AuthorDate: Thu Aug 1 20:10:36 2024 +0800
[flink] Fix lookup join with lookup.dynamic-partition and cross-partition
table throws ArrayIndexOutOfBoundsException (#3869)
---
.../flink/lookup/DynamicPartitionLoader.java | 4 ++-
.../flink/lookup/FileStoreLookupFunction.java | 8 ++---
.../org/apache/paimon/flink/LookupJoinITCase.java | 35 ++++++++++++----------
3 files changed, 26 insertions(+), 21 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
index aa80dc657..a8660ee8c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
@@ -66,10 +66,12 @@ public class DynamicPartitionLoader implements Serializable
{
this.comparator =
CodeGenUtils.newRecordComparator(partitionType.getFieldTypes());
}
- public void addJoinKeys(List<String> joinKeys) {
+ public void addPartitionKeysTo(List<String> joinKeys, List<String>
projectFields) {
List<String> partitionKeys = table.partitionKeys();
checkArgument(joinKeys.stream().noneMatch(partitionKeys::contains));
joinKeys.addAll(partitionKeys);
+
+ partitionKeys.stream().filter(k ->
!projectFields.contains(k)).forEach(projectFields::add);
}
@Nullable
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 65c9c7302..01ebbde20 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -109,10 +109,6 @@ public class FileStoreLookupFunction implements
Serializable, Closeable {
.mapToObj(i ->
table.rowType().getFieldNames().get(projection[i]))
.collect(Collectors.toList());
- if (partitionLoader != null) {
- partitionLoader.addJoinKeys(joinKeys);
- }
-
this.projectFields =
Arrays.stream(projection)
.mapToObj(i -> table.rowType().getFieldNames().get(i))
@@ -125,6 +121,10 @@ public class FileStoreLookupFunction implements
Serializable, Closeable {
}
}
+ if (partitionLoader != null) {
+ partitionLoader.addPartitionKeysTo(joinKeys, projectFields);
+ }
+
this.predicate = predicate;
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
index 26982e44a..5b35cb131 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
@@ -641,25 +641,28 @@ public class LookupJoinITCase extends CatalogITCaseBase {
iterator.close();
}
- @Test
- public void testLookupMaxPtPartitionedTablePartialCache() throws Exception
{
- innerTestLookupMaxPtPartitionedTable(LookupCacheMode.AUTO);
- }
-
- @Test
- public void testLookupMaxPtPartitionedTableFullCache() throws Exception {
- innerTestLookupMaxPtPartitionedTable(LookupCacheMode.FULL);
- }
-
- private void innerTestLookupMaxPtPartitionedTable(LookupCacheMode mode)
throws Exception {
- tEnv.executeSql(
- "CREATE TABLE PARTITIONED_DIM (pt STRING, k INT, v INT,
PRIMARY KEY (pt, k) NOT ENFORCED)"
+ @ParameterizedTest
+ @EnumSource(LookupCacheMode.class)
+ public void testLookupMaxPtPartitionedTable(LookupCacheMode mode) throws
Exception {
+ boolean testDynamicBucket = ThreadLocalRandom.current().nextBoolean();
+ String primaryKeys;
+ String bucket;
+ if (testDynamicBucket) {
+ primaryKeys = "k";
+ bucket = "-1";
+ } else {
+ primaryKeys = "pt, k";
+ bucket = "1";
+ }
+ sql(
+ "CREATE TABLE PARTITIONED_DIM (pt STRING, k INT, v INT,
PRIMARY KEY (%s) NOT ENFORCED)"
+ "PARTITIONED BY (`pt`) WITH ("
- + "'bucket' = '1', "
+ + "'bucket' = '%s', "
+ "'lookup.dynamic-partition' = 'max_pt()', "
+ "'lookup.dynamic-partition.refresh-interval' = '1
ms', "
- + String.format("'lookup.cache' = '%s', ", mode)
- + "'continuous.discovery-interval'='1 ms')");
+ + "'lookup.cache' = '%s', "
+ + "'continuous.discovery-interval'='1 ms')",
+ primaryKeys, bucket, mode);
String query =
"SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM for
system_time as of T.proctime AS D ON T.i = D.k";
BlockingIterator<Row, Row> iterator =
BlockingIterator.of(sEnv.executeSql(query).collect());