This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.1
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/release-1.1 by this push:
     new 47a0655997 [flink] Fix lookup failed to join keys when using max_pt() 
and lookup table is updated by insert overwrite (#5535)
47a0655997 is described below

commit 47a065599759e3418397c372bacd4c6fc31e6249
Author: tsreaper <[email protected]>
AuthorDate: Fri Apr 25 15:31:21 2025 +0800

    [flink] Fix lookup failed to join keys when using max_pt() and lookup table 
is updated by insert overwrite (#5535)
---
 .../flink/lookup/DynamicPartitionLoader.java       |  2 ++
 .../org/apache/paimon/flink/LookupJoinITCase.java  | 31 ++++++++++++++++++++++
 2 files changed, 33 insertions(+)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
index 590cd962e8..1683e8707a 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
@@ -56,8 +56,10 @@ public class DynamicPartitionLoader extends PartitionLoader {
     @Override
     public void open() {
         super.open();
+
         RowType partitionType = table.rowType().project(table.partitionKeys());
         this.comparator = 
CodeGenUtils.newRecordComparator(partitionType.getFieldTypes());
+        this.lastRefresh = null;
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
index 7c5f083564..36844d4c52 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
@@ -1071,4 +1071,35 @@ public class LookupJoinITCase extends CatalogITCaseBase {
         assertThat(result).containsExactlyInAnyOrder(Row.of(1, 11), Row.of(1, 
12), Row.of(2, 22));
         iterator.close();
     }
+
+    @Test
+    public void testMaxPtAndOverwrite() throws Exception {
+        sql(
+                "CREATE TABLE PARTITIONED_DIM (pt INT, k INT, v INT) "
+                        + "PARTITIONED BY (`pt`) WITH ("
+                        + "'bucket' = '2', "
+                        + "'bucket-key' = 'k', "
+                        + "'lookup.dynamic-partition' = 'max_pt()', "
+                        + "'lookup.dynamic-partition.refresh-interval' = 
'99999 s', "
+                        + "'continuous.discovery-interval'='1 ms')");
+        sql(
+                "INSERT INTO PARTITIONED_DIM VALUES (1, 1, 101), (1, 2, 102), 
(2, 1, 201), (2, 2, 202)");
+
+        sql("INSERT INTO T VALUES (1), (2), (3)");
+        String query =
+                "SELECT T.i, D.v FROM T "
+                        + "LEFT JOIN PARTITIONED_DIM /*+ 
OPTIONS('scan.partitions' = 'max_pt()') */ "
+                        + "for system_time as of T.proctime AS D ON T.i = D.k";
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(sEnv.executeSql(query).collect());
+        List<Row> result = iterator.collect(3);
+        assertThat(result)
+                .containsExactlyInAnyOrder(Row.of(1, 201), Row.of(2, 202), 
Row.of(3, null));
+
+        sql(
+                "INSERT OVERWRITE PARTITIONED_DIM PARTITION (pt = 2) VALUES 
(1, 211), (2, 212), (3, 213)");
+        sql("INSERT INTO T VALUES (1), (2), (3)");
+        result = iterator.collect(3);
+        assertThat(result)
+                .containsExactlyInAnyOrder(Row.of(1, 211), Row.of(2, 212), 
Row.of(3, 213));
+    }
 }

Reply via email to