This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.0 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit a533dc57427bbefa9ea02ff2f0a2ad3635dd9f73 Author: WenjunMin <[email protected]> AuthorDate: Tue Jan 7 14:51:03 2025 +0800 [flink] Fix the refresh executor not work after reopen (#4851) --- .../paimon/flink/lookup/FullCacheLookupTable.java | 26 +++++++++------- .../flink/lookup/NoPrimaryKeyLookupTable.java | 2 +- .../paimon/flink/lookup/PrimaryKeyLookupTable.java | 2 +- .../flink/lookup/SecondaryIndexLookupTable.java | 2 +- .../paimon/flink/lookup/LookupTableTest.java | 35 ++++++++++++++++++++++ 5 files changed, 54 insertions(+), 13 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java index de69c67a4c..4154b6742c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.lookup; import org.apache.paimon.CoreOptions; +import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; @@ -79,7 +80,7 @@ public abstract class FullCacheLookupTable implements LookupTable { protected final int appendUdsFieldNumber; protected RocksDBStateFactory stateFactory; - @Nullable private final ExecutorService refreshExecutor; + @Nullable private ExecutorService refreshExecutor; private final AtomicReference<Exception> cachedException; private final int maxPendingSnapshotCount; private final FileStoreTable table; @@ -127,14 +128,6 @@ public abstract class FullCacheLookupTable implements LookupTable { Options options = Options.fromMap(context.table.options()); this.projectedType = projectedType; this.refreshAsync = options.get(LOOKUP_REFRESH_ASYNC); - this.refreshExecutor = - this.refreshAsync - ? Executors.newSingleThreadExecutor( - new ExecutorThreadFactory( - String.format( - "%s-lookup-refresh", - Thread.currentThread().getName()))) - : null; this.cachedException = new AtomicReference<>(); this.maxPendingSnapshotCount = options.get(LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT); } @@ -149,12 +142,20 @@ public abstract class FullCacheLookupTable implements LookupTable { this.cacheRowFilter = filter; } - protected void openStateFactory() throws Exception { + protected void init() throws Exception { this.stateFactory = new RocksDBStateFactory( context.tempPath.toString(), context.table.coreOptions().toConfiguration(), null); + this.refreshExecutor = + this.refreshAsync + ? Executors.newSingleThreadExecutor( + new ExecutorThreadFactory( + String.format( + "%s-lookup-refresh", + Thread.currentThread().getName()))) + : null; } protected void bootstrap() throws Exception { @@ -322,6 +323,11 @@ public abstract class FullCacheLookupTable implements LookupTable { } } + @VisibleForTesting + public Future<?> getRefreshFuture() { + return refreshFuture; + } + /** Bulk loader for the table. */ public interface TableBulkLoader { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java index 84587083bf..63af4f3506 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java @@ -52,7 +52,7 @@ public class NoPrimaryKeyLookupTable extends FullCacheLookupTable { @Override public void open() throws Exception { - openStateFactory(); + init(); this.state = stateFactory.listState( "join-key-index", diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java index c06120d61d..2a3099e9a6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java @@ -69,7 +69,7 @@ public class PrimaryKeyLookupTable extends FullCacheLookupTable { @Override public void open() throws Exception { - openStateFactory(); + init(); createTableState(); bootstrap(); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java index 5ebace6cd5..11c9cba24b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java @@ -46,7 +46,7 @@ public class SecondaryIndexLookupTable extends PrimaryKeyLookupTable { @Override public void open() throws Exception { - openStateFactory(); + init(); createTableState(); this.indexState = stateFactory.setState( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java index 46c61a15bd..88b9471330 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java @@ -600,6 +600,41 @@ public class LookupTableTest extends TableTestBase { assertThat(res).isEmpty(); } + @Test + public void testRefreshExecutorRebuildAfterReopen() throws Exception { + Options options = new Options(); + options.set(FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC, true); + FileStoreTable storeTable = createTable(singletonList("f0"), options); + writeWithBucketAssigner( + storeTable, row -> 0, GenericRow.of(1, 11, 111), GenericRow.of(2, 22, 222)); + + FullCacheLookupTable.Context context = + new FullCacheLookupTable.Context( + storeTable, + new int[] {0, 1, 2}, + null, + null, + tempDir.toFile(), + singletonList("f0"), + null); + table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10); + assertThat(table).isInstanceOf(PrimaryKeyLookupTable.class); + table.open(); + // reopen + table.close(); + table.open(); + List<InternalRow> res = table.get(GenericRow.of(1)); + assertThat(res).hasSize(1); + assertRow(res.get(0), 1, 11, 111); + writeWithBucketAssigner(storeTable, row -> 0, GenericRow.of(1, 22, 222)); + table.refresh(); + assertThat(table.getRefreshFuture()).isNotNull(); + table.getRefreshFuture().get(); + res = table.get(GenericRow.of(1)); + assertThat(res).hasSize(1); + assertRow(res.get(0), 1, 22, 222); + } + @Test public void testNoPkTableWithCacheRowFilter() throws Exception { FileStoreTable storeTable = createTable(emptyList(), new Options());
