This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 018f3a602 [flink] Fix re-open for FullCacheLookupTable implementations 
(#3149)
018f3a602 is described below

commit 018f3a6026a4c28ca46ae9b9f063a5d9911d16af
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Apr 2 17:13:50 2024 +0800

    [flink] Fix re-open for FullCacheLookupTable implementations (#3149)
---
 .../paimon/flink/lookup/FullCacheLookupTable.java  | 22 ++++++-----
 .../flink/lookup/NoPrimaryKeyLookupTable.java      |  5 ++-
 .../paimon/flink/lookup/PrimaryKeyLookupTable.java | 10 +++--
 .../flink/lookup/SecondaryIndexLookupTable.java    |  6 ++-
 .../paimon/flink/lookup/LookupTableTest.java       | 45 ++++++++++++++++++++++
 5 files changed, 71 insertions(+), 17 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
index 8e937ffe1..9ed83a9bb 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
@@ -53,22 +53,17 @@ import java.util.concurrent.atomic.AtomicInteger;
 public abstract class FullCacheLookupTable implements LookupTable {
 
     protected final Context context;
-    protected final RocksDBStateFactory stateFactory;
     protected final RowType projectedType;
 
     @Nullable protected final FieldsComparator userDefinedSeqComparator;
     protected final int appendUdsFieldNumber;
 
+    protected RocksDBStateFactory stateFactory;
     private LookupStreamingReader reader;
     private Predicate specificPartition;
 
-    public FullCacheLookupTable(Context context) throws IOException {
+    public FullCacheLookupTable(Context context) {
         this.context = context;
-        this.stateFactory =
-                new RocksDBStateFactory(
-                        context.tempPath.toString(),
-                        context.table.coreOptions().toConfiguration(),
-                        null);
         FileStoreTable table = context.table;
         List<String> sequenceFields = new ArrayList<>();
         if (table.primaryKeys().size() > 0) {
@@ -104,8 +99,15 @@ public abstract class FullCacheLookupTable implements 
LookupTable {
         this.specificPartition = filter;
     }
 
-    @Override
-    public void open() throws Exception {
+    protected void openStateFactory() throws Exception {
+        this.stateFactory =
+                new RocksDBStateFactory(
+                        context.tempPath.toString(),
+                        context.table.coreOptions().toConfiguration(),
+                        null);
+    }
+
+    protected void bootstrap() throws Exception {
         Predicate scanPredicate =
                 PredicateBuilder.andNullable(context.tablePredicate, 
specificPartition);
         this.reader = new LookupStreamingReader(context.table, 
context.projection, scanPredicate);
@@ -198,7 +200,7 @@ public abstract class FullCacheLookupTable implements 
LookupTable {
         void finish() throws IOException;
     }
 
-    static FullCacheLookupTable create(Context context, long lruCacheSize) 
throws IOException {
+    static FullCacheLookupTable create(Context context, long lruCacheSize) {
         List<String> primaryKeys = context.table.primaryKeys();
         if (primaryKeys.isEmpty()) {
             return new NoPrimaryKeyLookupTable(context, lruCacheSize);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
index 7f5d036b1..eaad549ee 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
@@ -42,7 +42,7 @@ public class NoPrimaryKeyLookupTable extends 
FullCacheLookupTable {
 
     private RocksDBListState<InternalRow, InternalRow> state;
 
-    public NoPrimaryKeyLookupTable(Context context, long lruCacheSize) throws 
IOException {
+    public NoPrimaryKeyLookupTable(Context context, long lruCacheSize) {
         super(context);
         this.lruCacheSize = lruCacheSize;
         List<String> fieldNames = projectedType.getFieldNames();
@@ -52,6 +52,7 @@ public class NoPrimaryKeyLookupTable extends 
FullCacheLookupTable {
 
     @Override
     public void open() throws Exception {
+        openStateFactory();
         this.state =
                 stateFactory.listState(
                         "join-key-index",
@@ -59,7 +60,7 @@ public class NoPrimaryKeyLookupTable extends 
FullCacheLookupTable {
                                 TypeUtils.project(projectedType, 
joinKeyRow.indexMapping())),
                         InternalSerializers.create(projectedType),
                         lruCacheSize);
-        super.open();
+        bootstrap();
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
index 889e1e35b..375b93461 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
@@ -47,8 +47,7 @@ public class PrimaryKeyLookupTable extends 
FullCacheLookupTable {
 
     protected RocksDBValueState<InternalRow, InternalRow> tableState;
 
-    public PrimaryKeyLookupTable(Context context, long lruCacheSize, 
List<String> joinKey)
-            throws IOException {
+    public PrimaryKeyLookupTable(Context context, long lruCacheSize, 
List<String> joinKey) {
         super(context);
         this.lruCacheSize = lruCacheSize;
         List<String> fieldNames = projectedType.getFieldNames();
@@ -71,6 +70,12 @@ public class PrimaryKeyLookupTable extends 
FullCacheLookupTable {
 
     @Override
     public void open() throws Exception {
+        openStateFactory();
+        createTableState();
+        bootstrap();
+    }
+
+    protected void createTableState() throws IOException {
         this.tableState =
                 stateFactory.valueState(
                         "table",
@@ -78,7 +83,6 @@ public class PrimaryKeyLookupTable extends 
FullCacheLookupTable {
                                 TypeUtils.project(projectedType, 
primaryKeyRow.indexMapping())),
                         InternalSerializers.create(projectedType),
                         lruCacheSize);
-        super.open();
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
index d4fb22c4b..f551f17cc 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
@@ -38,7 +38,7 @@ public class SecondaryIndexLookupTable extends 
PrimaryKeyLookupTable {
 
     private RocksDBSetState<InternalRow, InternalRow> indexState;
 
-    public SecondaryIndexLookupTable(Context context, long lruCacheSize) 
throws IOException {
+    public SecondaryIndexLookupTable(Context context, long lruCacheSize) {
         super(context, lruCacheSize / 2, context.table.primaryKeys());
         List<String> fieldNames = projectedType.getFieldNames();
         int[] secKeyMapping = 
context.joinKey.stream().mapToInt(fieldNames::indexOf).toArray();
@@ -47,6 +47,8 @@ public class SecondaryIndexLookupTable extends 
PrimaryKeyLookupTable {
 
     @Override
     public void open() throws Exception {
+        openStateFactory();
+        createTableState();
         this.indexState =
                 stateFactory.setState(
                         "sec-index",
@@ -55,7 +57,7 @@ public class SecondaryIndexLookupTable extends 
PrimaryKeyLookupTable {
                         InternalSerializers.create(
                                 TypeUtils.project(projectedType, 
primaryKeyRow.indexMapping())),
                         lruCacheSize);
-        super.open();
+        bootstrap();
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
index 065ace85a..d9cb58b43 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
@@ -118,6 +118,10 @@ public class LookupTableTest extends TableTestBase {
         table = FullCacheLookupTable.create(context, 
ThreadLocalRandom.current().nextInt(2) * 10);
         table.open();
 
+        // test re-open
+        table.close();
+        table.open();
+
         // test bulk load error
         {
             TableBulkLoader bulkLoader = table.createBulkLoader();
@@ -175,6 +179,10 @@ public class LookupTableTest extends TableTestBase {
         table = FullCacheLookupTable.create(context, 
ThreadLocalRandom.current().nextInt(2) * 10);
         table.open();
 
+        // test re-open
+        table.close();
+        table.open();
+
         List<Pair<byte[], byte[]>> records = new ArrayList<>();
         for (int i = 1; i <= 10; i++) {
             InternalRow row = row(i, 11 * i, 111 * i);
@@ -219,6 +227,10 @@ public class LookupTableTest extends TableTestBase {
         table = FullCacheLookupTable.create(context, 
ThreadLocalRandom.current().nextInt(2) * 10);
         table.open();
 
+        // test re-open
+        table.close();
+        table.open();
+
         table.refresh(singletonList(row(1, 11, 111)).iterator());
         List<InternalRow> result = table.get(row(1));
         assertThat(result).hasSize(1);
@@ -250,6 +262,10 @@ public class LookupTableTest extends TableTestBase {
         table = FullCacheLookupTable.create(context, 
ThreadLocalRandom.current().nextInt(2) * 10);
         table.open();
 
+        // test re-open
+        table.close();
+        table.open();
+
         table.refresh(singletonList(row(1, 11, 111)).iterator());
         List<InternalRow> result = table.get(row(1));
         assertThat(result).hasSize(1);
@@ -274,6 +290,10 @@ public class LookupTableTest extends TableTestBase {
         table = FullCacheLookupTable.create(context, 
ThreadLocalRandom.current().nextInt(2) * 10);
         table.open();
 
+        // test re-open
+        table.close();
+        table.open();
+
         // test bulk load 100_000 records
         List<Pair<byte[], byte[]>> records = new ArrayList<>();
         Random rnd = new Random();
@@ -319,6 +339,10 @@ public class LookupTableTest extends TableTestBase {
         table = FullCacheLookupTable.create(context, 
ThreadLocalRandom.current().nextInt(2) * 10);
         table.open();
 
+        // test re-open
+        table.close();
+        table.open();
+
         List<Pair<byte[], byte[]>> records = new ArrayList<>();
         Random rnd = new Random();
         Map<Integer, Set<Integer>> secKeyToPk = new HashMap<>();
@@ -367,6 +391,10 @@ public class LookupTableTest extends TableTestBase {
         table = FullCacheLookupTable.create(context, 
ThreadLocalRandom.current().nextInt(2) * 10);
         table.open();
 
+        // test re-open
+        table.close();
+        table.open();
+
         table.refresh(singletonList(row(1, 11, 111)).iterator());
         List<InternalRow> result = table.get(row(11));
         assertThat(result).hasSize(1);
@@ -407,6 +435,10 @@ public class LookupTableTest extends TableTestBase {
         table = FullCacheLookupTable.create(context, 
ThreadLocalRandom.current().nextInt(2) * 10);
         table.open();
 
+        // test re-open
+        table.close();
+        table.open();
+
         // test bulk load 100_000 records
         List<Pair<byte[], byte[]>> records = new ArrayList<>();
         Random rnd = new Random();
@@ -450,6 +482,10 @@ public class LookupTableTest extends TableTestBase {
         table = FullCacheLookupTable.create(context, 
ThreadLocalRandom.current().nextInt(2) * 10);
         table.open();
 
+        // test re-open
+        table.close();
+        table.open();
+
         table.refresh(singletonList(row(1, 11, 333)).iterator());
         List<InternalRow> result = table.get(row(11));
         assertThat(result).hasSize(0);
@@ -508,6 +544,11 @@ public class LookupTableTest extends TableTestBase {
                         tempDir.toFile(),
                         ImmutableList.of("pk1", "pk2"));
         table.open();
+
+        // test re-open
+        table.close();
+        table.open();
+
         List<InternalRow> result = table.get(row(1, -1));
         assertThat(result).hasSize(0);
 
@@ -535,6 +576,10 @@ public class LookupTableTest extends TableTestBase {
                         ImmutableList.of("pk2", "pk1"));
         table.open();
 
+        // test re-open
+        table.close();
+        table.open();
+
         List<InternalRow> result = table.get(row(-1, 1));
         assertThat(result).hasSize(0);
 

Reply via email to