This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 92ec5b0b3 [lookup] Fix NPE when FileStoreLookupFunction init empty 
table with dynamic partition (#3138)
92ec5b0b3 is described below

commit 92ec5b0b38308aab3cacdaa24a3f6100fe088488
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Apr 2 11:17:05 2024 +0800

    [lookup] Fix NPE when FileStoreLookupFunction init empty table with dynamic 
partition (#3138)
    
    This closes #3138.
---
 .../flink/lookup/DynamicPartitionLoader.java       |  2 +-
 .../flink/lookup/FileStoreLookupFunction.java      |  4 +++
 .../flink/lookup/FileStoreLookupFunctionTest.java  | 29 ++++++++++++++++++++++
 3 files changed, 34 insertions(+), 1 deletion(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
index 3e4206332..aa80dc657 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
@@ -53,7 +53,7 @@ public class DynamicPartitionLoader implements Serializable {
     private Comparator<InternalRow> comparator;
 
     private LocalDateTime lastRefresh;
-    private BinaryRow partition;
+    @Nullable private BinaryRow partition;
 
     private DynamicPartitionLoader(Table table, Duration refreshInterval) {
         this.table = table;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index a321da634..6a31c432f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -236,6 +236,10 @@ public class FileStoreLookupFunction implements 
Serializable, Closeable {
 
         boolean partitionChanged = partitionLoader.checkRefresh();
         BinaryRow partition = partitionLoader.partition();
+        if (partition == null) {
+            return null;
+        }
+
         
lookupTable.specificPartitionFilter(createSpecificPartFilter(partition));
 
         if (partitionChanged && reopen) {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
index c21989ee8..a8aa2530d 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.lookup;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.FlinkRowData;
 import 
org.apache.paimon.flink.lookup.PrimaryKeyPartialLookupTable.LocalQueryExecutor;
 import 
org.apache.paimon.flink.lookup.PrimaryKeyPartialLookupTable.QueryExecutor;
@@ -83,12 +84,20 @@ public class FileStoreLookupFunctionTest {
     }
 
     private void createLookupFunction(boolean isPartition, boolean 
joinEqualPk) throws Exception {
+        createLookupFunction(isPartition, joinEqualPk, false);
+    }
+
+    private void createLookupFunction(
+            boolean isPartition, boolean joinEqualPk, boolean 
dynamicPartition) throws Exception {
         SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
         Options conf = new Options();
         conf.set(CoreOptions.BUCKET, 2);
         conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 3);
         conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 2);
         conf.set(RocksDBOptions.LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL, 
Duration.ofSeconds(1));
+        if (dynamicPartition) {
+            conf.set(FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION, 
"max_pt()");
+        }
 
         RowType rowType =
                 RowType.of(
@@ -178,6 +187,26 @@ public class FileStoreLookupFunctionTest {
         lookupFunction.lookup(new FlinkRowData(GenericRow.of(1, 1, 10L)));
     }
 
+    @Test
+    public void testLookupDynamicPartition() throws Exception {
+        createLookupFunction(true, false, true);
+        commit(writeCommit(1));
+        lookupFunction.lookup(new FlinkRowData(GenericRow.of(1, 1, 10L)));
+        assertThat(
+                        TraceableFileIO.openInputStreams(
+                                        s -> 
s.toString().contains(tempDir.toString()))
+                                .size())
+                .isEqualTo(0);
+
+        commit(writeCommit(10));
+        lookupFunction.lookup(new FlinkRowData(GenericRow.of(1, 1, 10L)));
+        assertThat(
+                        TraceableFileIO.openInputStreams(
+                                        s -> 
s.toString().contains(tempDir.toString()))
+                                .size())
+                .isEqualTo(0);
+    }
+
     private void commit(List<CommitMessage> messages) throws Exception {
         TableCommitImpl commit = table.newCommit(commitUser);
         commit.commit(messages);

Reply via email to