This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 92ec5b0b3 [lookup] Fix NPE when FileStoreLookupFunction init empty
table with dynamic partition (#3138)
92ec5b0b3 is described below
commit 92ec5b0b38308aab3cacdaa24a3f6100fe088488
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Apr 2 11:17:05 2024 +0800
[lookup] Fix NPE when FileStoreLookupFunction init empty table with dynamic
partition (#3138)
This closes #3138.
---
.../flink/lookup/DynamicPartitionLoader.java | 2 +-
.../flink/lookup/FileStoreLookupFunction.java | 4 +++
.../flink/lookup/FileStoreLookupFunctionTest.java | 29 ++++++++++++++++++++++
3 files changed, 34 insertions(+), 1 deletion(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
index 3e4206332..aa80dc657 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
@@ -53,7 +53,7 @@ public class DynamicPartitionLoader implements Serializable {
private Comparator<InternalRow> comparator;
private LocalDateTime lastRefresh;
- private BinaryRow partition;
+ @Nullable private BinaryRow partition;
private DynamicPartitionLoader(Table table, Duration refreshInterval) {
this.table = table;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index a321da634..6a31c432f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -236,6 +236,10 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
boolean partitionChanged = partitionLoader.checkRefresh();
BinaryRow partition = partitionLoader.partition();
+ if (partition == null) {
+ return null;
+ }
+
lookupTable.specificPartitionFilter(createSpecificPartFilter(partition));
if (partitionChanged && reopen) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
index c21989ee8..a8aa2530d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.lookup;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.FlinkRowData;
import
org.apache.paimon.flink.lookup.PrimaryKeyPartialLookupTable.LocalQueryExecutor;
import
org.apache.paimon.flink.lookup.PrimaryKeyPartialLookupTable.QueryExecutor;
@@ -83,12 +84,20 @@ public class FileStoreLookupFunctionTest {
}
private void createLookupFunction(boolean isPartition, boolean
joinEqualPk) throws Exception {
+ createLookupFunction(isPartition, joinEqualPk, false);
+ }
+
+ private void createLookupFunction(
+ boolean isPartition, boolean joinEqualPk, boolean
dynamicPartition) throws Exception {
SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
Options conf = new Options();
conf.set(CoreOptions.BUCKET, 2);
conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 3);
conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 2);
conf.set(RocksDBOptions.LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL,
Duration.ofSeconds(1));
+ if (dynamicPartition) {
+ conf.set(FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION,
"max_pt()");
+ }
RowType rowType =
RowType.of(
@@ -178,6 +187,26 @@ public class FileStoreLookupFunctionTest {
lookupFunction.lookup(new FlinkRowData(GenericRow.of(1, 1, 10L)));
}
+ @Test
+ public void testLookupDynamicPartition() throws Exception {
+ createLookupFunction(true, false, true);
+ commit(writeCommit(1));
+ lookupFunction.lookup(new FlinkRowData(GenericRow.of(1, 1, 10L)));
+ assertThat(
+ TraceableFileIO.openInputStreams(
+ s ->
s.toString().contains(tempDir.toString()))
+ .size())
+ .isEqualTo(0);
+
+ commit(writeCommit(10));
+ lookupFunction.lookup(new FlinkRowData(GenericRow.of(1, 1, 10L)));
+ assertThat(
+ TraceableFileIO.openInputStreams(
+ s ->
s.toString().contains(tempDir.toString()))
+ .size())
+ .isEqualTo(0);
+ }
+
private void commit(List<CommitMessage> messages) throws Exception {
TableCommitImpl commit = table.newCommit(commitUser);
commit.commit(messages);