This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c3053ebc2 [flink] FileStoreLookupFunction should not throw OutOfRangeException (#1636)
c3053ebc2 is described below
commit c3053ebc27503324056c3423e294f49371b8275d
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jul 24 18:12:06 2023 +0800
[flink] FileStoreLookupFunction should not throw OutOfRangeException (#1636)
---
.../flink/lookup/FileStoreLookupFunction.java | 16 +++++
.../flink/lookup/FileStoreLookupFunctionTest.java | 77 ++++++++++++----------
2 files changed, 60 insertions(+), 33 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index d6ef13ca8..3381cfa7a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -28,6 +28,7 @@ import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateFilter;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.OutOfRangeException;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileIOUtils;
import org.apache.paimon.utils.TypeUtils;
@@ -117,7 +118,10 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
// we tag this method friendly for testing
void open(String tmpDirectory) throws Exception {
this.path = new File(tmpDirectory, "lookup-" + UUID.randomUUID());
+ open();
+ }
+ private void open() throws Exception {
Options options = Options.fromMap(table.options());
this.refreshInterval =
options.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL);
this.stateFactory = new RocksDBStateFactory(path.toString(), options);
@@ -167,6 +171,18 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
rows.add(new FlinkRowData(matchedRow));
}
return rows;
+ } catch (OutOfRangeException e) {
+ reopen();
+ return lookup(keyRow);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void reopen() {
+ try {
+ close();
+ open();
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
index 259a8da95..57ff417ad 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
@@ -42,6 +42,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.nio.file.Path;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -60,6 +61,40 @@ public class FileStoreLookupFunctionTest {
private FileStoreTable fileStoreTable;
@TempDir private Path tempDir;
+ @BeforeEach
+ public void before() throws Exception {
+ org.apache.paimon.fs.Path path = new
org.apache.paimon.fs.Path(tempDir.toString());
+ SchemaManager schemaManager = new SchemaManager(fileIO, path);
+ Options conf = new Options();
+ conf.set(CoreOptions.BUCKET, 2);
+ conf.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
+ conf.set(CoreOptions.PAGE_SIZE, new MemorySize(4096));
+ conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 3);
+ conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 2);
+ conf.set(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL, Duration.ZERO);
+
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.INT(),
DataTypes.BIGINT()},
+ new String[] {"pt", "k", "v"});
+
+ Schema schema =
+ new Schema(
+ rowType.getFields(),
+ Collections.singletonList("pt"),
+ Arrays.asList("pt", "k"),
+ conf.toMap(),
+ "");
+ TableSchema tableSchema = schemaManager.createTable(schema);
+ fileStoreTable =
+ FileStoreTableFactory.create(
+ fileIO, new
org.apache.paimon.fs.Path(tempDir.toString()), tableSchema);
+
+ fileStoreLookupFunction =
+ new FileStoreLookupFunction(fileStoreTable, new int[] {0, 1},
new int[] {1}, null);
+ fileStoreLookupFunction.open(tempDir.toString());
+ }
+
@Test
public void testLookupScanLeak() throws Exception {
commit(writeCommit(1));
@@ -77,31 +112,16 @@ public class FileStoreLookupFunctionTest {
0);
}
- @BeforeEach
- public void before() throws Exception {
- createFileStoreTable();
- fileStoreLookupFunction =
- new FileStoreLookupFunction(fileStoreTable, new int[] {0, 1},
new int[] {1}, null);
- fileStoreLookupFunction.open(tempDir.toString());
- }
-
- private static final RowType ROW_TYPE =
- RowType.of(
- new DataType[] {DataTypes.INT(), DataTypes.INT(),
DataTypes.BIGINT()},
- new String[] {"pt", "k", "v"});
-
- private static Schema schema() {
- Options conf = new Options();
- conf.set(CoreOptions.BUCKET, 2);
- conf.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
- conf.set(CoreOptions.PAGE_SIZE, new MemorySize(4096));
+ @Test
+ public void testLookupExpiredSnapshot() throws Exception {
+ commit(writeCommit(1));
+ fileStoreLookupFunction.lookup(new FlinkRowData(GenericRow.of(1, 1,
10L)));
- return new Schema(
- ROW_TYPE.getFields(),
- Collections.singletonList("pt"),
- Arrays.asList("pt", "k"),
- conf.toMap(),
- "");
+ commit(writeCommit(2));
+ commit(writeCommit(3));
+ commit(writeCommit(4));
+ commit(writeCommit(5));
+ fileStoreLookupFunction.lookup(new FlinkRowData(GenericRow.of(1, 1,
10L)));
}
private void commit(List<CommitMessage> messages) {
@@ -121,13 +141,4 @@ public class FileStoreLookupFunctionTest {
private InternalRow randomRow() {
return GenericRow.of(RANDOM.nextInt(100), RANDOM.nextInt(100),
RANDOM.nextLong());
}
-
- public void createFileStoreTable() throws Exception {
- org.apache.paimon.fs.Path path = new
org.apache.paimon.fs.Path(tempDir.toString());
- SchemaManager schemaManager = new SchemaManager(fileIO, path);
- TableSchema tableSchema = schemaManager.createTable(schema());
- fileStoreTable =
- FileStoreTableFactory.create(
- fileIO, new
org.apache.paimon.fs.Path(tempDir.toString()), tableSchema);
- }
}