This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b9539f8bc [flink] Fix lookup join with lookup.dynamic-partition and cross-partition table throws ArrayIndexOutOfBoundsException (#3869)
b9539f8bc is described below

commit b9539f8bc9c9383f6c7b438839285d10172227fb
Author: yuzelin <[email protected]>
AuthorDate: Thu Aug 1 20:10:36 2024 +0800

    [flink] Fix lookup join with lookup.dynamic-partition and cross-partition table throws ArrayIndexOutOfBoundsException (#3869)
---
 .../flink/lookup/DynamicPartitionLoader.java       |  4 ++-
 .../flink/lookup/FileStoreLookupFunction.java      |  8 ++---
 .../org/apache/paimon/flink/LookupJoinITCase.java  | 35 ++++++++++++----------
 3 files changed, 26 insertions(+), 21 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
index aa80dc657..a8660ee8c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
@@ -66,10 +66,12 @@ public class DynamicPartitionLoader implements Serializable {
         this.comparator = CodeGenUtils.newRecordComparator(partitionType.getFieldTypes());
     }
 
-    public void addJoinKeys(List<String> joinKeys) {
+    public void addPartitionKeysTo(List<String> joinKeys, List<String> projectFields) {
         List<String> partitionKeys = table.partitionKeys();
         checkArgument(joinKeys.stream().noneMatch(partitionKeys::contains));
         joinKeys.addAll(partitionKeys);
+
+        partitionKeys.stream().filter(k -> !projectFields.contains(k)).forEach(projectFields::add);
     }
 
     @Nullable
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 65c9c7302..01ebbde20 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -109,10 +109,6 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
                         .mapToObj(i -> table.rowType().getFieldNames().get(projection[i]))
                         .collect(Collectors.toList());
 
-        if (partitionLoader != null) {
-            partitionLoader.addJoinKeys(joinKeys);
-        }
-
         this.projectFields =
                 Arrays.stream(projection)
                         .mapToObj(i -> table.rowType().getFieldNames().get(i))
@@ -125,6 +121,10 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
             }
         }
 
+        if (partitionLoader != null) {
+            partitionLoader.addPartitionKeysTo(joinKeys, projectFields);
+        }
+
         this.predicate = predicate;
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
index 26982e44a..5b35cb131 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
@@ -641,25 +641,28 @@ public class LookupJoinITCase extends CatalogITCaseBase {
         iterator.close();
     }
 
-    @Test
-    public void testLookupMaxPtPartitionedTablePartialCache() throws Exception {
-        innerTestLookupMaxPtPartitionedTable(LookupCacheMode.AUTO);
-    }
-
-    @Test
-    public void testLookupMaxPtPartitionedTableFullCache() throws Exception {
-        innerTestLookupMaxPtPartitionedTable(LookupCacheMode.FULL);
-    }
-
-    private void innerTestLookupMaxPtPartitionedTable(LookupCacheMode mode) throws Exception {
-        tEnv.executeSql(
-                "CREATE TABLE PARTITIONED_DIM (pt STRING, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED)"
+    @ParameterizedTest
+    @EnumSource(LookupCacheMode.class)
+    public void testLookupMaxPtPartitionedTable(LookupCacheMode mode) throws Exception {
+        boolean testDynamicBucket = ThreadLocalRandom.current().nextBoolean();
+        String primaryKeys;
+        String bucket;
+        if (testDynamicBucket) {
+            primaryKeys = "k";
+            bucket = "-1";
+        } else {
+            primaryKeys = "pt, k";
+            bucket = "1";
+        }
+        sql(
+                "CREATE TABLE PARTITIONED_DIM (pt STRING, k INT, v INT, 
PRIMARY KEY (%s) NOT ENFORCED)"
                         + "PARTITIONED BY (`pt`) WITH ("
-                        + "'bucket' = '1', "
+                        + "'bucket' = '%s', "
                         + "'lookup.dynamic-partition' = 'max_pt()', "
                         + "'lookup.dynamic-partition.refresh-interval' = '1 ms', "
-                        + String.format("'lookup.cache' = '%s', ", mode)
-                        + "'continuous.discovery-interval'='1 ms')");
+                        + "'lookup.cache' = '%s', "
+                        + "'continuous.discovery-interval'='1 ms')",
+                primaryKeys, bucket, mode);
         String query =
                 "SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM for system_time as of T.proctime AS D ON T.i = D.k";
         BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());

Reply via email to