This is an automated email from the ASF dual-hosted git repository.
zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ab07e6f5f [flink][lookup] Support async refresh of lookup table (#3297)
ab07e6f5f is described below
commit ab07e6f5f78d1b2b48a96b7693828616b84440e5
Author: Fang Yong <[email protected]>
AuthorDate: Wed May 22 19:52:10 2024 +0800
[flink][lookup] Support async refresh of lookup table (#3297)
* [flink][lookup] Support async refresh of lookup table
---
.../generated/flink_connector_configuration.html | 12 +++
.../apache/paimon/flink/FlinkConnectorOptions.java | 13 +++
.../paimon/flink/lookup/FullCacheLookupTable.java | 116 ++++++++++++++++++++-
.../paimon/flink/lookup/LookupStreamingReader.java | 5 +
.../flink/lookup/NoPrimaryKeyLookupTable.java | 26 ++---
.../paimon/flink/lookup/PrimaryKeyLookupTable.java | 33 +++---
.../flink/lookup/SecondaryIndexLookupTable.java | 51 ++++-----
.../flink/lookup/FileStoreLookupFunctionTest.java | 46 +++++---
.../paimon/flink/lookup/LookupTableTest.java | 73 +++++++++++++
9 files changed, 293 insertions(+), 82 deletions(-)
diff --git
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 0280581d2..4fb5fc271 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -68,6 +68,18 @@ under the License.
<td>Duration</td>
<td>Specific dynamic partition refresh interval for lookup, scan
all partitions and obtain corresponding partition.</td>
</tr>
+ <tr>
+ <td><h5>lookup.refresh.async</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to refresh lookup table in an async thread.</td>
+ </tr>
+ <tr>
+ <td><h5>lookup.refresh.async.pending-snapshot-count</h5></td>
+ <td style="word-wrap: break-word;">5</td>
+ <td>Integer</td>
+ <td>If the pending snapshot count exceeds the threshold, lookup
operator will refresh the table in sync.</td>
+ </tr>
<tr>
<td><h5>partition.idle-time-to-done</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index e02b1c124..275ce836a 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -311,6 +311,19 @@ public class FlinkConnectorOptions {
"Specific dynamic partition refresh interval for
lookup, "
+ "scan all partitions and obtain
corresponding partition.");
+ public static final ConfigOption<Boolean> LOOKUP_REFRESH_ASYNC =
+ ConfigOptions.key("lookup.refresh.async")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether to refresh lookup table in an
async thread.");
+
+ public static final ConfigOption<Integer>
LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT =
+ ConfigOptions.key("lookup.refresh.async.pending-snapshot-count")
+ .intType()
+ .defaultValue(5)
+ .withDescription(
+ "If the pending snapshot count exceeds the
threshold, lookup operator will refresh the table in sync.");
+
public static final ConfigOption<Boolean> SINK_AUTO_TAG_FOR_SAVEPOINT =
ConfigOptions.key("sink.savepoint.auto-tag")
.booleanType()
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
index 9ed83a9bb..1c8db0916 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
@@ -26,12 +26,14 @@ import org.apache.paimon.disk.IOManager;
import org.apache.paimon.lookup.BulkLoader;
import org.apache.paimon.lookup.RocksDBState;
import org.apache.paimon.lookup.RocksDBStateFactory;
+import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.sort.BinaryExternalSortBuffer;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ExecutorThreadFactory;
import org.apache.paimon.utils.FieldsComparator;
import org.apache.paimon.utils.FileIOUtils;
import org.apache.paimon.utils.MutableObjectIterator;
@@ -39,6 +41,11 @@ import org.apache.paimon.utils.PartialRow;
import org.apache.paimon.utils.TypeUtils;
import org.apache.paimon.utils.UserDefinedSeqComparator;
+import
org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import javax.annotation.Nullable;
import java.io.File;
@@ -47,24 +54,40 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static
org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT;
/** Lookup table of full cache. */
public abstract class FullCacheLookupTable implements LookupTable {
+ private static final Logger LOG =
LoggerFactory.getLogger(FullCacheLookupTable.class);
+ protected final Object lock = new Object();
protected final Context context;
protected final RowType projectedType;
+ protected final boolean refreshAsync;
@Nullable protected final FieldsComparator userDefinedSeqComparator;
protected final int appendUdsFieldNumber;
protected RocksDBStateFactory stateFactory;
+ private final ExecutorService refreshExecutor;
+ private final AtomicReference<Exception> cachedException;
+ private final int maxPendingSnapshotCount;
+ private final FileStoreTable table;
+ private Future<?> refreshFuture;
private LookupStreamingReader reader;
private Predicate specificPartition;
public FullCacheLookupTable(Context context) {
this.context = context;
- FileStoreTable table = context.table;
+ this.table = context.table;
List<String> sequenceFields = new ArrayList<>();
if (table.primaryKeys().size() > 0) {
sequenceFields = new CoreOptions(table.options()).sequenceField();
@@ -91,7 +114,20 @@ public abstract class FullCacheLookupTable implements
LookupTable {
this.userDefinedSeqComparator = null;
this.appendUdsFieldNumber = 0;
}
+
+ Options options = Options.fromMap(context.table.options());
this.projectedType = projectedType;
+ this.refreshAsync = options.get(LOOKUP_REFRESH_ASYNC);
+ this.refreshExecutor =
+ this.refreshAsync
+ ? Executors.newSingleThreadExecutor(
+ new ExecutorThreadFactory(
+ String.format(
+ "%s-lookup-refresh",
+
Thread.currentThread().getName())))
+ : MoreExecutors.newDirectExecutorService();
+ this.cachedException = new AtomicReference<>();
+ this.maxPendingSnapshotCount =
options.get(LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT);
}
@Override
@@ -145,6 +181,51 @@ public abstract class FullCacheLookupTable implements
LookupTable {
@Override
public void refresh() throws Exception {
+ Long latestSnapshotId = table.snapshotManager().latestSnapshotId();
+ Long nextSnapshotId = reader.nextSnapshotId();
+ if (latestSnapshotId != null
+ && nextSnapshotId != null
+ && latestSnapshotId - nextSnapshotId >
maxPendingSnapshotCount) {
+ LOG.warn(
+ "The latest snapshot id {} is much greater than the next
snapshot id {} for {}}, "
+ + "you may need to increase the parallelism of
lookup operator.",
+ latestSnapshotId,
+ nextSnapshotId,
+ maxPendingSnapshotCount);
+ if (refreshFuture != null) {
+ // Wait the previous refresh task to be finished.
+ refreshFuture.get();
+ }
+ doRefresh();
+ } else {
+ Future<?> currentFuture = null;
+ try {
+ currentFuture =
+ refreshExecutor.submit(
+ () -> {
+ try {
+ doRefresh();
+ } catch (Exception e) {
+ LOG.error(
+ "Refresh lookup table {}
failed",
+ context.table.name(),
+ e);
+ cachedException.set(e);
+ }
+ });
+ } catch (RejectedExecutionException ignored) {
+ LOG.warn(
+ "Add refresh task for lookup table {} failed",
+ context.table.name(),
+ ignored);
+ }
+ if (currentFuture != null) {
+ refreshFuture = currentFuture;
+ }
+ }
+ }
+
+ private void doRefresh() throws Exception {
while (true) {
try (RecordReaderIterator<InternalRow> batch =
new RecordReaderIterator<>(reader.nextBatch(false))) {
@@ -158,7 +239,14 @@ public abstract class FullCacheLookupTable implements
LookupTable {
@Override
public final List<InternalRow> get(InternalRow key) throws IOException {
- List<InternalRow> values = innerGet(key);
+ List<InternalRow> values;
+ if (refreshAsync) {
+ synchronized (lock) {
+ values = innerGet(key);
+ }
+ } else {
+ values = innerGet(key);
+ }
if (appendUdsFieldNumber == 0) {
return values;
}
@@ -171,9 +259,23 @@ public abstract class FullCacheLookupTable implements
LookupTable {
return dropSequence;
}
+ public void refresh(Iterator<InternalRow> input) throws IOException {
+ Predicate predicate = projectedPredicate();
+ while (input.hasNext()) {
+ InternalRow row = input.next();
+ if (refreshAsync) {
+ synchronized (lock) {
+ refreshRow(row, predicate);
+ }
+ } else {
+ refreshRow(row, predicate);
+ }
+ }
+ }
+
public abstract List<InternalRow> innerGet(InternalRow key) throws
IOException;
- public abstract void refresh(Iterator<InternalRow> input) throws
IOException;
+ protected abstract void refreshRow(InternalRow row, Predicate predicate)
throws IOException;
@Nullable
public Predicate projectedPredicate() {
@@ -188,8 +290,12 @@ public abstract class FullCacheLookupTable implements
LookupTable {
@Override
public void close() throws IOException {
- stateFactory.close();
- FileIOUtils.deleteDirectory(context.tempPath);
+ try {
+ stateFactory.close();
+ FileIOUtils.deleteDirectory(context.tempPath);
+ } finally {
+ refreshExecutor.shutdown();
+ }
}
/** Bulk loader for the table. */
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
index 6e32bb7d3..ce64e27e3 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
@@ -144,4 +144,9 @@ public class LookupStreamingReader {
}
return reader;
}
+
+ @Nullable
+ public Long nextSnapshotId() {
+ return scan.checkpoint();
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
index eaad549ee..84587083b 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
@@ -74,21 +74,21 @@ public class NoPrimaryKeyLookupTable extends
FullCacheLookupTable {
throw new IllegalArgumentException(
"Append table does not support user defined sequence
fields.");
}
+ super.refresh(incremental);
+ }
- Predicate predicate = projectedPredicate();
- while (incremental.hasNext()) {
- InternalRow row = incremental.next();
- joinKeyRow.replaceRow(row);
- if (row.getRowKind() == RowKind.INSERT || row.getRowKind() ==
RowKind.UPDATE_AFTER) {
- if (predicate == null || predicate.test(row)) {
- state.add(joinKeyRow, row);
- }
- } else {
- throw new RuntimeException(
- String.format(
- "Received %s message. Only INSERT/UPDATE_AFTER
values are expected here.",
- row.getRowKind()));
+ @Override
+ protected void refreshRow(InternalRow row, Predicate predicate) throws
IOException {
+ joinKeyRow.replaceRow(row);
+ if (row.getRowKind() == RowKind.INSERT || row.getRowKind() ==
RowKind.UPDATE_AFTER) {
+ if (predicate == null || predicate.test(row)) {
+ state.add(joinKeyRow, row);
}
+ } else {
+ throw new RuntimeException(
+ String.format(
+ "Received %s message. Only INSERT/UPDATE_AFTER
values are expected here.",
+ row.getRowKind()));
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
index 375b93461..c06120d61 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
@@ -33,7 +33,6 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
/** A {@link LookupTable} for primary key table. */
@@ -95,29 +94,25 @@ public class PrimaryKeyLookupTable extends
FullCacheLookupTable {
}
@Override
- public void refresh(Iterator<InternalRow> incremental) throws IOException {
- Predicate predicate = projectedPredicate();
- while (incremental.hasNext()) {
- InternalRow row = incremental.next();
- primaryKeyRow.replaceRow(row);
- if (userDefinedSeqComparator != null) {
- InternalRow previous = tableState.get(primaryKeyRow);
- if (previous != null &&
userDefinedSeqComparator.compare(previous, row) > 0) {
- continue;
- }
+ protected void refreshRow(InternalRow row, Predicate predicate) throws
IOException {
+ primaryKeyRow.replaceRow(row);
+ if (userDefinedSeqComparator != null) {
+ InternalRow previous = tableState.get(primaryKeyRow);
+ if (previous != null && userDefinedSeqComparator.compare(previous,
row) > 0) {
+ return;
}
+ }
- if (row.getRowKind() == RowKind.INSERT || row.getRowKind() ==
RowKind.UPDATE_AFTER) {
- if (predicate == null || predicate.test(row)) {
- tableState.put(primaryKeyRow, row);
- } else {
- // The new record under primary key is filtered
- // We need to delete this primary key as it no longer
exists.
- tableState.delete(primaryKeyRow);
- }
+ if (row.getRowKind() == RowKind.INSERT || row.getRowKind() ==
RowKind.UPDATE_AFTER) {
+ if (predicate == null || predicate.test(row)) {
+ tableState.put(primaryKeyRow, row);
} else {
+ // The new record under primary key is filtered
+ // We need to delete this primary key as it no longer exists.
tableState.delete(primaryKeyRow);
}
+ } else {
+ tableState.delete(primaryKeyRow);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
index f551f17cc..5ebace6cd 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
@@ -28,7 +28,6 @@ import org.apache.paimon.utils.TypeUtils;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
/** A {@link LookupTable} for primary key table which provides lookup by
secondary key. */
@@ -74,40 +73,36 @@ public class SecondaryIndexLookupTable extends
PrimaryKeyLookupTable {
}
@Override
- public void refresh(Iterator<InternalRow> incremental) throws IOException {
- Predicate predicate = projectedPredicate();
- while (incremental.hasNext()) {
- InternalRow row = incremental.next();
- primaryKeyRow.replaceRow(row);
+ protected void refreshRow(InternalRow row, Predicate predicate) throws
IOException {
+ primaryKeyRow.replaceRow(row);
- boolean previousFetched = false;
- InternalRow previous = null;
- if (userDefinedSeqComparator != null) {
- previous = tableState.get(primaryKeyRow);
- previousFetched = true;
- if (previous != null &&
userDefinedSeqComparator.compare(previous, row) > 0) {
- continue;
- }
+ boolean previousFetched = false;
+ InternalRow previous = null;
+ if (userDefinedSeqComparator != null) {
+ previous = tableState.get(primaryKeyRow);
+ previousFetched = true;
+ if (previous != null && userDefinedSeqComparator.compare(previous,
row) > 0) {
+ return;
}
+ }
- if (row.getRowKind() == RowKind.INSERT || row.getRowKind() ==
RowKind.UPDATE_AFTER) {
- if (!previousFetched) {
- previous = tableState.get(primaryKeyRow);
- }
- if (previous != null) {
- indexState.retract(secKeyRow.replaceRow(previous),
primaryKeyRow);
- }
+ if (row.getRowKind() == RowKind.INSERT || row.getRowKind() ==
RowKind.UPDATE_AFTER) {
+ if (!previousFetched) {
+ previous = tableState.get(primaryKeyRow);
+ }
+ if (previous != null) {
+ indexState.retract(secKeyRow.replaceRow(previous),
primaryKeyRow);
+ }
- if (predicate == null || predicate.test(row)) {
- tableState.put(primaryKeyRow, row);
- indexState.add(secKeyRow.replaceRow(row), primaryKeyRow);
- } else {
- tableState.delete(primaryKeyRow);
- }
+ if (predicate == null || predicate.test(row)) {
+ tableState.put(primaryKeyRow, row);
+ indexState.add(secKeyRow.replaceRow(row), primaryKeyRow);
} else {
tableState.delete(primaryKeyRow);
- indexState.retract(secKeyRow.replaceRow(row), primaryKeyRow);
}
+ } else {
+ tableState.delete(primaryKeyRow);
+ indexState.retract(secKeyRow.replaceRow(row), primaryKeyRow);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
index a8aa2530d..14b0ba920 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
@@ -46,6 +46,8 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.net.InetSocketAddress;
import java.nio.file.Path;
@@ -79,18 +81,24 @@ public class FileStoreLookupFunctionTest {
tablePath = new org.apache.paimon.fs.Path(tempDir.toString());
}
- private void createLookupFunction() throws Exception {
- createLookupFunction(true, false);
+ private void createLookupFunction(boolean refreshAsync) throws Exception {
+ createLookupFunction(true, false, refreshAsync);
}
- private void createLookupFunction(boolean isPartition, boolean
joinEqualPk) throws Exception {
- createLookupFunction(isPartition, joinEqualPk, false);
+ private void createLookupFunction(
+ boolean isPartition, boolean joinEqualPk, boolean refreshAsync)
throws Exception {
+ createLookupFunction(isPartition, joinEqualPk, false, refreshAsync);
}
private void createLookupFunction(
- boolean isPartition, boolean joinEqualPk, boolean
dynamicPartition) throws Exception {
+ boolean isPartition,
+ boolean joinEqualPk,
+ boolean dynamicPartition,
+ boolean refreshAsync)
+ throws Exception {
SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
Options conf = new Options();
+ conf.set(FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC, refreshAsync);
conf.set(CoreOptions.BUCKET, 2);
conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 3);
conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 2);
@@ -132,18 +140,20 @@ public class FileStoreLookupFunctionTest {
}
}
- @Test
- public void testDefaultLocalPartial() throws Exception {
- createLookupFunction(false, true);
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testDefaultLocalPartial(boolean refreshAsync) throws Exception
{
+ createLookupFunction(false, true, refreshAsync);
assertThat(lookupFunction.lookupTable()).isInstanceOf(PrimaryKeyPartialLookupTable.class);
QueryExecutor queryExecutor =
((PrimaryKeyPartialLookupTable)
lookupFunction.lookupTable()).queryExecutor();
assertThat(queryExecutor).isInstanceOf(LocalQueryExecutor.class);
}
- @Test
- public void testDefaultRemotePartial() throws Exception {
- createLookupFunction(false, true);
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testDefaultRemotePartial(boolean refreshAsync) throws
Exception {
+ createLookupFunction(false, true, refreshAsync);
ServiceManager serviceManager = new ServiceManager(fileIO, tablePath);
serviceManager.resetService(
PRIMARY_KEY_LOOKUP, new InetSocketAddress[] {new
InetSocketAddress(1)});
@@ -154,9 +164,10 @@ public class FileStoreLookupFunctionTest {
assertThat(queryExecutor).isInstanceOf(RemoteQueryExecutor.class);
}
- @Test
- public void testLookupScanLeak() throws Exception {
- createLookupFunction();
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testLookupScanLeak(boolean refreshAsync) throws Exception {
+ createLookupFunction(refreshAsync);
commit(writeCommit(1));
lookupFunction.lookup(new FlinkRowData(GenericRow.of(1, 1, 10L)));
assertThat(
@@ -174,9 +185,10 @@ public class FileStoreLookupFunctionTest {
.isEqualTo(0);
}
- @Test
- public void testLookupExpiredSnapshot() throws Exception {
- createLookupFunction();
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testLookupExpiredSnapshot(boolean refreshAsync) throws
Exception {
+ createLookupFunction(refreshAsync);
commit(writeCommit(1));
lookupFunction.lookup(new FlinkRowData(GenericRow.of(1, 1, 10L)));
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
index d9cb58b43..2b45b38bb 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
@@ -25,6 +25,7 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.JoinedRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.disk.IOManagerImpl;
+import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.lookup.FullCacheLookupTable.TableBulkLoader;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.local.LocalFileIO;
@@ -37,6 +38,9 @@ import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
@@ -47,6 +51,8 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
import java.io.IOException;
@@ -60,6 +66,7 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeoutException;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
@@ -596,6 +603,72 @@ public class LookupTableTest extends TableTestBase {
assertRow(result.get(0), 22, -2);
}
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testPKLookupTableRefreshAsync(boolean refreshAsync) throws
Exception {
+ Options options = new Options();
+ options.set(FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC, refreshAsync);
+ FileStoreTable storeTable = createTable(singletonList("f0"), options);
+ FullCacheLookupTable.Context context =
+ new FullCacheLookupTable.Context(
+ storeTable,
+ new int[] {0, 1, 2},
+ null,
+ null,
+ tempDir.toFile(),
+ singletonList("f0"));
+ table = FullCacheLookupTable.create(context,
ThreadLocalRandom.current().nextInt(2) * 10);
+ table.open();
+
+ // Batch insert 100_000 records into table store
+ BatchWriteBuilder writeBuilder = storeTable.newBatchWriteBuilder();
+ Set<Integer> insertKeys = new HashSet<>();
+ try (BatchTableWrite write = writeBuilder.newWrite()) {
+ for (int i = 1; i <= 100_000; i++) {
+ insertKeys.add(i);
+ write.write(row(i, 11 * i, 111 * i), 0);
+ }
+ try (BatchTableCommit commit = writeBuilder.newCommit()) {
+ commit.commit(write.prepareCommit());
+ }
+ }
+
+ // Refresh lookup table
+ table.refresh();
+ Set<Integer> batchKeys = new HashSet<>();
+ long start = System.currentTimeMillis();
+ while (batchKeys.size() < 100_000) {
+ Thread.sleep(10);
+ for (int i = 1; i <= 100_000; i++) {
+ List<InternalRow> result = table.get(row(i));
+ if (!result.isEmpty()) {
+ assertThat(result).hasSize(1);
+ assertRow(result.get(0), i, 11 * i, 111 * i);
+ batchKeys.add(i);
+ }
+ }
+ if (System.currentTimeMillis() - start > 30_000) {
+ throw new TimeoutException();
+ }
+ }
+ assertThat(batchKeys).isEqualTo(insertKeys);
+
+ // Add 10 snapshots and refresh lookup table
+ for (int k = 0; k < 10; k++) {
+ try (BatchTableWrite write = writeBuilder.newWrite()) {
+ for (int i = 1; i <= 100; i++) {
+ write.write(row(i, 11 * i, 111 * i), 0);
+ }
+ try (BatchTableCommit commit = writeBuilder.newCommit()) {
+ commit.commit(write.prepareCommit());
+ }
+ }
+ }
+ table.refresh();
+
+ table.close();
+ }
+
private FileStoreTable createDimTable() throws Exception {
FileIO fileIO = LocalFileIO.create();
org.apache.paimon.fs.Path tablePath =