This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 018f3a602 [flink] Fix re-open for FullCacheLookupTable implementations (#3149)
018f3a602 is described below
commit 018f3a6026a4c28ca46ae9b9f063a5d9911d16af
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Apr 2 17:13:50 2024 +0800
[flink] Fix re-open for FullCacheLookupTable implementations (#3149)
---
.../paimon/flink/lookup/FullCacheLookupTable.java | 22 ++++++-----
.../flink/lookup/NoPrimaryKeyLookupTable.java | 5 ++-
.../paimon/flink/lookup/PrimaryKeyLookupTable.java | 10 +++--
.../flink/lookup/SecondaryIndexLookupTable.java | 6 ++-
.../paimon/flink/lookup/LookupTableTest.java | 45 ++++++++++++++++++++++
5 files changed, 71 insertions(+), 17 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
index 8e937ffe1..9ed83a9bb 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
@@ -53,22 +53,17 @@ import java.util.concurrent.atomic.AtomicInteger;
public abstract class FullCacheLookupTable implements LookupTable {
protected final Context context;
- protected final RocksDBStateFactory stateFactory;
protected final RowType projectedType;
@Nullable protected final FieldsComparator userDefinedSeqComparator;
protected final int appendUdsFieldNumber;
+ protected RocksDBStateFactory stateFactory;
private LookupStreamingReader reader;
private Predicate specificPartition;
- public FullCacheLookupTable(Context context) throws IOException {
+ public FullCacheLookupTable(Context context) {
this.context = context;
- this.stateFactory =
- new RocksDBStateFactory(
- context.tempPath.toString(),
- context.table.coreOptions().toConfiguration(),
- null);
FileStoreTable table = context.table;
List<String> sequenceFields = new ArrayList<>();
if (table.primaryKeys().size() > 0) {
@@ -104,8 +99,15 @@ public abstract class FullCacheLookupTable implements
LookupTable {
this.specificPartition = filter;
}
- @Override
- public void open() throws Exception {
+ protected void openStateFactory() throws Exception {
+ this.stateFactory =
+ new RocksDBStateFactory(
+ context.tempPath.toString(),
+ context.table.coreOptions().toConfiguration(),
+ null);
+ }
+
+ protected void bootstrap() throws Exception {
Predicate scanPredicate =
PredicateBuilder.andNullable(context.tablePredicate,
specificPartition);
this.reader = new LookupStreamingReader(context.table,
context.projection, scanPredicate);
@@ -198,7 +200,7 @@ public abstract class FullCacheLookupTable implements
LookupTable {
void finish() throws IOException;
}
- static FullCacheLookupTable create(Context context, long lruCacheSize)
throws IOException {
+ static FullCacheLookupTable create(Context context, long lruCacheSize) {
List<String> primaryKeys = context.table.primaryKeys();
if (primaryKeys.isEmpty()) {
return new NoPrimaryKeyLookupTable(context, lruCacheSize);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
index 7f5d036b1..eaad549ee 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
@@ -42,7 +42,7 @@ public class NoPrimaryKeyLookupTable extends
FullCacheLookupTable {
private RocksDBListState<InternalRow, InternalRow> state;
- public NoPrimaryKeyLookupTable(Context context, long lruCacheSize) throws
IOException {
+ public NoPrimaryKeyLookupTable(Context context, long lruCacheSize) {
super(context);
this.lruCacheSize = lruCacheSize;
List<String> fieldNames = projectedType.getFieldNames();
@@ -52,6 +52,7 @@ public class NoPrimaryKeyLookupTable extends
FullCacheLookupTable {
@Override
public void open() throws Exception {
+ openStateFactory();
this.state =
stateFactory.listState(
"join-key-index",
@@ -59,7 +60,7 @@ public class NoPrimaryKeyLookupTable extends
FullCacheLookupTable {
TypeUtils.project(projectedType,
joinKeyRow.indexMapping())),
InternalSerializers.create(projectedType),
lruCacheSize);
- super.open();
+ bootstrap();
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
index 889e1e35b..375b93461 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
@@ -47,8 +47,7 @@ public class PrimaryKeyLookupTable extends
FullCacheLookupTable {
protected RocksDBValueState<InternalRow, InternalRow> tableState;
- public PrimaryKeyLookupTable(Context context, long lruCacheSize,
List<String> joinKey)
- throws IOException {
+ public PrimaryKeyLookupTable(Context context, long lruCacheSize,
List<String> joinKey) {
super(context);
this.lruCacheSize = lruCacheSize;
List<String> fieldNames = projectedType.getFieldNames();
@@ -71,6 +70,12 @@ public class PrimaryKeyLookupTable extends
FullCacheLookupTable {
@Override
public void open() throws Exception {
+ openStateFactory();
+ createTableState();
+ bootstrap();
+ }
+
+ protected void createTableState() throws IOException {
this.tableState =
stateFactory.valueState(
"table",
@@ -78,7 +83,6 @@ public class PrimaryKeyLookupTable extends
FullCacheLookupTable {
TypeUtils.project(projectedType,
primaryKeyRow.indexMapping())),
InternalSerializers.create(projectedType),
lruCacheSize);
- super.open();
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
index d4fb22c4b..f551f17cc 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
@@ -38,7 +38,7 @@ public class SecondaryIndexLookupTable extends
PrimaryKeyLookupTable {
private RocksDBSetState<InternalRow, InternalRow> indexState;
- public SecondaryIndexLookupTable(Context context, long lruCacheSize)
throws IOException {
+ public SecondaryIndexLookupTable(Context context, long lruCacheSize) {
super(context, lruCacheSize / 2, context.table.primaryKeys());
List<String> fieldNames = projectedType.getFieldNames();
int[] secKeyMapping =
context.joinKey.stream().mapToInt(fieldNames::indexOf).toArray();
@@ -47,6 +47,8 @@ public class SecondaryIndexLookupTable extends
PrimaryKeyLookupTable {
@Override
public void open() throws Exception {
+ openStateFactory();
+ createTableState();
this.indexState =
stateFactory.setState(
"sec-index",
@@ -55,7 +57,7 @@ public class SecondaryIndexLookupTable extends
PrimaryKeyLookupTable {
InternalSerializers.create(
TypeUtils.project(projectedType,
primaryKeyRow.indexMapping())),
lruCacheSize);
- super.open();
+ bootstrap();
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
index 065ace85a..d9cb58b43 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
@@ -118,6 +118,10 @@ public class LookupTableTest extends TableTestBase {
table = FullCacheLookupTable.create(context,
ThreadLocalRandom.current().nextInt(2) * 10);
table.open();
+ // test re-open
+ table.close();
+ table.open();
+
// test bulk load error
{
TableBulkLoader bulkLoader = table.createBulkLoader();
@@ -175,6 +179,10 @@ public class LookupTableTest extends TableTestBase {
table = FullCacheLookupTable.create(context,
ThreadLocalRandom.current().nextInt(2) * 10);
table.open();
+ // test re-open
+ table.close();
+ table.open();
+
List<Pair<byte[], byte[]>> records = new ArrayList<>();
for (int i = 1; i <= 10; i++) {
InternalRow row = row(i, 11 * i, 111 * i);
@@ -219,6 +227,10 @@ public class LookupTableTest extends TableTestBase {
table = FullCacheLookupTable.create(context,
ThreadLocalRandom.current().nextInt(2) * 10);
table.open();
+ // test re-open
+ table.close();
+ table.open();
+
table.refresh(singletonList(row(1, 11, 111)).iterator());
List<InternalRow> result = table.get(row(1));
assertThat(result).hasSize(1);
@@ -250,6 +262,10 @@ public class LookupTableTest extends TableTestBase {
table = FullCacheLookupTable.create(context,
ThreadLocalRandom.current().nextInt(2) * 10);
table.open();
+ // test re-open
+ table.close();
+ table.open();
+
table.refresh(singletonList(row(1, 11, 111)).iterator());
List<InternalRow> result = table.get(row(1));
assertThat(result).hasSize(1);
@@ -274,6 +290,10 @@ public class LookupTableTest extends TableTestBase {
table = FullCacheLookupTable.create(context,
ThreadLocalRandom.current().nextInt(2) * 10);
table.open();
+ // test re-open
+ table.close();
+ table.open();
+
// test bulk load 100_000 records
List<Pair<byte[], byte[]>> records = new ArrayList<>();
Random rnd = new Random();
@@ -319,6 +339,10 @@ public class LookupTableTest extends TableTestBase {
table = FullCacheLookupTable.create(context,
ThreadLocalRandom.current().nextInt(2) * 10);
table.open();
+ // test re-open
+ table.close();
+ table.open();
+
List<Pair<byte[], byte[]>> records = new ArrayList<>();
Random rnd = new Random();
Map<Integer, Set<Integer>> secKeyToPk = new HashMap<>();
@@ -367,6 +391,10 @@ public class LookupTableTest extends TableTestBase {
table = FullCacheLookupTable.create(context,
ThreadLocalRandom.current().nextInt(2) * 10);
table.open();
+ // test re-open
+ table.close();
+ table.open();
+
table.refresh(singletonList(row(1, 11, 111)).iterator());
List<InternalRow> result = table.get(row(11));
assertThat(result).hasSize(1);
@@ -407,6 +435,10 @@ public class LookupTableTest extends TableTestBase {
table = FullCacheLookupTable.create(context,
ThreadLocalRandom.current().nextInt(2) * 10);
table.open();
+ // test re-open
+ table.close();
+ table.open();
+
// test bulk load 100_000 records
List<Pair<byte[], byte[]>> records = new ArrayList<>();
Random rnd = new Random();
@@ -450,6 +482,10 @@ public class LookupTableTest extends TableTestBase {
table = FullCacheLookupTable.create(context,
ThreadLocalRandom.current().nextInt(2) * 10);
table.open();
+ // test re-open
+ table.close();
+ table.open();
+
table.refresh(singletonList(row(1, 11, 333)).iterator());
List<InternalRow> result = table.get(row(11));
assertThat(result).hasSize(0);
@@ -508,6 +544,11 @@ public class LookupTableTest extends TableTestBase {
tempDir.toFile(),
ImmutableList.of("pk1", "pk2"));
table.open();
+
+ // test re-open
+ table.close();
+ table.open();
+
List<InternalRow> result = table.get(row(1, -1));
assertThat(result).hasSize(0);
@@ -535,6 +576,10 @@ public class LookupTableTest extends TableTestBase {
ImmutableList.of("pk2", "pk1"));
table.open();
+ // test re-open
+ table.close();
+ table.open();
+
List<InternalRow> result = table.get(row(-1, 1));
assertThat(result).hasSize(0);