This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new e76dd9e  [FLINK-26778] Introduce snapshot latest and earliest hint files
e76dd9e is described below

commit e76dd9e7a21ff505f13ed77c9f4623c900b9f2a2
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Mar 25 16:43:09 2022 +0800

    [FLINK-26778] Introduce snapshot latest and earliest hint files
    
    This closes #56
---
 .../store/connector/sink/StoreGlobalCommitter.java |   1 -
 .../table/store/connector/sink/StoreSink.java      |  20 ++--
 .../table/store/connector/sink/TestFileStore.java  |  12 +-
 .../flink/table/store/file/FileStoreOptions.java   |   2 +-
 .../store/file/operation/FileStoreCommitImpl.java  |  17 ++-
 .../store/file/operation/FileStoreExpire.java      |   3 +
 .../store/file/operation/FileStoreExpireImpl.java  |  73 ++++++++++--
 .../store/file/utils/FileStorePathFactory.java     |  44 ++-----
 .../table/store/file/utils/SnapshotFinder.java     | 129 +++++++++++++++++++++
 .../flink/table/store/file/TestFileStore.java      |  37 ++++--
 .../store/file/operation/FileStoreCommitTest.java  |  17 +++
 .../store/file/operation/FileStoreExpireTest.java  |  17 ++-
 12 files changed, 301 insertions(+), 71 deletions(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java
index bf9fec2..20e405e 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java
@@ -103,7 +103,6 @@ public class StoreGlobalCommitter implements GlobalCommitter<Committable, Manife
             }
         }
 
-        // TODO introduce check interval
         fileStoreExpire.expire();
     }
 }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index 88cf8b5..1fc91db 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -31,7 +31,6 @@ import org.apache.flink.table.store.connector.sink.global.GlobalCommittingSink;
 import org.apache.flink.table.store.file.FileStore;
 import org.apache.flink.table.store.file.manifest.ManifestCommittable;
 import 
org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
-import org.apache.flink.table.store.file.operation.FileStoreCommit;
 import org.apache.flink.table.store.file.operation.Lock;
 import org.apache.flink.table.store.log.LogInitContext;
 import org.apache.flink.table.store.log.LogSinkProvider;
@@ -172,25 +171,30 @@ public class StoreSink<WriterStateT, LogCommT>
 
     @Override
     public StoreGlobalCommitter createGlobalCommitter() {
-        FileStoreCommit commit = fileStore.newCommit();
-        CatalogLock lock;
+        CatalogLock catalogLock;
+        Lock lock;
         if (lockFactory == null) {
+            catalogLock = null;
             lock = null;
         } else {
-            lock = lockFactory.create();
-            commit.withLock(
+            catalogLock = lockFactory.create();
+            lock =
                     new Lock() {
                         @Override
                         public <T> T runWithLock(Callable<T> callable) throws 
Exception {
-                            return lock.runWithLock(
+                            return catalogLock.runWithLock(
                                     tableIdentifier.getDatabaseName(),
                                     tableIdentifier.getObjectName(),
                                     callable);
                         }
-                    });
+                    };
         }
 
-        return new StoreGlobalCommitter(commit, fileStore.newExpire(), lock, 
overwritePartition);
+        return new StoreGlobalCommitter(
+                fileStore.newCommit().withLock(lock),
+                fileStore.newExpire().withLock(lock),
+                catalogLock,
+                overwritePartition);
     }
 
     @SuppressWarnings("unchecked")
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
index 6b38312..bd4557f 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
@@ -103,7 +103,17 @@ public class TestFileStore implements FileStore {
 
     @Override
     public FileStoreExpire newExpire() {
-        return () -> expired = true;
+        return new FileStoreExpire() {
+            @Override
+            public FileStoreExpire withLock(Lock lock) {
+                return this;
+            }
+
+            @Override
+            public void expire() {
+                expired = true;
+            }
+        };
     }
 
     @Override
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
index c4b9c38..7f7bf8f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
@@ -96,7 +96,7 @@ public class FileStoreOptions implements Serializable {
     public static final ConfigOption<Duration> SNAPSHOT_TIME_RETAINED =
             ConfigOptions.key("snapshot.time-retained")
                     .durationType()
-                    .defaultValue(Duration.ofDays(1))
+                    .defaultValue(Duration.ofHours(1))
                     .withDescription("The maximum time of completed snapshots 
to retain.");
 
     public static final ConfigOption<Duration> CONTINUOUS_DISCOVERY_INTERVAL =
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index 4d8249f..75042a7 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -35,6 +35,7 @@ import org.apache.flink.table.store.file.predicate.PredicateConverter;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.FileUtils;
 import org.apache.flink.table.store.file.utils.RowDataToObjectArrayConverter;
+import org.apache.flink.table.store.file.utils.SnapshotFinder;
 import org.apache.flink.table.types.logical.RowType;
 
 import org.slf4j.Logger;
@@ -48,6 +49,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
 
 /**
@@ -383,6 +385,16 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         try {
             FileSystem fs = tmpSnapshotPath.getFileSystem();
             // atomic rename
+            // TODO rename is not work for object store, use recoverable writer
+            Callable<Boolean> callable =
+                    () -> {
+                        boolean committed = fs.rename(tmpSnapshotPath, 
newSnapshotPath);
+                        if (committed) {
+                            SnapshotFinder.commitLatestHint(
+                                    pathFactory.snapshotDirectory(), 
newSnapshotId);
+                        }
+                        return committed;
+                    };
             if (lock != null) {
                 success =
                         lock.runWithLock(
@@ -392,10 +404,9 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                                         // as we're relying on external 
locking, we can first
                                         // check if file exist then rename to 
work around this
                                         // case
-                                        !fs.exists(newSnapshotPath)
-                                                && fs.rename(tmpSnapshotPath, 
newSnapshotPath));
+                                        !fs.exists(newSnapshotPath) && 
callable.call());
             } else {
-                success = fs.rename(tmpSnapshotPath, newSnapshotPath);
+                success = callable.call();
             }
         } catch (Throwable e) {
             // exception when performing the atomic rename,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpire.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpire.java
index feefebd..a1dad3b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpire.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpire.java
@@ -21,6 +21,9 @@ package org.apache.flink.table.store.file.operation;
 /** Expire operation which provides snapshots expire. */
 public interface FileStoreExpire {
 
+    /** With global lock. */
+    FileStoreExpire withLock(Lock lock);
+
     /** Expire snapshots. */
     void expire();
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
index f4602c7..7d7d94c 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
@@ -27,14 +27,17 @@ import org.apache.flink.table.store.file.manifest.ManifestList;
 import org.apache.flink.table.store.file.mergetree.sst.SstPathFactory;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.table.store.file.utils.SnapshotFinder;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Callable;
 
 /**
  * Default implementation of {@link FileStoreExpire}. It retains a certain 
number or period of
@@ -42,6 +45,8 @@ import java.util.Set;
  *
  * <p>NOTE: This implementation will keep at least one snapshot so that users 
will not accidentally
  * clear all snapshots.
+ *
+ * <p>TODO: add concurrent tests.
  */
 public class FileStoreExpireImpl implements FileStoreExpire {
 
@@ -55,6 +60,8 @@ public class FileStoreExpireImpl implements FileStoreExpire {
     private final ManifestFile manifestFile;
     private final ManifestList manifestList;
 
+    private Lock lock;
+
     public FileStoreExpireImpl(
             int numRetained,
             long millisRetained,
@@ -69,6 +76,12 @@ public class FileStoreExpireImpl implements FileStoreExpire {
     }
 
     @Override
+    public FileStoreExpire withLock(Lock lock) {
+        this.lock = lock;
+        return this;
+    }
+
+    @Override
     public void expire() {
         Long latestSnapshotId = pathFactory.latestSnapshotId();
         if (latestSnapshotId == null) {
@@ -78,10 +91,18 @@ public class FileStoreExpireImpl implements FileStoreExpire {
 
         long currentMillis = System.currentTimeMillis();
 
+        Long earliest;
+        try {
+            earliest = 
SnapshotFinder.findEarliest(pathFactory.snapshotDirectory());
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to find earliest snapshot id", 
e);
+        }
+        if (earliest == null) {
+            return;
+        }
+
         // find the earliest snapshot to retain
-        // TODO Here id will start from 1, we need to optimize the method of 
finding the minimum
-        // snapshot
-        for (long id = Math.max(latestSnapshotId - numRetained + 1, 
Snapshot.FIRST_SNAPSHOT_ID);
+        for (long id = Math.max(latestSnapshotId - numRetained + 1, earliest);
                 id <= latestSnapshotId;
                 id++) {
             Path snapshotPath = pathFactory.toSnapshotPath(id);
@@ -91,7 +112,7 @@ public class FileStoreExpireImpl implements FileStoreExpire {
                                 <= millisRetained) {
                     // within time threshold, can assume that all snapshots 
after it are also within
                     // the threshold
-                    expireUntil(id);
+                    expireUntil(earliest, id);
                     return;
                 }
             } catch (IOException e) {
@@ -101,18 +122,30 @@ public class FileStoreExpireImpl implements FileStoreExpire {
         }
 
         // no snapshot can be retained, expire all but last one
-        expireUntil(latestSnapshotId);
+        expireUntil(earliest, latestSnapshotId);
     }
 
-    private void expireUntil(long endExclusiveId) {
-        if (endExclusiveId <= Snapshot.FIRST_SNAPSHOT_ID) {
+    private void expireUntil(long earliestId, long endExclusiveId) {
+        if (endExclusiveId <= earliestId) {
+            // No expire happens:
+            // write the hint file in order to see the earliest snapshot 
directly next time
+            // should avoid duplicate writes when the file exists
+            Path hint = new Path(pathFactory.snapshotDirectory(), 
SnapshotFinder.EARLIEST);
+            try {
+                if (!hint.getFileSystem().exists(hint)) {
+                    writeEarliestHint(endExclusiveId);
+                }
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
+
             // fast exit
             return;
         }
 
         // find first snapshot to expire
-        long beginInclusiveId = Snapshot.FIRST_SNAPSHOT_ID;
-        for (long id = endExclusiveId - 1; id >= Snapshot.FIRST_SNAPSHOT_ID; 
id--) {
+        long beginInclusiveId = earliestId;
+        for (long id = endExclusiveId - 1; id >= earliestId; id--) {
             Path snapshotPath = pathFactory.toSnapshotPath(id);
             try {
                 if (!snapshotPath.getFileSystem().exists(snapshotPath)) {
@@ -198,5 +231,27 @@ public class FileStoreExpireImpl implements FileStoreExpire {
             // delete snapshot
             FileUtils.deleteOrWarn(pathFactory.toSnapshotPath(id));
         }
+
+        writeEarliestHint(endExclusiveId);
+    }
+
+    private void writeEarliestHint(long earliest) {
+        // update earliest hint file
+
+        Callable<Void> callable =
+                () -> {
+                    
SnapshotFinder.commitEarliestHint(pathFactory.snapshotDirectory(), earliest);
+                    return null;
+                };
+
+        try {
+            if (lock != null) {
+                lock.runWithLock(callable);
+            } else {
+                callable.call();
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
index be17080..ec8c35a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
@@ -21,11 +21,8 @@ package org.apache.flink.table.store.file.utils;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
 import org.apache.flink.connector.file.table.RowDataPartitionComputer;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.mergetree.sst.SstPathFactory;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
@@ -33,9 +30,6 @@ import org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter;
 import org.apache.flink.table.utils.PartitionPathUtils;
 import org.apache.flink.util.Preconditions;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 
@@ -45,13 +39,12 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static 
org.apache.flink.table.store.file.utils.SnapshotFinder.SNAPSHOT_PREFIX;
+
 /** Factory which produces {@link Path}s for each type of files. */
 @ThreadSafe
 public class FileStorePathFactory {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(FileStorePathFactory.class);
-    private static final String SNAPSHOT_PREFIX = "snapshot-";
-
     private final Path root;
     private final String uuid;
     private final RowDataPartitionComputer partitionComputer;
@@ -124,37 +117,14 @@ public class FileStorePathFactory {
                                 partition, "Partition row data is null. This 
is unexpected.")));
     }
 
+    public Path snapshotDirectory() {
+        return new Path(root + "/snapshot");
+    }
+
     @Nullable
     public Long latestSnapshotId() {
-        // TODO add a `bestEffort` argument and read from a best-effort 
CURRENT file if true
         try {
-            Path snapshotDir = new Path(root + "/snapshot");
-            FileSystem fs = snapshotDir.getFileSystem();
-
-            if (!fs.exists(snapshotDir)) {
-                LOG.debug("The snapshot director '{}' is not exist.", 
snapshotDir);
-                return null;
-            }
-
-            FileStatus[] statuses = fs.listStatus(snapshotDir);
-            if (statuses == null) {
-                throw new RuntimeException(
-                        "The return value is null of the listStatus for the 
snapshot directory.");
-            }
-
-            long latestId = Snapshot.FIRST_SNAPSHOT_ID - 1;
-            for (FileStatus status : statuses) {
-                String fileName = status.getPath().getName();
-                if (fileName.startsWith(SNAPSHOT_PREFIX)) {
-                    try {
-                        long id = 
Long.parseLong(fileName.substring(SNAPSHOT_PREFIX.length()));
-                        latestId = Math.max(latestId, id);
-                    } catch (NumberFormatException e) {
-                        LOG.warn("Invalid snapshot file name found " + 
fileName, e);
-                    }
-                }
-            }
-            return latestId < Snapshot.FIRST_SNAPSHOT_ID ? null : latestId;
+            return SnapshotFinder.findLatest(snapshotDirectory());
         } catch (IOException e) {
             throw new RuntimeException("Failed to find latest snapshot id", e);
         }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotFinder.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotFinder.java
new file mode 100644
index 0000000..ee4752c
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotFinder.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.function.BinaryOperator;
+
+/** Find latest and earliest snapshot. */
+public class SnapshotFinder {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SnapshotFinder.class);
+
+    public static final String SNAPSHOT_PREFIX = "snapshot-";
+
+    public static final String EARLIEST = "EARLIEST";
+
+    public static final String LATEST = "LATEST";
+
+    public static Long findLatest(Path snapshotDir) throws IOException {
+        FileSystem fs = snapshotDir.getFileSystem();
+        if (!fs.exists(snapshotDir)) {
+            return null;
+        }
+
+        Long snapshotId = readHint(snapshotDir, LATEST);
+        if (snapshotId != null) {
+            long nextSnapshot = snapshotId + 1;
+            // it is the latest only there is no next one
+            if (!fs.exists(new Path(snapshotDir, SNAPSHOT_PREFIX + 
nextSnapshot))) {
+                return snapshotId;
+            }
+        }
+
+        return findByListFiles(snapshotDir, Math::max);
+    }
+
+    public static Long findEarliest(Path snapshotDir) throws IOException {
+        FileSystem fs = snapshotDir.getFileSystem();
+        if (!fs.exists(snapshotDir)) {
+            return null;
+        }
+
+        Long snapshotId = readHint(snapshotDir, EARLIEST);
+        // null and it is the earliest only it exists
+        if (snapshotId != null && fs.exists(new Path(snapshotDir, 
SNAPSHOT_PREFIX + snapshotId))) {
+            return snapshotId;
+        }
+
+        return findByListFiles(snapshotDir, Math::min);
+    }
+
+    @VisibleForTesting
+    public static Long readHint(Path snapshotDir, String fileName) throws 
IOException {
+        Path path = new Path(snapshotDir, fileName);
+        if (path.getFileSystem().exists(path)) {
+            return Long.parseLong(FileUtils.readFileUtf8(path));
+        }
+        return null;
+    }
+
+    private static Long findByListFiles(Path snapshotDir, BinaryOperator<Long> 
reducer)
+            throws IOException {
+        FileStatus[] statuses = 
snapshotDir.getFileSystem().listStatus(snapshotDir);
+        if (statuses == null) {
+            throw new RuntimeException(
+                    "The return value is null of the listStatus for the 
snapshot directory.");
+        }
+
+        Long result = null;
+        for (FileStatus status : statuses) {
+            String fileName = status.getPath().getName();
+            if (fileName.startsWith(SNAPSHOT_PREFIX)) {
+                try {
+                    long id = 
Long.parseLong(fileName.substring(SNAPSHOT_PREFIX.length()));
+                    result = result == null ? id : reducer.apply(result, id);
+                } catch (NumberFormatException e) {
+                    throw new RuntimeException("Invalid snapshot file name 
found " + fileName, e);
+                }
+            }
+        }
+        return result;
+    }
+
+    public static void commitLatestHint(Path snapshotDir, long snapshotId) 
throws IOException {
+        commitHint(snapshotDir, snapshotId, LATEST);
+    }
+
+    public static void commitEarliestHint(Path snapshotDir, long snapshotId) 
throws IOException {
+        commitHint(snapshotDir, snapshotId, EARLIEST);
+    }
+
+    private static void commitHint(Path snapshotDir, long snapshotId, String 
fileName)
+            throws IOException {
+        FileSystem fs = snapshotDir.getFileSystem();
+        Path hintFile = new Path(snapshotDir, fileName);
+        Path tempFile = new Path(snapshotDir, UUID.randomUUID() + "-" + 
fileName + ".temp");
+        FileUtils.writeFileUtf8(tempFile, String.valueOf(snapshotId));
+        fs.delete(hintFile, false);
+        boolean success = fs.rename(tempFile, hintFile);
+        if (!success) {
+            fs.delete(tempFile, false);
+        }
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 319970d..15a85d0 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -40,6 +40,7 @@ import org.apache.flink.table.store.file.operation.FileStoreWrite;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
 import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.store.file.utils.SnapshotFinder;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.function.QuadFunction;
 
@@ -302,18 +303,34 @@ public class TestFileStore extends FileStoreImpl {
         return result;
     }
 
-    public void assertCleaned() {
+    public void assertCleaned() throws IOException {
         Set<Path> filesInUse = getFilesInUse();
-        Set<Path> actualFiles;
-        try {
-            actualFiles =
-                    Files.walk(Paths.get(root))
-                            .filter(p -> Files.isRegularFile(p))
-                            .map(p -> new Path(p.toString()))
-                            .collect(Collectors.toSet());
-        } catch (IOException e) {
-            throw new RuntimeException(e);
+        Set<Path> actualFiles =
+                Files.walk(Paths.get(root))
+                        .filter(Files::isRegularFile)
+                        .map(p -> new Path(p.toString()))
+                        .collect(Collectors.toSet());
+
+        // remove best effort latest and earliest hint files
+        // Consider concurrency test, it will not be possible to check here 
because the hint_file is
+        // possibly not the most accurate, so this check is only.
+        // - latest should < true_latest
+        // - earliest should < true_earliest
+        Path snapshotDir = pathFactory().snapshotDirectory();
+        Path earliest = new Path(snapshotDir, SnapshotFinder.EARLIEST);
+        Path latest = new Path(snapshotDir, SnapshotFinder.LATEST);
+        if (actualFiles.remove(earliest)) {
+            long earliestId = SnapshotFinder.readHint(snapshotDir, 
SnapshotFinder.EARLIEST);
+            earliest.getFileSystem().delete(earliest, false);
+            assertThat(earliestId <= 
SnapshotFinder.findEarliest(snapshotDir)).isTrue();
         }
+        if (actualFiles.remove(latest)) {
+            long latestId = SnapshotFinder.readHint(snapshotDir, 
SnapshotFinder.LATEST);
+            latest.getFileSystem().delete(latest, false);
+            assertThat(latestId <= 
SnapshotFinder.findLatest(snapshotDir)).isTrue();
+        }
+        actualFiles.remove(latest);
+
         assertThat(actualFiles).isEqualTo(filesInUse);
     }
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
index f3ca1b0..ce1cb45 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.store.file.TestKeyValueGenerator;
 import org.apache.flink.table.store.file.ValueKind;
 import 
org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
 import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
+import org.apache.flink.table.store.file.utils.SnapshotFinder;
 import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -78,6 +79,22 @@ public class FileStoreCommitTest {
         testRandomConcurrentNoConflict(ThreadLocalRandom.current().nextInt(3) 
+ 2, failing);
     }
 
+    @Test
+    public void testLatestHint() throws Exception {
+        testRandomConcurrentNoConflict(1, false);
+        Path snapshotDir = createStore(false, 
1).pathFactory().snapshotDirectory();
+        Path latest = new Path(snapshotDir, SnapshotFinder.LATEST);
+
+        assertThat(latest.getFileSystem().exists(latest)).isTrue();
+
+        Long latestId = SnapshotFinder.findLatest(snapshotDir);
+
+        // remove latest hint file
+        latest.getFileSystem().delete(latest, false);
+
+        assertThat(SnapshotFinder.findLatest(snapshotDir)).isEqualTo(latestId);
+    }
+
     protected void testRandomConcurrentNoConflict(int numThreads, boolean 
failing)
             throws Exception {
         // prepare test data
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
index 3761b4b..2884a85 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.store.file.TestFileStore;
 import org.apache.flink.table.store.file.TestKeyValueGenerator;
 import 
org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.SnapshotFinder;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -67,7 +68,7 @@ public class FileStoreExpireTest {
     }
 
     @AfterEach
-    public void afterEach() {
+    public void afterEach() throws IOException {
         store.assertCleaned();
     }
 
@@ -150,6 +151,20 @@ public class FileStoreExpireTest {
                 }
             }
         }
+
+        // validate earliest hint file
+
+        Path snapshotDir = pathFactory.snapshotDirectory();
+        Path earliest = new Path(snapshotDir, SnapshotFinder.EARLIEST);
+
+        assertThat(earliest.getFileSystem().exists(earliest)).isTrue();
+
+        Long earliestId = SnapshotFinder.findEarliest(snapshotDir);
+
+        // remove earliest hint file
+        earliest.getFileSystem().delete(earliest, false);
+
+        
assertThat(SnapshotFinder.findEarliest(snapshotDir)).isEqualTo(earliestId);
     }
 
     @Test

Reply via email to