This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new e76dd9e [FLINK-26778] Introduce snapshot latest and earliest hint
files
e76dd9e is described below
commit e76dd9e7a21ff505f13ed77c9f4623c900b9f2a2
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Mar 25 16:43:09 2022 +0800
[FLINK-26778] Introduce snapshot latest and earliest hint files
This closes #56
---
.../store/connector/sink/StoreGlobalCommitter.java | 1 -
.../table/store/connector/sink/StoreSink.java | 20 ++--
.../table/store/connector/sink/TestFileStore.java | 12 +-
.../flink/table/store/file/FileStoreOptions.java | 2 +-
.../store/file/operation/FileStoreCommitImpl.java | 17 ++-
.../store/file/operation/FileStoreExpire.java | 3 +
.../store/file/operation/FileStoreExpireImpl.java | 73 ++++++++++--
.../store/file/utils/FileStorePathFactory.java | 44 ++-----
.../table/store/file/utils/SnapshotFinder.java | 129 +++++++++++++++++++++
.../flink/table/store/file/TestFileStore.java | 37 ++++--
.../store/file/operation/FileStoreCommitTest.java | 17 +++
.../store/file/operation/FileStoreExpireTest.java | 17 ++-
12 files changed, 301 insertions(+), 71 deletions(-)
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java
index bf9fec2..20e405e 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java
@@ -103,7 +103,6 @@ public class StoreGlobalCommitter implements
GlobalCommitter<Committable, Manife
}
}
- // TODO introduce check interval
fileStoreExpire.expire();
}
}
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index 88cf8b5..1fc91db 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -31,7 +31,6 @@ import
org.apache.flink.table.store.connector.sink.global.GlobalCommittingSink;
import org.apache.flink.table.store.file.FileStore;
import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import
org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
-import org.apache.flink.table.store.file.operation.FileStoreCommit;
import org.apache.flink.table.store.file.operation.Lock;
import org.apache.flink.table.store.log.LogInitContext;
import org.apache.flink.table.store.log.LogSinkProvider;
@@ -172,25 +171,30 @@ public class StoreSink<WriterStateT, LogCommT>
@Override
public StoreGlobalCommitter createGlobalCommitter() {
- FileStoreCommit commit = fileStore.newCommit();
- CatalogLock lock;
+ CatalogLock catalogLock; // null when lockFactory is null; otherwise handed to StoreGlobalCommitter
+ Lock lock; // table-scoped adapter over catalogLock, shared by the commit and the expire below
if (lockFactory == null) {
+ catalogLock = null;
lock = null;
} else {
- lock = lockFactory.create();
- commit.withLock(
+ catalogLock = lockFactory.create();
+ lock =
new Lock() {
@Override
public <T> T runWithLock(Callable<T> callable) throws
Exception {
- return lock.runWithLock(
+ return catalogLock.runWithLock(
tableIdentifier.getDatabaseName(),
tableIdentifier.getObjectName(),
callable);
}
- });
+ };
}
- return new StoreGlobalCommitter(commit, fileStore.newExpire(), lock,
overwritePartition);
+ return new StoreGlobalCommitter(
+ fileStore.newCommit().withLock(lock),
+ fileStore.newExpire().withLock(lock),
+ catalogLock,
+ overwritePartition);
}
@SuppressWarnings("unchecked")
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
index 6b38312..bd4557f 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
@@ -103,7 +103,17 @@ public class TestFileStore implements FileStore {
@Override
public FileStoreExpire newExpire() {
- return () -> expired = true;
+ return new FileStoreExpire() { // anonymous class: FileStoreExpire now has two abstract methods, so a lambda no longer fits
+ @Override
+ public FileStoreExpire withLock(Lock lock) {
+ return this; // the lock is ignored by this test store
+ }
+
+ @Override
+ public void expire() {
+ expired = true;
+ }
+ };
}
@Override
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
index c4b9c38..7f7bf8f 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
@@ -96,7 +96,7 @@ public class FileStoreOptions implements Serializable {
public static final ConfigOption<Duration> SNAPSHOT_TIME_RETAINED =
ConfigOptions.key("snapshot.time-retained")
.durationType()
- .defaultValue(Duration.ofDays(1))
+ .defaultValue(Duration.ofHours(1)) // default lowered from one day to one hour by this change
.withDescription("The maximum time of completed snapshots
to retain.");
public static final ConfigOption<Duration> CONTINUOUS_DISCOVERY_INTERVAL =
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index 4d8249f..75042a7 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -35,6 +35,7 @@ import
org.apache.flink.table.store.file.predicate.PredicateConverter;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.FileUtils;
import org.apache.flink.table.store.file.utils.RowDataToObjectArrayConverter;
+import org.apache.flink.table.store.file.utils.SnapshotFinder;
import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
@@ -48,6 +49,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.stream.Collectors;
/**
@@ -383,6 +385,16 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
try {
FileSystem fs = tmpSnapshotPath.getFileSystem();
// atomic rename
+ // TODO rename is not work for object store, use recoverable writer
+ Callable<Boolean> callable =
+ () -> {
+ boolean committed = fs.rename(tmpSnapshotPath,
newSnapshotPath);
+ if (committed) {
+ SnapshotFinder.commitLatestHint(
+ pathFactory.snapshotDirectory(),
newSnapshotId);
+ }
+ return committed;
+ };
if (lock != null) {
success =
lock.runWithLock(
@@ -392,10 +404,9 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
// as we're relying on external
locking, we can first
// check if file exist then rename to
work around this
// case
- !fs.exists(newSnapshotPath)
- && fs.rename(tmpSnapshotPath,
newSnapshotPath));
+ !fs.exists(newSnapshotPath) &&
callable.call());
} else {
- success = fs.rename(tmpSnapshotPath, newSnapshotPath);
+ success = callable.call();
}
} catch (Throwable e) {
// exception when performing the atomic rename,
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpire.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpire.java
index feefebd..a1dad3b 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpire.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpire.java
@@ -21,6 +21,9 @@ package org.apache.flink.table.store.file.operation;
/** Expire operation which removes expired snapshots. */
public interface FileStoreExpire {
+ /**
+ * Sets the global lock to use while expiring and returns this instance for chaining.
+ *
+ * <p>Callers may pass {@code null} when no lock is available, as StoreSink does when it has no
+ * lock factory.
+ */
+ FileStoreExpire withLock(Lock lock);
+
/** Expires snapshots according to the retention policy of the implementation. */
void expire();
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
index f4602c7..7d7d94c 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
@@ -27,14 +27,17 @@ import
org.apache.flink.table.store.file.manifest.ManifestList;
import org.apache.flink.table.store.file.mergetree.sst.SstPathFactory;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.table.store.file.utils.SnapshotFinder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.Callable;
/**
* Default implementation of {@link FileStoreExpire}. It retains a certain
number or period of
@@ -42,6 +45,8 @@ import java.util.Set;
*
* <p>NOTE: This implementation will keep at least one snapshot so that users
will not accidentally
* clear all snapshots.
+ *
+ * <p>TODO: add concurrent tests.
*/
public class FileStoreExpireImpl implements FileStoreExpire {
@@ -55,6 +60,8 @@ public class FileStoreExpireImpl implements FileStoreExpire {
private final ManifestFile manifestFile;
private final ManifestList manifestList;
+ private Lock lock;
+
public FileStoreExpireImpl(
int numRetained,
long millisRetained,
@@ -69,6 +76,12 @@ public class FileStoreExpireImpl implements FileStoreExpire {
}
@Override
+ public FileStoreExpire withLock(Lock lock) { // lock may be null; writeEarliestHint then runs without locking
+ this.lock = lock;
+ return this;
+ }
+
+ @Override
public void expire() {
Long latestSnapshotId = pathFactory.latestSnapshotId();
if (latestSnapshotId == null) {
@@ -78,10 +91,18 @@ public class FileStoreExpireImpl implements FileStoreExpire
{
long currentMillis = System.currentTimeMillis();
+ Long earliest;
+ try {
+ earliest =
SnapshotFinder.findEarliest(pathFactory.snapshotDirectory());
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to find earliest snapshot id",
e);
+ }
+ if (earliest == null) {
+ return;
+ }
+
// find the earliest snapshot to retain
- // TODO Here id will start from 1, we need to optimize the method of
finding the minimum
- // snapshot
- for (long id = Math.max(latestSnapshotId - numRetained + 1,
Snapshot.FIRST_SNAPSHOT_ID);
+ for (long id = Math.max(latestSnapshotId - numRetained + 1, earliest);
id <= latestSnapshotId;
id++) {
Path snapshotPath = pathFactory.toSnapshotPath(id);
@@ -91,7 +112,7 @@ public class FileStoreExpireImpl implements FileStoreExpire {
<= millisRetained) {
// within time threshold, can assume that all snapshots
after it are also within
// the threshold
- expireUntil(id);
+ expireUntil(earliest, id);
return;
}
} catch (IOException e) {
@@ -101,18 +122,30 @@ public class FileStoreExpireImpl implements
FileStoreExpire {
}
// no snapshot can be retained, expire all but last one
- expireUntil(latestSnapshotId);
+ expireUntil(earliest, latestSnapshotId);
}
- private void expireUntil(long endExclusiveId) {
- if (endExclusiveId <= Snapshot.FIRST_SNAPSHOT_ID) {
+ private void expireUntil(long earliestId, long endExclusiveId) {
+ if (endExclusiveId <= earliestId) {
+ // No expire happens:
+ // write the hint file in order to see the earliest snapshot
directly next time
+ // should avoid duplicate writes when the file exists
+ Path hint = new Path(pathFactory.snapshotDirectory(),
SnapshotFinder.EARLIEST);
+ try {
+ if (!hint.getFileSystem().exists(hint)) {
+ writeEarliestHint(endExclusiveId);
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+
// fast exit
return;
}
// find first snapshot to expire
- long beginInclusiveId = Snapshot.FIRST_SNAPSHOT_ID;
- for (long id = endExclusiveId - 1; id >= Snapshot.FIRST_SNAPSHOT_ID;
id--) {
+ long beginInclusiveId = earliestId;
+ for (long id = endExclusiveId - 1; id >= earliestId; id--) {
Path snapshotPath = pathFactory.toSnapshotPath(id);
try {
if (!snapshotPath.getFileSystem().exists(snapshotPath)) {
@@ -198,5 +231,27 @@ public class FileStoreExpireImpl implements
FileStoreExpire {
// delete snapshot
FileUtils.deleteOrWarn(pathFactory.toSnapshotPath(id));
}
+
+ writeEarliestHint(endExclusiveId);
+ }
+
+ private void writeEarliestHint(long earliest) {
+ /*
+ * Points the EARLIEST hint file of the snapshot directory at the given id. The write runs
+ * inside Lock#runWithLock when a lock was set through withLock, otherwise it runs directly.
+ * Any exception thrown by the write or the lock is rethrown wrapped in a RuntimeException.
+ *
+ * NOTE(review): the lock here covers only this hint write; confirm whether other expire
+ * steps also need to run under it.
+ */
+
+ Callable<Void> callable =
+ () -> {
+
SnapshotFinder.commitEarliestHint(pathFactory.snapshotDirectory(), earliest);
+ return null;
+ };
+
+ try {
+ if (lock != null) {
+ lock.runWithLock(callable);
+ } else {
+ callable.call();
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
index be17080..ec8c35a 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
@@ -21,11 +21,8 @@ package org.apache.flink.table.store.file.utils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
import org.apache.flink.connector.file.table.RowDataPartitionComputer;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.mergetree.sst.SstPathFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
@@ -33,9 +30,6 @@ import
org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter;
import org.apache.flink.table.utils.PartitionPathUtils;
import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
@@ -45,13 +39,12 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
+import static
org.apache.flink.table.store.file.utils.SnapshotFinder.SNAPSHOT_PREFIX;
+
/** Factory which produces {@link Path}s for each type of files. */
@ThreadSafe
public class FileStorePathFactory {
- private static final Logger LOG =
LoggerFactory.getLogger(FileStorePathFactory.class);
- private static final String SNAPSHOT_PREFIX = "snapshot-";
-
private final Path root;
private final String uuid;
private final RowDataPartitionComputer partitionComputer;
@@ -124,37 +117,14 @@ public class FileStorePathFactory {
partition, "Partition row data is null. This
is unexpected.")));
}
+ public Path snapshotDirectory() { // directory listed by latestSnapshotId to find snapshot files
+ return new Path(root + "/snapshot"); // SnapshotFinder also keeps the LATEST/EARLIEST hints here
+ }
+
@Nullable
public Long latestSnapshotId() {
- // TODO add a `bestEffort` argument and read from a best-effort
CURRENT file if true
try {
- Path snapshotDir = new Path(root + "/snapshot");
- FileSystem fs = snapshotDir.getFileSystem();
-
- if (!fs.exists(snapshotDir)) {
- LOG.debug("The snapshot director '{}' is not exist.",
snapshotDir);
- return null;
- }
-
- FileStatus[] statuses = fs.listStatus(snapshotDir);
- if (statuses == null) {
- throw new RuntimeException(
- "The return value is null of the listStatus for the
snapshot directory.");
- }
-
- long latestId = Snapshot.FIRST_SNAPSHOT_ID - 1;
- for (FileStatus status : statuses) {
- String fileName = status.getPath().getName();
- if (fileName.startsWith(SNAPSHOT_PREFIX)) {
- try {
- long id =
Long.parseLong(fileName.substring(SNAPSHOT_PREFIX.length()));
- latestId = Math.max(latestId, id);
- } catch (NumberFormatException e) {
- LOG.warn("Invalid snapshot file name found " +
fileName, e);
- }
- }
- }
- return latestId < Snapshot.FIRST_SNAPSHOT_ID ? null : latestId;
+ return SnapshotFinder.findLatest(snapshotDirectory());
} catch (IOException e) {
throw new RuntimeException("Failed to find latest snapshot id", e);
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotFinder.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotFinder.java
new file mode 100644
index 0000000..ee4752c
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotFinder.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.function.BinaryOperator;
+
+/**
+ * Finds the latest and earliest snapshot ids of a snapshot directory.
+ *
+ * <p>Snapshot files are named {@code snapshot-<id>}. Two best-effort hint files, {@code LATEST}
+ * and {@code EARLIEST}, each store a single id as UTF-8 text so that most lookups can avoid
+ * listing the directory. A hint is only used after it has been checked against the snapshot
+ * files; otherwise the finders fall back to listing the directory.
+ */
+public class SnapshotFinder {
+ // NOTE(review): LOG is declared but not used anywhere in this class.
+ private static final Logger LOG =
LoggerFactory.getLogger(SnapshotFinder.class);
+ /** File name prefix of snapshot files; the numeric snapshot id follows the prefix. */
+ public static final String SNAPSHOT_PREFIX = "snapshot-";
+ /** Name of the hint file holding a best-effort earliest snapshot id. */
+ public static final String EARLIEST = "EARLIEST";
+ /** Name of the hint file holding a best-effort latest snapshot id. */
+ public static final String LATEST = "LATEST";
+ /**
+ * Returns the latest snapshot id in {@code snapshotDir}, or {@code null} if the directory does
+ * not exist or holds no snapshot files.
+ *
+ * <p>The {@code LATEST} hint is returned if no snapshot file exists for the id right after it;
+ * otherwise the directory is listed and the largest snapshot id is returned.
+ *
+ * @throws IOException if a file system operation fails
+ */
+ public static Long findLatest(Path snapshotDir) throws IOException {
+ FileSystem fs = snapshotDir.getFileSystem();
+ if (!fs.exists(snapshotDir)) {
+ return null;
+ }
+
+ Long snapshotId = readHint(snapshotDir, LATEST);
+ if (snapshotId != null) {
+ long nextSnapshot = snapshotId + 1;
+ // the hint is the latest only if no snapshot file exists for the next id
+ if (!fs.exists(new Path(snapshotDir, SNAPSHOT_PREFIX +
nextSnapshot))) {
+ return snapshotId;
+ }
+ }
+
+ return findByListFiles(snapshotDir, Math::max);
+ }
+ /**
+ * Returns the earliest snapshot id in {@code snapshotDir}, or {@code null} if the directory
+ * does not exist or holds no snapshot files.
+ *
+ * <p>The {@code EARLIEST} hint is returned if the snapshot file it names still exists;
+ * otherwise the directory is listed and the smallest snapshot id is returned.
+ *
+ * @throws IOException if a file system operation fails
+ */
+ public static Long findEarliest(Path snapshotDir) throws IOException {
+ FileSystem fs = snapshotDir.getFileSystem();
+ if (!fs.exists(snapshotDir)) {
+ return null;
+ }
+
+ Long snapshotId = readHint(snapshotDir, EARLIEST);
+ // trust the hint only if it is present and the snapshot file it names still exists
+ if (snapshotId != null && fs.exists(new Path(snapshotDir,
SNAPSHOT_PREFIX + snapshotId))) {
+ return snapshotId;
+ }
+
+ return findByListFiles(snapshotDir, Math::min);
+ }
+ /**
+ * Reads the id stored in the hint file {@code fileName} under {@code snapshotDir}, or returns
+ * {@code null} if that file does not exist.
+ *
+ * <p>NOTE(review): a hint file whose content is not a number makes Long#parseLong throw
+ * NumberFormatException, which is not caught here, so callers fail instead of falling back to
+ * listing. Also, commitHint deletes the hint before renaming the new one into place, so a
+ * concurrent reader may pass the exists check and then presumably fail to read the file.
+ * Treating such failures as a missing hint would match the best-effort intent; confirm.
+ */
+ @VisibleForTesting
+ public static Long readHint(Path snapshotDir, String fileName) throws
IOException {
+ Path path = new Path(snapshotDir, fileName);
+ if (path.getFileSystem().exists(path)) {
+ return Long.parseLong(FileUtils.readFileUtf8(path));
+ }
+ return null;
+ }
+ /**
+ * Lists {@code snapshotDir} and folds the ids of all {@code snapshot-<id>} files with
+ * {@code reducer}; returns {@code null} if there is no snapshot file.
+ *
+ * <p>NOTE(review): a file with the snapshot prefix but a non-numeric suffix makes this throw a
+ * RuntimeException, whereas the FileStorePathFactory code it replaces only logged a warning
+ * and skipped such files. Confirm the stricter behavior is intended.
+ */
+ private static Long findByListFiles(Path snapshotDir, BinaryOperator<Long>
reducer)
+ throws IOException {
+ FileStatus[] statuses =
snapshotDir.getFileSystem().listStatus(snapshotDir);
+ if (statuses == null) {
+ throw new RuntimeException(
+ "The return value is null of the listStatus for the
snapshot directory.");
+ }
+
+ Long result = null;
+ for (FileStatus status : statuses) {
+ String fileName = status.getPath().getName();
+ if (fileName.startsWith(SNAPSHOT_PREFIX)) {
+ try {
+ long id =
Long.parseLong(fileName.substring(SNAPSHOT_PREFIX.length()));
+ result = result == null ? id : reducer.apply(result, id);
+ } catch (NumberFormatException e) {
+ throw new RuntimeException("Invalid snapshot file name
found " + fileName, e);
+ }
+ }
+ }
+ return result;
+ }
+ /** Points the {@code LATEST} hint file in {@code snapshotDir} at {@code snapshotId}. */
+ public static void commitLatestHint(Path snapshotDir, long snapshotId)
throws IOException {
+ commitHint(snapshotDir, snapshotId, LATEST);
+ }
+ /** Points the {@code EARLIEST} hint file in {@code snapshotDir} at {@code snapshotId}. */
+ public static void commitEarliestHint(Path snapshotDir, long snapshotId)
throws IOException {
+ commitHint(snapshotDir, snapshotId, EARLIEST);
+ }
+ /**
+ * Writes {@code snapshotId} to a uniquely named temporary file, deletes the current hint file
+ * {@code fileName}, then renames the temporary file into place. If the rename reports
+ * failure, the temporary file is deleted, the old hint stays deleted, and nothing is reported
+ * to the caller.
+ *
+ * <p>NOTE(review): between the delete and the rename no hint file exists, so readers either
+ * fall back to listing or race inside readHint. If writeFileUtf8 throws, the temporary file
+ * may be left behind; confirm whether cleanup is needed.
+ */
+ private static void commitHint(Path snapshotDir, long snapshotId, String
fileName)
+ throws IOException {
+ FileSystem fs = snapshotDir.getFileSystem();
+ Path hintFile = new Path(snapshotDir, fileName);
+ Path tempFile = new Path(snapshotDir, UUID.randomUUID() + "-" +
fileName + ".temp");
+ FileUtils.writeFileUtf8(tempFile, String.valueOf(snapshotId));
+ fs.delete(hintFile, false);
+ boolean success = fs.rename(tempFile, hintFile);
+ if (!success) {
+ fs.delete(tempFile, false);
+ }
+ }
+}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 319970d..15a85d0 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -40,6 +40,7 @@ import
org.apache.flink.table.store.file.operation.FileStoreWrite;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.store.file.utils.SnapshotFinder;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.function.QuadFunction;
@@ -302,18 +303,34 @@ public class TestFileStore extends FileStoreImpl {
return result;
}
- public void assertCleaned() {
+ public void assertCleaned() throws IOException { // asserts that only files in use remain once the hint files are checked and removed
Set<Path> filesInUse = getFilesInUse();
- Set<Path> actualFiles;
- try {
- actualFiles =
- Files.walk(Paths.get(root))
- .filter(p -> Files.isRegularFile(p))
- .map(p -> new Path(p.toString()))
- .collect(Collectors.toSet());
- } catch (IOException e) {
- throw new RuntimeException(e);
+ Set<Path> actualFiles = // NOTE(review): the Files.walk stream is never closed; try-with-resources would release it
+ Files.walk(Paths.get(root))
+ .filter(Files::isRegularFile)
+ .map(p -> new Path(p.toString()))
+ .collect(Collectors.toSet());
+ /*
+ * NOTE(review): findEarliest and findLatest return null when no snapshot file exists, in
+ * which case the unboxing comparisons below throw NullPointerException instead of failing
+ * the assertion.
+ */
+ // Check the best-effort LATEST and EARLIEST hint files separately, then remove them.
+ // In concurrent tests their exact values cannot be checked here
because the hint_file is
+ // best effort and may lag behind, so only these bounds are asserted:
+ // - LATEST hint <= actual latest snapshot id
+ // - EARLIEST hint <= actual earliest snapshot id
+ Path snapshotDir = pathFactory().snapshotDirectory();
+ Path earliest = new Path(snapshotDir, SnapshotFinder.EARLIEST);
+ Path latest = new Path(snapshotDir, SnapshotFinder.LATEST);
+ if (actualFiles.remove(earliest)) {
+ long earliestId = SnapshotFinder.readHint(snapshotDir,
SnapshotFinder.EARLIEST);
+ earliest.getFileSystem().delete(earliest, false);
+ assertThat(earliestId <=
SnapshotFinder.findEarliest(snapshotDir)).isTrue();
}
+ if (actualFiles.remove(latest)) {
+ long latestId = SnapshotFinder.readHint(snapshotDir,
SnapshotFinder.LATEST);
+ latest.getFileSystem().delete(latest, false);
+ assertThat(latestId <=
SnapshotFinder.findLatest(snapshotDir)).isTrue();
+ }
+ actualFiles.remove(latest); // NOTE(review): redundant; latest was already removed by the check above
+
assertThat(actualFiles).isEqualTo(filesInUse);
}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
index f3ca1b0..ce1cb45 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
@@ -27,6 +27,7 @@ import
org.apache.flink.table.store.file.TestKeyValueGenerator;
import org.apache.flink.table.store.file.ValueKind;
import
org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
+import org.apache.flink.table.store.file.utils.SnapshotFinder;
import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
import org.junit.jupiter.api.BeforeEach;
@@ -78,6 +79,22 @@ public class FileStoreCommitTest {
testRandomConcurrentNoConflict(ThreadLocalRandom.current().nextInt(3)
+ 2, failing);
}
+ @Test
+ public void testLatestHint() throws Exception { // commits must leave a LATEST hint that agrees with a directory listing
+ testRandomConcurrentNoConflict(1, false); // one thread, non-failing file system
+ Path snapshotDir = createStore(false, // NOTE(review): presumably the same root as the store committed to above; confirm
1).pathFactory().snapshotDirectory();
+ Path latest = new Path(snapshotDir, SnapshotFinder.LATEST);
+
+ assertThat(latest.getFileSystem().exists(latest)).isTrue();
+
+ Long latestId = SnapshotFinder.findLatest(snapshotDir);
+
+ // delete the LATEST hint so that findLatest has to fall back to listing the snapshot files
+ latest.getFileSystem().delete(latest, false);
+
+ assertThat(SnapshotFinder.findLatest(snapshotDir)).isEqualTo(latestId);
+ }
+
protected void testRandomConcurrentNoConflict(int numThreads, boolean
failing)
throws Exception {
// prepare test data
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
index 3761b4b..2884a85 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.store.file.TestFileStore;
import org.apache.flink.table.store.file.TestKeyValueGenerator;
import
org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.SnapshotFinder;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -67,7 +68,7 @@ public class FileStoreExpireTest {
}
@AfterEach
- public void afterEach() {
+ public void afterEach() throws IOException { // assertCleaned now declares IOException
store.assertCleaned();
}
@@ -150,6 +151,20 @@ public class FileStoreExpireTest {
}
}
}
+
+ // validate earliest hint file
+
+ Path snapshotDir = pathFactory.snapshotDirectory();
+ Path earliest = new Path(snapshotDir, SnapshotFinder.EARLIEST);
+
+ assertThat(earliest.getFileSystem().exists(earliest)).isTrue();
+
+ Long earliestId = SnapshotFinder.findEarliest(snapshotDir);
+
+ // remove earliest hint file
+ earliest.getFileSystem().delete(earliest, false);
+
+
assertThat(SnapshotFinder.findEarliest(snapshotDir)).isEqualTo(earliestId);
}
@Test