This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d9b38e  [FLINK-27139] FileStoreCommitImpl#filterCommitted should exit when faced with expired files
9d9b38e is described below

commit 9d9b38ef5fa0a09f3dd396fb6a718b467900ce22
Author: tsreaper <[email protected]>
AuthorDate: Mon Apr 11 10:44:10 2022 +0800

    [FLINK-27139] FileStoreCommitImpl#filterCommitted should exit when faced with expired files
    
    This closes #84
---
 .../table/store/file/operation/FileStoreCommitImpl.java | 14 ++++++++++++++
 .../table/store/file/operation/FileStoreCommitTest.java | 17 +++++++++++++++++
 2 files changed, 31 insertions(+)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index 75042a7..27ab159 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -117,6 +118,11 @@ public class FileStoreCommitImpl implements FileStoreCommit {
 
     @Override
     public List<ManifestCommittable> filterCommitted(List<ManifestCommittable> committableList) {
+        // nothing to filter, fast exit
+        if (committableList.isEmpty()) {
+            return committableList;
+        }
+
         // if there is no previous snapshots then nothing should be filtered
         Long latestSnapshotId = pathFactory.latestSnapshotId();
         if (latestSnapshotId == null) {
@@ -131,6 +137,14 @@ public class FileStoreCommitImpl implements FileStoreCommit {
 
         for (long id = latestSnapshotId; id >= Snapshot.FIRST_SNAPSHOT_ID; id--) {
             Path snapshotPath = pathFactory.toSnapshotPath(id);
+            try {
+                if (!snapshotPath.getFileSystem().exists(snapshotPath)) {
+                    // snapshots before this are expired
+                    break;
+                }
+            } catch (IOException e) {
+                throw new RuntimeException("Cannot determine if snapshot #" + id + " exists.", e);
+            }
             Snapshot snapshot = Snapshot.fromPath(snapshotPath);
             if (commitUser.equals(snapshot.commitUser())) {
                 if (identifiers.containsKey(snapshot.commitIdentifier())) {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
index ce1cb45..6f90e52 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
@@ -25,8 +25,11 @@ import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.TestFileStore;
 import org.apache.flink.table.store.file.TestKeyValueGenerator;
 import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
 import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
 import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.FileUtils;
 import org.apache.flink.table.store.file.utils.SnapshotFinder;
 import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
 
@@ -41,6 +44,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -95,6 +99,19 @@ public class FileStoreCommitTest {
         assertThat(SnapshotFinder.findLatest(snapshotDir)).isEqualTo(latestId);
     }
 
+    @Test
+    public void testFilterCommittedAfterExpire() throws Exception {
+        testRandomConcurrentNoConflict(1, false);
+        // remove first snapshot to mimic expiration
+        TestFileStore store = createStore(false);
+        FileStorePathFactory pathFactory = store.pathFactory();
+        Path firstSnapshotPath = pathFactory.toSnapshotPath(Snapshot.FIRST_SNAPSHOT_ID);
+        FileUtils.deleteOrWarn(firstSnapshotPath);
+        // this test succeeds if this call does not fail
+        store.newCommit()
+                .filterCommitted(Collections.singletonList(new ManifestCommittable("dummy")));
+    }
+
     protected void testRandomConcurrentNoConflict(int numThreads, boolean failing)
             throws Exception {
         // prepare test data

Reply via email to