This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.1
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/release-1.1 by this push:
new 47a0655997 [flink] Fix lookup failed to join keys when using max_pt()
and lookup table is updated by insert overwrite (#5535)
47a0655997 is described below
commit 47a065599759e3418397c372bacd4c6fc31e6249
Author: tsreaper <[email protected]>
AuthorDate: Fri Apr 25 15:31:21 2025 +0800
[flink] Fix lookup failed to join keys when using max_pt() and lookup table
is updated by insert overwrite (#5535)
---
.../flink/lookup/DynamicPartitionLoader.java | 2 ++
.../org/apache/paimon/flink/LookupJoinITCase.java | 31 ++++++++++++++++++++++
2 files changed, 33 insertions(+)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
index 590cd962e8..1683e8707a 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
@@ -56,8 +56,10 @@ public class DynamicPartitionLoader extends PartitionLoader {
@Override
public void open() {
super.open();
+
RowType partitionType = table.rowType().project(table.partitionKeys());
this.comparator =
CodeGenUtils.newRecordComparator(partitionType.getFieldTypes());
+ this.lastRefresh = null;
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
index 7c5f083564..36844d4c52 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
@@ -1071,4 +1071,35 @@ public class LookupJoinITCase extends CatalogITCaseBase {
assertThat(result).containsExactlyInAnyOrder(Row.of(1, 11), Row.of(1,
12), Row.of(2, 22));
iterator.close();
}
+
+ @Test
+ public void testMaxPtAndOverwrite() throws Exception {
+ sql(
+ "CREATE TABLE PARTITIONED_DIM (pt INT, k INT, v INT) "
+ + "PARTITIONED BY (`pt`) WITH ("
+ + "'bucket' = '2', "
+ + "'bucket-key' = 'k', "
+ + "'lookup.dynamic-partition' = 'max_pt()', "
+ + "'lookup.dynamic-partition.refresh-interval' =
'99999 s', "
+ + "'continuous.discovery-interval'='1 ms')");
+ sql(
+ "INSERT INTO PARTITIONED_DIM VALUES (1, 1, 101), (1, 2, 102),
(2, 1, 201), (2, 2, 202)");
+
+ sql("INSERT INTO T VALUES (1), (2), (3)");
+ String query =
+ "SELECT T.i, D.v FROM T "
+ + "LEFT JOIN PARTITIONED_DIM /*+
OPTIONS('scan.partitions' = 'max_pt()') */ "
+ + "for system_time as of T.proctime AS D ON T.i = D.k";
+ BlockingIterator<Row, Row> iterator =
BlockingIterator.of(sEnv.executeSql(query).collect());
+ List<Row> result = iterator.collect(3);
+ assertThat(result)
+ .containsExactlyInAnyOrder(Row.of(1, 201), Row.of(2, 202),
Row.of(3, null));
+
+ sql(
+ "INSERT OVERWRITE PARTITIONED_DIM PARTITION (pt = 2) VALUES
(1, 211), (2, 212), (3, 213)");
+ sql("INSERT INTO T VALUES (1), (2), (3)");
+ result = iterator.collect(3);
+ assertThat(result)
+ .containsExactlyInAnyOrder(Row.of(1, 211), Row.of(2, 212),
Row.of(3, 213));
+ }
}