This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 9d9b38e [FLINK-27139] FileStoreCommitImpl#filterCommitted should exit
when faced with expired files
9d9b38e is described below
commit 9d9b38ef5fa0a09f3dd396fb6a718b467900ce22
Author: tsreaper <[email protected]>
AuthorDate: Mon Apr 11 10:44:10 2022 +0800
[FLINK-27139] FileStoreCommitImpl#filterCommitted should exit when faced
with expired files
This closes #84
---
.../table/store/file/operation/FileStoreCommitImpl.java | 14 ++++++++++++++
.../table/store/file/operation/FileStoreCommitTest.java | 17 +++++++++++++++++
2 files changed, 31 insertions(+)
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index 75042a7..27ab159 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -117,6 +118,11 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
@Override
public List<ManifestCommittable> filterCommitted(List<ManifestCommittable>
committableList) {
+ // nothing to filter, fast exit
+ if (committableList.isEmpty()) {
+ return committableList;
+ }
+
// if there is no previous snapshots then nothing should be filtered
Long latestSnapshotId = pathFactory.latestSnapshotId();
if (latestSnapshotId == null) {
@@ -131,6 +137,14 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
for (long id = latestSnapshotId; id >= Snapshot.FIRST_SNAPSHOT_ID;
id--) {
Path snapshotPath = pathFactory.toSnapshotPath(id);
+ try {
+ if (!snapshotPath.getFileSystem().exists(snapshotPath)) {
+ // snapshots before this are expired
+ break;
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot determine if snapshot #" +
id + " exists.", e);
+ }
Snapshot snapshot = Snapshot.fromPath(snapshotPath);
if (commitUser.equals(snapshot.commitUser())) {
if (identifiers.containsKey(snapshot.commitIdentifier())) {
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
index ce1cb45..6f90e52 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
@@ -25,8 +25,11 @@ import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.TestFileStore;
import org.apache.flink.table.store.file.TestKeyValueGenerator;
import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import
org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.FileUtils;
import org.apache.flink.table.store.file.utils.SnapshotFinder;
import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
@@ -41,6 +44,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -95,6 +99,19 @@ public class FileStoreCommitTest {
assertThat(SnapshotFinder.findLatest(snapshotDir)).isEqualTo(latestId);
}
+ @Test
+ public void testFilterCommittedAfterExpire() throws Exception {
+ testRandomConcurrentNoConflict(1, false);
+ // remove first snapshot to mimic expiration
+ TestFileStore store = createStore(false);
+ FileStorePathFactory pathFactory = store.pathFactory();
+ Path firstSnapshotPath =
pathFactory.toSnapshotPath(Snapshot.FIRST_SNAPSHOT_ID);
+ FileUtils.deleteOrWarn(firstSnapshotPath);
+ // this test succeeds if this call does not fail
+ store.newCommit()
+ .filterCommitted(Collections.singletonList(new
ManifestCommittable("dummy")));
+ }
+
protected void testRandomConcurrentNoConflict(int numThreads, boolean
failing)
throws Exception {
// prepare test data