This is an automated email from the ASF dual-hosted git repository.

zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ab07e6f5f [flink][lookup] Support async refresh of lookup table (#3297)
ab07e6f5f is described below

commit ab07e6f5f78d1b2b48a96b7693828616b84440e5
Author: Fang Yong <[email protected]>
AuthorDate: Wed May 22 19:52:10 2024 +0800

    [flink][lookup] Support async refresh of lookup table (#3297)
    
    * [flink][lookup] Support async refresh of lookup table
---
 .../generated/flink_connector_configuration.html   |  12 +++
 .../apache/paimon/flink/FlinkConnectorOptions.java |  13 +++
 .../paimon/flink/lookup/FullCacheLookupTable.java  | 116 ++++++++++++++++++++-
 .../paimon/flink/lookup/LookupStreamingReader.java |   5 +
 .../flink/lookup/NoPrimaryKeyLookupTable.java      |  26 ++---
 .../paimon/flink/lookup/PrimaryKeyLookupTable.java |  33 +++---
 .../flink/lookup/SecondaryIndexLookupTable.java    |  51 ++++-----
 .../flink/lookup/FileStoreLookupFunctionTest.java  |  46 +++++---
 .../paimon/flink/lookup/LookupTableTest.java       |  73 +++++++++++++
 9 files changed, 293 insertions(+), 82 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 0280581d2..4fb5fc271 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -68,6 +68,18 @@ under the License.
             <td>Duration</td>
             <td>Specific dynamic partition refresh interval for lookup, scan all partitions and obtain corresponding partition.</td>
         </tr>
+        <tr>
+            <td><h5>lookup.refresh.async</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to refresh lookup table in an async thread.</td>
+        </tr>
+        <tr>
+            <td><h5>lookup.refresh.async.pending-snapshot-count</h5></td>
+            <td style="word-wrap: break-word;">5</td>
+            <td>Integer</td>
+            <td>If the pending snapshot count exceeds the threshold, lookup operator will refresh the table synchronously.</td>
+        </tr>
         <tr>
             <td><h5>partition.idle-time-to-done</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index e02b1c124..275ce836a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -311,6 +311,20 @@ public class FlinkConnectorOptions {
                             "Specific dynamic partition refresh interval for lookup, "
                                     + "scan all partitions and obtain corresponding partition.");
 
+    public static final ConfigOption<Boolean> LOOKUP_REFRESH_ASYNC =
+            ConfigOptions.key("lookup.refresh.async")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether to refresh lookup table in an async thread.");
+
+    public static final ConfigOption<Integer> LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT =
+            ConfigOptions.key("lookup.refresh.async.pending-snapshot-count")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "If the pending snapshot count exceeds the threshold, "
+                                    + "lookup operator will refresh the table synchronously.");
+
     public static final ConfigOption<Boolean> SINK_AUTO_TAG_FOR_SAVEPOINT =
             ConfigOptions.key("sink.savepoint.auto-tag")
                     .booleanType()
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
index 9ed83a9bb..1c8db0916 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
@@ -26,12 +26,14 @@ import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.lookup.BulkLoader;
 import org.apache.paimon.lookup.RocksDBState;
 import org.apache.paimon.lookup.RocksDBStateFactory;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.sort.BinaryExternalSortBuffer;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ExecutorThreadFactory;
 import org.apache.paimon.utils.FieldsComparator;
 import org.apache.paimon.utils.FileIOUtils;
 import org.apache.paimon.utils.MutableObjectIterator;
@@ -39,6 +41,11 @@ import org.apache.paimon.utils.PartialRow;
 import org.apache.paimon.utils.TypeUtils;
 import org.apache.paimon.utils.UserDefinedSeqComparator;
 
+import org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
 import java.io.File;
@@ -47,24 +54,41 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC;
+import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT;
 
 /** Lookup table of full cache. */
 public abstract class FullCacheLookupTable implements LookupTable {
+    private static final Logger LOG = LoggerFactory.getLogger(FullCacheLookupTable.class);
 
+    protected final Object lock = new Object();
     protected final Context context;
     protected final RowType projectedType;
+    protected final boolean refreshAsync;
 
     @Nullable protected final FieldsComparator userDefinedSeqComparator;
     protected final int appendUdsFieldNumber;
 
     protected RocksDBStateFactory stateFactory;
+    private final ExecutorService refreshExecutor;
+    private final AtomicReference<Exception> cachedException;
+    private final int maxPendingSnapshotCount;
+    private final FileStoreTable table;
+    private Future<?> refreshFuture;
     private LookupStreamingReader reader;
     private Predicate specificPartition;
 
     public FullCacheLookupTable(Context context) {
         this.context = context;
-        FileStoreTable table = context.table;
+        this.table = context.table;
         List<String> sequenceFields = new ArrayList<>();
         if (table.primaryKeys().size() > 0) {
             sequenceFields = new CoreOptions(table.options()).sequenceField();
@@ -91,7 +115,20 @@ public abstract class FullCacheLookupTable implements LookupTable {
             this.userDefinedSeqComparator = null;
             this.appendUdsFieldNumber = 0;
         }
+
+        Options options = Options.fromMap(context.table.options());
         this.projectedType = projectedType;
+        this.refreshAsync = options.get(LOOKUP_REFRESH_ASYNC);
+        this.refreshExecutor =
+                this.refreshAsync
+                        ? Executors.newSingleThreadExecutor(
+                                new ExecutorThreadFactory(
+                                        String.format(
+                                                "%s-lookup-refresh",
+                                                Thread.currentThread().getName())))
+                        : MoreExecutors.newDirectExecutorService();
+        this.cachedException = new AtomicReference<>();
+        this.maxPendingSnapshotCount = options.get(LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT);
     }
 
     @Override
@@ -145,6 +182,72 @@ public abstract class FullCacheLookupTable implements LookupTable {
 
     @Override
     public void refresh() throws Exception {
+        // Surface a failure of a previous async refresh instead of silently serving
+        // lookups from a table that is no longer being refreshed.
+        checkAsyncRefreshException();
+
+        if (!refreshAsync) {
+            // Sync mode: refresh in the caller thread so that failures propagate directly.
+            doRefresh();
+            return;
+        }
+
+        Long latestSnapshotId = table.snapshotManager().latestSnapshotId();
+        Long nextSnapshotId = reader.nextSnapshotId();
+        if (latestSnapshotId != null
+                && nextSnapshotId != null
+                && latestSnapshotId - nextSnapshotId > maxPendingSnapshotCount) {
+            LOG.warn(
+                    "The latest snapshot id {} is much greater than the next snapshot id {} "
+                            + "for lookup table {} (pending snapshot threshold {}), refreshing "
+                            + "it synchronously. You may need to increase the parallelism of "
+                            + "lookup operator.",
+                    latestSnapshotId,
+                    nextSnapshotId,
+                    context.table.name(),
+                    maxPendingSnapshotCount);
+            if (refreshFuture != null) {
+                // Wait for the previous refresh task to finish.
+                refreshFuture.get();
+                // The async task records its failure instead of failing its future.
+                checkAsyncRefreshException();
+            }
+            doRefresh();
+            return;
+        }
+
+        try {
+            refreshFuture =
+                    refreshExecutor.submit(
+                            () -> {
+                                try {
+                                    doRefresh();
+                                } catch (Exception e) {
+                                    LOG.error(
+                                            "Refresh lookup table {} failed",
+                                            context.table.name(),
+                                            e);
+                                    cachedException.set(e);
+                                }
+                            });
+        } catch (RejectedExecutionException e) {
+            // Only expected once close() has shut down the executor.
+            LOG.warn("Add refresh task for lookup table {} failed", context.table.name(), e);
+        }
+    }
+
+    /** Rethrows the failure recorded by a previous async refresh task, if there is one. */
+    private void checkAsyncRefreshException() {
+        Exception exception = cachedException.get();
+        if (exception != null) {
+            throw new RuntimeException(
+                    String.format(
+                            "Async refresh of lookup table %s failed.", context.table.name()),
+                    exception);
+        }
+    }
+
+    private void doRefresh() throws Exception {
         while (true) {
             try (RecordReaderIterator<InternalRow> batch =
                     new RecordReaderIterator<>(reader.nextBatch(false))) {
@@ -158,7 +261,14 @@ public abstract class FullCacheLookupTable implements LookupTable {
 
     @Override
     public final List<InternalRow> get(InternalRow key) throws IOException {
-        List<InternalRow> values = innerGet(key);
+        List<InternalRow> values;
+        if (refreshAsync) {
+            synchronized (lock) {
+                values = innerGet(key);
+            }
+        } else {
+            values = innerGet(key);
+        }
         if (appendUdsFieldNumber == 0) {
             return values;
         }
@@ -171,9 +281,28 @@ public abstract class FullCacheLookupTable implements LookupTable {
         return dropSequence;
     }
 
+    /**
+     * Applies incremental rows to the lookup state. In async refresh mode each row is applied
+     * while holding {@link #lock}, so a concurrent {@link #get} never sees a half-applied row.
+     */
+    public void refresh(Iterator<InternalRow> input) throws IOException {
+        Predicate predicate = projectedPredicate();
+        while (input.hasNext()) {
+            InternalRow row = input.next();
+            if (refreshAsync) {
+                synchronized (lock) {
+                    refreshRow(row, predicate);
+                }
+            } else {
+                refreshRow(row, predicate);
+            }
+        }
+    }
+
     public abstract List<InternalRow> innerGet(InternalRow key) throws IOException;
 
-    public abstract void refresh(Iterator<InternalRow> input) throws IOException;
+    /** Applies a single incremental row; {@code predicate} may be {@code null} (no filter). */
+    protected abstract void refreshRow(InternalRow row, Predicate predicate) throws IOException;
 
     @Nullable
     public Predicate projectedPredicate() {
@@ -188,8 +317,21 @@ public abstract class FullCacheLookupTable implements LookupTable {
 
     @Override
     public void close() throws IOException {
+        // Stop accepting refresh tasks and wait for an in-flight async refresh first: it must
+        // not keep writing to the RocksDB state after that state has been closed.
+        refreshExecutor.shutdown();
+        try {
+            if (!refreshExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
+                LOG.warn(
+                        "Async refresh of lookup table {} did not finish within one minute, "
+                                + "closing the table anyway.",
+                        context.table.name());
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
         stateFactory.close();
         FileIOUtils.deleteDirectory(context.tempPath);
     }
 
     /** Bulk loader for the table. */
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
index 6e32bb7d3..ce64e27e3 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
@@ -144,4 +144,10 @@ public class LookupStreamingReader {
         }
         return reader;
     }
+
+    /** Returns the next snapshot id of the underlying scan (via its checkpoint), or null. */
+    @Nullable
+    public Long nextSnapshotId() {
+        return scan.checkpoint();
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
index eaad549ee..84587083b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
@@ -74,21 +74,21 @@ public class NoPrimaryKeyLookupTable extends FullCacheLookupTable {
             throw new IllegalArgumentException(
                     "Append table does not support user defined sequence fields.");
         }
+        super.refresh(incremental);
+    }
 
-        Predicate predicate = projectedPredicate();
-        while (incremental.hasNext()) {
-            InternalRow row = incremental.next();
-            joinKeyRow.replaceRow(row);
-            if (row.getRowKind() == RowKind.INSERT || row.getRowKind() == RowKind.UPDATE_AFTER) {
-                if (predicate == null || predicate.test(row)) {
-                    state.add(joinKeyRow, row);
-                }
-            } else {
-                throw new RuntimeException(
-                        String.format(
-                                "Received %s message. Only INSERT/UPDATE_AFTER values are expected here.",
-                                row.getRowKind()));
+    @Override
+    protected void refreshRow(InternalRow row, Predicate predicate) throws IOException {
+        joinKeyRow.replaceRow(row);
+        if (row.getRowKind() == RowKind.INSERT || row.getRowKind() == RowKind.UPDATE_AFTER) {
+            if (predicate == null || predicate.test(row)) {
+                state.add(joinKeyRow, row);
             }
+        } else {
+            throw new RuntimeException(
+                    String.format(
+                            "Received %s message. Only INSERT/UPDATE_AFTER values are expected here.",
+                            row.getRowKind()));
         }
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
index 375b93461..c06120d61 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
@@ -33,7 +33,6 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 
 /** A {@link LookupTable} for primary key table. */
@@ -95,29 +94,25 @@ public class PrimaryKeyLookupTable extends FullCacheLookupTable {
     }
 
     @Override
-    public void refresh(Iterator<InternalRow> incremental) throws IOException {
-        Predicate predicate = projectedPredicate();
-        while (incremental.hasNext()) {
-            InternalRow row = incremental.next();
-            primaryKeyRow.replaceRow(row);
-            if (userDefinedSeqComparator != null) {
-                InternalRow previous = tableState.get(primaryKeyRow);
-                if (previous != null && userDefinedSeqComparator.compare(previous, row) > 0) {
-                    continue;
-                }
+    protected void refreshRow(InternalRow row, Predicate predicate) throws IOException {
+        primaryKeyRow.replaceRow(row);
+        if (userDefinedSeqComparator != null) {
+            InternalRow previous = tableState.get(primaryKeyRow);
+            if (previous != null && userDefinedSeqComparator.compare(previous, row) > 0) {
+                return;
             }
+        }
 
-            if (row.getRowKind() == RowKind.INSERT || row.getRowKind() == RowKind.UPDATE_AFTER) {
-                if (predicate == null || predicate.test(row)) {
-                    tableState.put(primaryKeyRow, row);
-                } else {
-                    // The new record under primary key is filtered
-                    // We need to delete this primary key as it no longer exists.
-                    tableState.delete(primaryKeyRow);
-                }
+        if (row.getRowKind() == RowKind.INSERT || row.getRowKind() == RowKind.UPDATE_AFTER) {
+            if (predicate == null || predicate.test(row)) {
+                tableState.put(primaryKeyRow, row);
             } else {
+                // The new record under primary key is filtered
+                // We need to delete this primary key as it no longer exists.
                 tableState.delete(primaryKeyRow);
             }
+        } else {
+            tableState.delete(primaryKeyRow);
         }
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
index f551f17cc..5ebace6cd 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
@@ -28,7 +28,6 @@ import org.apache.paimon.utils.TypeUtils;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 
 /** A {@link LookupTable} for primary key table which provides lookup by secondary key. */
@@ -74,40 +73,36 @@ public class SecondaryIndexLookupTable extends PrimaryKeyLookupTable {
     }
 
     @Override
-    public void refresh(Iterator<InternalRow> incremental) throws IOException {
-        Predicate predicate = projectedPredicate();
-        while (incremental.hasNext()) {
-            InternalRow row = incremental.next();
-            primaryKeyRow.replaceRow(row);
+    protected void refreshRow(InternalRow row, Predicate predicate) throws IOException {
+        primaryKeyRow.replaceRow(row);
 
-            boolean previousFetched = false;
-            InternalRow previous = null;
-            if (userDefinedSeqComparator != null) {
-                previous = tableState.get(primaryKeyRow);
-                previousFetched = true;
-                if (previous != null && userDefinedSeqComparator.compare(previous, row) > 0) {
-                    continue;
-                }
+        boolean previousFetched = false;
+        InternalRow previous = null;
+        if (userDefinedSeqComparator != null) {
+            previous = tableState.get(primaryKeyRow);
+            previousFetched = true;
+            if (previous != null && userDefinedSeqComparator.compare(previous, row) > 0) {
+                return;
             }
+        }
 
-            if (row.getRowKind() == RowKind.INSERT || row.getRowKind() == RowKind.UPDATE_AFTER) {
-                if (!previousFetched) {
-                    previous = tableState.get(primaryKeyRow);
-                }
-                if (previous != null) {
-                    indexState.retract(secKeyRow.replaceRow(previous), primaryKeyRow);
-                }
+        if (row.getRowKind() == RowKind.INSERT || row.getRowKind() == RowKind.UPDATE_AFTER) {
+            if (!previousFetched) {
+                previous = tableState.get(primaryKeyRow);
+            }
+            if (previous != null) {
+                indexState.retract(secKeyRow.replaceRow(previous), primaryKeyRow);
+            }
 
-                if (predicate == null || predicate.test(row)) {
-                    tableState.put(primaryKeyRow, row);
-                    indexState.add(secKeyRow.replaceRow(row), primaryKeyRow);
-                } else {
-                    tableState.delete(primaryKeyRow);
-                }
+            if (predicate == null || predicate.test(row)) {
+                tableState.put(primaryKeyRow, row);
+                indexState.add(secKeyRow.replaceRow(row), primaryKeyRow);
             } else {
                 tableState.delete(primaryKeyRow);
-                indexState.retract(secKeyRow.replaceRow(row), primaryKeyRow);
             }
+        } else {
+            tableState.delete(primaryKeyRow);
+            indexState.retract(secKeyRow.replaceRow(row), primaryKeyRow);
         }
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
index a8aa2530d..14b0ba920 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
@@ -46,6 +46,8 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.net.InetSocketAddress;
 import java.nio.file.Path;
@@ -79,18 +81,24 @@ public class FileStoreLookupFunctionTest {
         tablePath = new org.apache.paimon.fs.Path(tempDir.toString());
     }
 
-    private void createLookupFunction() throws Exception {
-        createLookupFunction(true, false);
+    private void createLookupFunction(boolean refreshAsync) throws Exception {
+        createLookupFunction(true, false, refreshAsync);
     }
 
-    private void createLookupFunction(boolean isPartition, boolean joinEqualPk) throws Exception {
-        createLookupFunction(isPartition, joinEqualPk, false);
+    private void createLookupFunction(
+            boolean isPartition, boolean joinEqualPk, boolean refreshAsync) throws Exception {
+        createLookupFunction(isPartition, joinEqualPk, false, refreshAsync);
     }
 
     private void createLookupFunction(
-            boolean isPartition, boolean joinEqualPk, boolean dynamicPartition) throws Exception {
+            boolean isPartition,
+            boolean joinEqualPk,
+            boolean dynamicPartition,
+            boolean refreshAsync)
+            throws Exception {
         SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
         Options conf = new Options();
+        conf.set(FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC, refreshAsync);
         conf.set(CoreOptions.BUCKET, 2);
         conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 3);
         conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 2);
@@ -132,18 +140,20 @@ public class FileStoreLookupFunctionTest {
         }
     }
 
-    @Test
-    public void testDefaultLocalPartial() throws Exception {
-        createLookupFunction(false, true);
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testDefaultLocalPartial(boolean refreshAsync) throws Exception {
+        createLookupFunction(false, true, refreshAsync);
         assertThat(lookupFunction.lookupTable()).isInstanceOf(PrimaryKeyPartialLookupTable.class);
         QueryExecutor queryExecutor =
                 ((PrimaryKeyPartialLookupTable) lookupFunction.lookupTable()).queryExecutor();
         assertThat(queryExecutor).isInstanceOf(LocalQueryExecutor.class);
     }
 
-    @Test
-    public void testDefaultRemotePartial() throws Exception {
-        createLookupFunction(false, true);
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testDefaultRemotePartial(boolean refreshAsync) throws Exception {
+        createLookupFunction(false, true, refreshAsync);
         ServiceManager serviceManager = new ServiceManager(fileIO, tablePath);
         serviceManager.resetService(
                 PRIMARY_KEY_LOOKUP, new InetSocketAddress[] {new InetSocketAddress(1)});
@@ -154,9 +164,10 @@ public class FileStoreLookupFunctionTest {
         assertThat(queryExecutor).isInstanceOf(RemoteQueryExecutor.class);
     }
 
-    @Test
-    public void testLookupScanLeak() throws Exception {
-        createLookupFunction();
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testLookupScanLeak(boolean refreshAsync) throws Exception {
+        createLookupFunction(refreshAsync);
         commit(writeCommit(1));
         lookupFunction.lookup(new FlinkRowData(GenericRow.of(1, 1, 10L)));
         assertThat(
@@ -174,9 +185,10 @@ public class FileStoreLookupFunctionTest {
                 .isEqualTo(0);
     }
 
-    @Test
-    public void testLookupExpiredSnapshot() throws Exception {
-        createLookupFunction();
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testLookupExpiredSnapshot(boolean refreshAsync) throws Exception {
+        createLookupFunction(refreshAsync);
         commit(writeCommit(1));
         lookupFunction.lookup(new FlinkRowData(GenericRow.of(1, 1, 10L)));
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
index d9cb58b43..2b45b38bb 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
@@ -25,6 +25,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.JoinedRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.disk.IOManagerImpl;
+import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.lookup.FullCacheLookupTable.TableBulkLoader;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.local.LocalFileIO;
@@ -37,6 +38,9 @@ import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
@@ -47,6 +51,8 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
 
 import java.io.IOException;
@@ -60,6 +66,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeoutException;
 
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
@@ -596,6 +603,87 @@ public class LookupTableTest extends TableTestBase {
         assertRow(result.get(0), 22, -2);
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testPKLookupTableRefreshAsync(boolean refreshAsync) throws Exception {
+        Options options = new Options();
+        options.set(FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC, refreshAsync);
+        FileStoreTable storeTable = createTable(singletonList("f0"), options);
+        FullCacheLookupTable.Context context =
+                new FullCacheLookupTable.Context(
+                        storeTable,
+                        new int[] {0, 1, 2},
+                        null,
+                        null,
+                        tempDir.toFile(),
+                        singletonList("f0"));
+        table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10);
+        table.open();
+
+        // Batch insert 100_000 records into table store
+        BatchWriteBuilder writeBuilder = storeTable.newBatchWriteBuilder();
+        Set<Integer> insertKeys = new HashSet<>();
+        try (BatchTableWrite write = writeBuilder.newWrite()) {
+            for (int i = 1; i <= 100_000; i++) {
+                insertKeys.add(i);
+                write.write(row(i, 11 * i, 111 * i), 0);
+            }
+            try (BatchTableCommit commit = writeBuilder.newCommit()) {
+                commit.commit(write.prepareCommit());
+            }
+        }
+
+        // Refresh lookup table
+        table.refresh();
+        Set<Integer> batchKeys = new HashSet<>();
+        long start = System.currentTimeMillis();
+        while (batchKeys.size() < 100_000) {
+            Thread.sleep(10);
+            for (int i = 1; i <= 100_000; i++) {
+                List<InternalRow> result = table.get(row(i));
+                if (!result.isEmpty()) {
+                    assertThat(result).hasSize(1);
+                    assertRow(result.get(0), i, 11 * i, 111 * i);
+                    batchKeys.add(i);
+                }
+            }
+            if (System.currentTimeMillis() - start > 30_000) {
+                throw new TimeoutException();
+            }
+        }
+        assertThat(batchKeys).isEqualTo(insertKeys);
+
+        // Add 10 snapshots with updated values (more than the default pending snapshot count)
+        // and refresh lookup table
+        for (int k = 0; k < 10; k++) {
+            try (BatchTableWrite write = writeBuilder.newWrite()) {
+                for (int i = 1; i <= 100; i++) {
+                    write.write(row(i, 22 * i, 222 * i), 0);
+                }
+                try (BatchTableCommit commit = writeBuilder.newCommit()) {
+                    commit.commit(write.prepareCommit());
+                }
+            }
+        }
+        table.refresh();
+
+        // The updated values must become visible; polling keeps this valid in both modes
+        start = System.currentTimeMillis();
+        for (int i = 1; i <= 100; i++) {
+            List<InternalRow> result = table.get(row(i));
+            while (result.size() != 1 || result.get(0).getInt(1) != 22 * i) {
+                if (System.currentTimeMillis() - start > 30_000) {
+                    throw new TimeoutException();
+                }
+                Thread.sleep(10);
+                result = table.get(row(i));
+            }
+            assertRow(result.get(0), i, 22 * i, 222 * i);
+        }
+
+        table.close();
+    }
+
     private FileStoreTable createDimTable() throws Exception {
         FileIO fileIO = LocalFileIO.create();
         org.apache.paimon.fs.Path tablePath =

Reply via email to