This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c3053ebc2 [flink] FileStoreLookupFunction should not throw OutOfRangeException (#1636)
c3053ebc2 is described below

commit c3053ebc27503324056c3423e294f49371b8275d
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jul 24 18:12:06 2023 +0800

    [flink] FileStoreLookupFunction should not throw OutOfRangeException (#1636)
---
 .../flink/lookup/FileStoreLookupFunction.java      | 16 +++++
 .../flink/lookup/FileStoreLookupFunctionTest.java  | 77 ++++++++++++----------
 2 files changed, 60 insertions(+), 33 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index d6ef13ca8..3381cfa7a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -28,6 +28,7 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateFilter;
 import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.OutOfRangeException;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileIOUtils;
 import org.apache.paimon.utils.TypeUtils;
@@ -117,7 +118,10 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
     // we tag this method friendly for testing
     void open(String tmpDirectory) throws Exception {
         this.path = new File(tmpDirectory, "lookup-" + UUID.randomUUID());
+        open();
+    }
 
+    private void open() throws Exception {
         Options options = Options.fromMap(table.options());
         this.refreshInterval = 
options.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL);
         this.stateFactory = new RocksDBStateFactory(path.toString(), options);
@@ -167,6 +171,18 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
                 rows.add(new FlinkRowData(matchedRow));
             }
             return rows;
+        } catch (OutOfRangeException e) {
+            reopen();
+            return lookup(keyRow);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void reopen() {
+        try {
+            close();
+            open();
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
index 259a8da95..57ff417ad 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
@@ -42,6 +42,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.nio.file.Path;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -60,6 +61,40 @@ public class FileStoreLookupFunctionTest {
     private FileStoreTable fileStoreTable;
     @TempDir private Path tempDir;
 
+    @BeforeEach
+    public void before() throws Exception {
+        org.apache.paimon.fs.Path path = new 
org.apache.paimon.fs.Path(tempDir.toString());
+        SchemaManager schemaManager = new SchemaManager(fileIO, path);
+        Options conf = new Options();
+        conf.set(CoreOptions.BUCKET, 2);
+        conf.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
+        conf.set(CoreOptions.PAGE_SIZE, new MemorySize(4096));
+        conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 3);
+        conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 2);
+        conf.set(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL, Duration.ZERO);
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.INT(), 
DataTypes.BIGINT()},
+                        new String[] {"pt", "k", "v"});
+
+        Schema schema =
+                new Schema(
+                        rowType.getFields(),
+                        Collections.singletonList("pt"),
+                        Arrays.asList("pt", "k"),
+                        conf.toMap(),
+                        "");
+        TableSchema tableSchema = schemaManager.createTable(schema);
+        fileStoreTable =
+                FileStoreTableFactory.create(
+                        fileIO, new 
org.apache.paimon.fs.Path(tempDir.toString()), tableSchema);
+
+        fileStoreLookupFunction =
+                new FileStoreLookupFunction(fileStoreTable, new int[] {0, 1}, 
new int[] {1}, null);
+        fileStoreLookupFunction.open(tempDir.toString());
+    }
+
     @Test
     public void testLookupScanLeak() throws Exception {
         commit(writeCommit(1));
@@ -77,31 +112,16 @@ public class FileStoreLookupFunctionTest {
                 0);
     }
 
-    @BeforeEach
-    public void before() throws Exception {
-        createFileStoreTable();
-        fileStoreLookupFunction =
-                new FileStoreLookupFunction(fileStoreTable, new int[] {0, 1}, 
new int[] {1}, null);
-        fileStoreLookupFunction.open(tempDir.toString());
-    }
-
-    private static final RowType ROW_TYPE =
-            RowType.of(
-                    new DataType[] {DataTypes.INT(), DataTypes.INT(), 
DataTypes.BIGINT()},
-                    new String[] {"pt", "k", "v"});
-
-    private static Schema schema() {
-        Options conf = new Options();
-        conf.set(CoreOptions.BUCKET, 2);
-        conf.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
-        conf.set(CoreOptions.PAGE_SIZE, new MemorySize(4096));
+    @Test
+    public void testLookupExpiredSnapshot() throws Exception {
+        commit(writeCommit(1));
+        fileStoreLookupFunction.lookup(new FlinkRowData(GenericRow.of(1, 1, 
10L)));
 
-        return new Schema(
-                ROW_TYPE.getFields(),
-                Collections.singletonList("pt"),
-                Arrays.asList("pt", "k"),
-                conf.toMap(),
-                "");
+        commit(writeCommit(2));
+        commit(writeCommit(3));
+        commit(writeCommit(4));
+        commit(writeCommit(5));
+        fileStoreLookupFunction.lookup(new FlinkRowData(GenericRow.of(1, 1, 
10L)));
     }
 
     private void commit(List<CommitMessage> messages) {
@@ -121,13 +141,4 @@ public class FileStoreLookupFunctionTest {
     private InternalRow randomRow() {
         return GenericRow.of(RANDOM.nextInt(100), RANDOM.nextInt(100), 
RANDOM.nextLong());
     }
-
-    public void createFileStoreTable() throws Exception {
-        org.apache.paimon.fs.Path path = new 
org.apache.paimon.fs.Path(tempDir.toString());
-        SchemaManager schemaManager = new SchemaManager(fileIO, path);
-        TableSchema tableSchema = schemaManager.createTable(schema());
-        fileStoreTable =
-                FileStoreTableFactory.create(
-                        fileIO, new 
org.apache.paimon.fs.Path(tempDir.toString()), tableSchema);
-    }
 }

Reply via email to