This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b080e1  [FLINK-25689] Introduce atomic commit
8b080e1 is described below

commit 8b080e108d3ac3ff4425f2deb41c154e0e0c5504
Author: tsreaper <[email protected]>
AuthorDate: Fri Jan 21 13:38:38 2022 +0800

    [FLINK-25689] Introduce atomic commit
    
    This closes #12
---
 .../flink/table/store/file/FileStoreOptions.java   |  52 +++
 .../apache/flink/table/store/file/KeyValue.java    |  39 +++
 .../apache/flink/table/store/file/Snapshot.java    | 132 ++++++++
 .../store/file/manifest/ManifestCommittable.java   |  30 ++
 .../table/store/file/manifest/ManifestEntry.java   |  17 +-
 .../table/store/file/manifest/ManifestFile.java    |  21 +-
 .../store/file/manifest/ManifestFileMeta.java      |  94 ++++++
 .../table/store/file/manifest/ManifestList.java    |  21 +-
 .../table/store/file/mergetree/sst/SstFile.java    |  13 +-
 .../store/file/operation/FileStoreCommitImpl.java  | 375 +++++++++++++++++++++
 .../store/file/operation/FileStoreScanImpl.java    | 142 ++++++++
 .../store/file/utils/FileStorePathFactory.java     |  44 ++-
 .../flink/table/store/file/utils/FileUtils.java    |  28 ++
 .../table/store/file/TestKeyValueGenerator.java    |   6 +-
 .../store/file/manifest/ManifestFileMetaTest.java  | 227 +++++++++++++
 .../store/file/manifest/ManifestFileTest.java      |   5 +-
 .../store/file/manifest/ManifestListTest.java      |   5 +-
 .../file/mergetree/sst/SstTestDataGenerator.java   |   4 +-
 .../file/operation/FileStoreCommitTestBase.java    | 268 +++++++++++++++
 .../store/file/operation/TestCommitThread.java     | 251 ++++++++++++++
 20 files changed, 1732 insertions(+), 42 deletions(-)

diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
new file mode 100644
index 0000000..028b678
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.ReadableConfig;
+
+/** Options for {@link FileStore}. */
+public class FileStoreOptions {
+
+    public static final ConfigOption<Integer> BUCKET =
+            ConfigOptions.key("bucket")
+                    .intType()
+                    .defaultValue(1)
+                    .withDescription("Bucket number for file store.");
+
+    public static final ConfigOption<MemorySize> MANIFEST_TARGET_FILE_SIZE =
+            ConfigOptions.key("manifest.target-file-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.ofMebiBytes(8))
+                    .withDescription("Suggested file size of a manifest 
file.");
+
+    public final int bucket;
+    public final MemorySize manifestSuggestedSize;
+
+    public FileStoreOptions(int bucket, MemorySize manifestSuggestedSize) {
+        this.bucket = bucket;
+        this.manifestSuggestedSize = manifestSuggestedSize;
+    }
+
+    public FileStoreOptions(ReadableConfig config) {
+        this(config.get(BUCKET), config.get(MANIFEST_TARGET_FILE_SIZE));
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
index d5ff507..8461d64 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
@@ -18,13 +18,17 @@
 
 package org.apache.flink.table.store.file;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TinyIntType;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 /**
  * A key value, including user key, sequence number, value kind and value. 
This object can be
@@ -73,4 +77,39 @@ public class KeyValue {
         fields.addAll(valueType.getFields());
         return new RowType(fields);
     }
+
+    @VisibleForTesting
+    public KeyValue copy(RowDataSerializer keySerializer, RowDataSerializer 
valueSerializer) {
+        return new KeyValue()
+                .replace(
+                        keySerializer.copy(key),
+                        sequenceNumber,
+                        valueKind,
+                        valueSerializer.copy(value));
+    }
+
+    @VisibleForTesting
+    public String toString(RowType keyType, RowType valueType) {
+        String keyString = rowDataToString(key, keyType);
+        String valueString = rowDataToString(value, valueType);
+        return "{kind: "
+                + valueKind.name()
+                + ", seq: "
+                + sequenceNumber
+                + ", key: ("
+                + keyString
+                + "), value: ("
+                + valueString
+                + ")}";
+    }
+
+    public static String rowDataToString(RowData row, RowType type) {
+        return IntStream.range(0, type.getFieldCount())
+                .mapToObj(
+                        i ->
+                                String.valueOf(
+                                        
RowData.createFieldGetter(type.getTypeAt(i), i)
+                                                .getFieldOrNull(row)))
+                .collect(Collectors.joining(", "));
+    }
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
new file mode 100644
index 0000000..77f79e6
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.utils.FileUtils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+
+/** This file is the entrance to all data committed at some specific time 
point. */
+public class Snapshot {
+
+    public static final long FIRST_SNAPSHOT_ID = 1;
+
+    private static final String FIELD_ID = "id";
+    private static final String FIELD_MANIFEST_LIST = "manifestList";
+    private static final String FIELD_COMMIT_USER = "commitUser";
+    private static final String FIELD_COMMIT_DIGEST = "commitDigest";
+    private static final String FIELD_COMMIT_KIND = "commitKind";
+
+    @JsonProperty(FIELD_ID)
+    private final long id;
+
+    @JsonProperty(FIELD_MANIFEST_LIST)
+    private final String manifestList;
+
+    @JsonProperty(FIELD_COMMIT_USER)
+    private final String commitUser;
+
+    // for deduplication
+    @JsonProperty(FIELD_COMMIT_DIGEST)
+    private final String commitDigest;
+
+    @JsonProperty(FIELD_COMMIT_KIND)
+    private final CommitKind commitKind;
+
+    @JsonCreator
+    public Snapshot(
+            @JsonProperty(FIELD_ID) long id,
+            @JsonProperty(FIELD_MANIFEST_LIST) String manifestList,
+            @JsonProperty(FIELD_COMMIT_USER) String commitUser,
+            @JsonProperty(FIELD_COMMIT_DIGEST) String commitDigest,
+            @JsonProperty(FIELD_COMMIT_KIND) CommitKind commitKind) {
+        this.id = id;
+        this.manifestList = manifestList;
+        this.commitUser = commitUser;
+        this.commitDigest = commitDigest;
+        this.commitKind = commitKind;
+    }
+
+    @JsonGetter(FIELD_ID)
+    public long id() {
+        return id;
+    }
+
+    @JsonGetter(FIELD_MANIFEST_LIST)
+    public String manifestList() {
+        return manifestList;
+    }
+
+    @JsonGetter(FIELD_COMMIT_USER)
+    public String commitUser() {
+        return commitUser;
+    }
+
+    @JsonGetter(FIELD_COMMIT_DIGEST)
+    public String commitDigest() {
+        return commitDigest;
+    }
+
+    @JsonGetter(FIELD_COMMIT_KIND)
+    public CommitKind commitKind() {
+        return commitKind;
+    }
+
+    public String toJson() {
+        try {
+            return new ObjectMapper().writeValueAsString(this);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static Snapshot fromJson(String json) {
+        try {
+            return new ObjectMapper().readValue(json, Snapshot.class);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static Snapshot fromPath(Path path) {
+        try {
+            String json = FileUtils.readFileUtf8(path);
+            return Snapshot.fromJson(json);
+        } catch (IOException e) {
+            throw new RuntimeException("Fails to read snapshot from path " + 
path, e);
+        }
+    }
+
+    /** Type of changes in this snapshot. */
+    public enum CommitKind {
+
+        /** Changes flushed from the mem table. */
+        APPEND,
+
+        /** Changes by compacting existing sst files. */
+        COMPACT
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
index a02fd75..4fcd763 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
@@ -98,4 +98,34 @@ public class ManifestCommittable {
     public int hashCode() {
         return Objects.hash(newFiles, compactBefore, compactAfter);
     }
+
+    @Override
+    public String toString() {
+        return "new files:\n"
+                + filesToString(newFiles)
+                + "compact before:\n"
+                + filesToString(compactBefore)
+                + "compact after:\n"
+                + filesToString(compactAfter);
+    }
+
+    private static String filesToString(Map<BinaryRowData, Map<Integer, 
List<SstFileMeta>>> files) {
+        StringBuilder builder = new StringBuilder();
+        for (Map.Entry<BinaryRowData, Map<Integer, List<SstFileMeta>>> 
entryWithPartition :
+                files.entrySet()) {
+            for (Map.Entry<Integer, List<SstFileMeta>> entryWithBucket :
+                    entryWithPartition.getValue().entrySet()) {
+                for (SstFileMeta sst : entryWithBucket.getValue()) {
+                    builder.append("  * partition: ")
+                            .append(entryWithPartition.getKey())
+                            .append(", bucket: ")
+                            .append(entryWithBucket.getKey())
+                            .append(", file: ")
+                            .append(sst.fileName())
+                            .append("\n");
+                }
+            }
+        }
+        return builder.toString();
+    }
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntry.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntry.java
index 206b8e1..170b024 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntry.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntry.java
@@ -73,7 +73,7 @@ public class ManifestEntry {
     }
 
     public Identifier identifier() {
-        return new Identifier(partition, bucket, file.fileName());
+        return new Identifier(partition, bucket, file.level(), 
file.fileName());
     }
 
     public static RowType schema(RowType partitionType, RowType keyType, 
RowType rowType) {
@@ -114,13 +114,15 @@ public class ManifestEntry {
      * file.
      */
     public static class Identifier {
-        private final BinaryRowData partition;
-        private final int bucket;
-        private final String fileName;
+        public final BinaryRowData partition;
+        public final int bucket;
+        public final int level;
+        public final String fileName;
 
-        private Identifier(BinaryRowData partition, int bucket, String 
fileName) {
+        private Identifier(BinaryRowData partition, int bucket, int level, 
String fileName) {
             this.partition = partition;
             this.bucket = bucket;
+            this.level = level;
             this.fileName = fileName;
         }
 
@@ -132,17 +134,18 @@ public class ManifestEntry {
             Identifier that = (Identifier) o;
             return Objects.equals(partition, that.partition)
                     && bucket == that.bucket
+                    && level == that.level
                     && Objects.equals(fileName, that.fileName);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(partition, bucket, fileName);
+            return Objects.hash(partition, bucket, level, fileName);
         }
 
         @Override
         public String toString() {
-            return String.format("{%s, %d, %s}", partition, bucket, fileName);
+            return String.format("{%s, %d, %d, %s}", partition, bucket, level, 
fileName);
         }
     }
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
index 13d8be3..1b2bb92 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
@@ -32,9 +32,6 @@ import org.apache.flink.table.store.file.utils.FileUtils;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.util.List;
 
@@ -44,8 +41,6 @@ import java.util.List;
  */
 public class ManifestFile {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(ManifestFile.class);
-
     private final RowType partitionType;
     private final ManifestEntrySerializer serializer;
     private final BulkFormat<RowData, FileSourceSplit> readerFactory;
@@ -66,9 +61,13 @@ public class ManifestFile {
         this.pathFactory = pathFactory;
     }
 
-    public List<ManifestEntry> read(String fileName) throws IOException {
-        return FileUtils.readListFromFile(
-                pathFactory.toManifestFilePath(fileName), serializer, 
readerFactory);
+    public List<ManifestEntry> read(String fileName) {
+        try {
+            return FileUtils.readListFromFile(
+                    pathFactory.toManifestFilePath(fileName), serializer, 
readerFactory);
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to read manifest file " + 
fileName, e);
+        }
     }
 
     /**
@@ -76,7 +75,7 @@ public class ManifestFile {
      *
      * <p>NOTE: This method is atomic.
      */
-    public ManifestFileMeta write(List<ManifestEntry> entries) throws 
IOException {
+    public ManifestFileMeta write(List<ManifestEntry> entries) {
         Preconditions.checkArgument(
                 entries.size() > 0, "Manifest entries to write must not be 
empty.");
 
@@ -84,9 +83,9 @@ public class ManifestFile {
         try {
             return write(entries, path);
         } catch (Throwable e) {
-            LOG.warn("Exception occurs when writing manifest file " + path + 
". Cleaning up.", e);
             FileUtils.deleteOrWarn(path);
-            throw e;
+            throw new RuntimeException(
+                    "Exception occurs when writing manifest file " + path + ". 
Clean up.", e);
         }
     }
 
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
index 38f9467..0c4aa8f 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
@@ -23,10 +23,13 @@ import 
org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.util.Preconditions;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 /** Metadata of a manifest file. */
@@ -118,4 +121,95 @@ public class ManifestFileMeta {
                 numDeletedFiles,
                 Arrays.toString(partitionStats));
     }
+
+    /**
+     * Merge several {@link ManifestFileMeta}s. {@link ManifestEntry}s 
representing first adding and
+     * then deleting the same sst file will cancel each other.
+     *
+     * <p>NOTE: This method is atomic.
+     */
+    public static List<ManifestFileMeta> merge(
+            List<ManifestFileMeta> metas, ManifestFile manifestFile, long 
suggestedMetaSize) {
+        List<ManifestFileMeta> result = new ArrayList<>();
+        // these are the newly created manifest files, clean them up if 
exception occurs
+        List<ManifestFileMeta> newMetas = new ArrayList<>();
+        List<ManifestFileMeta> candidate = new ArrayList<>();
+        long totalSize = 0;
+
+        try {
+            for (ManifestFileMeta manifest : metas) {
+                totalSize += manifest.fileSize;
+                candidate.add(manifest);
+                if (totalSize >= suggestedMetaSize) {
+                    // reach suggested file size, perform merging and produce 
new file
+                    merge(candidate, manifestFile, result, newMetas);
+                    candidate.clear();
+                    totalSize = 0;
+                }
+            }
+            if (!candidate.isEmpty()) {
+                // merge the last bit of metas
+                merge(candidate, manifestFile, result, newMetas);
+            }
+        } catch (Throwable e) {
+            // exception occurs, clean up and rethrow
+            for (ManifestFileMeta manifest : newMetas) {
+                manifestFile.delete(manifest.fileName);
+            }
+            throw e;
+        }
+
+        return result;
+    }
+
+    private static void merge(
+            List<ManifestFileMeta> metas,
+            ManifestFile manifestFile,
+            List<ManifestFileMeta> result,
+            List<ManifestFileMeta> newMetas) {
+        if (metas.size() > 1) {
+            ManifestFileMeta newMeta = merge(metas, manifestFile);
+            result.add(newMeta);
+            newMetas.add(newMeta);
+        } else {
+            result.addAll(metas);
+        }
+    }
+
+    private static ManifestFileMeta merge(List<ManifestFileMeta> metas, 
ManifestFile manifestFile) {
+        Preconditions.checkArgument(
+                metas.size() > 1, "Number of ManifestFileMeta <= 1. This is a 
bug.");
+
+        Map<ManifestEntry.Identifier, ManifestEntry> map = new 
LinkedHashMap<>();
+        for (ManifestFileMeta manifest : metas) {
+            for (ManifestEntry entry : manifestFile.read(manifest.fileName)) {
+                ManifestEntry.Identifier identifier = entry.identifier();
+                switch (entry.kind()) {
+                    case ADD:
+                        Preconditions.checkState(
+                                !map.containsKey(identifier),
+                                "Trying to add file %s which is already added. 
Manifest might be corrupted.",
+                                identifier);
+                        map.put(identifier, entry);
+                        break;
+                    case DELETE:
+                        // each sst file will only be added once and deleted 
once,
+                        // if we know that it is added before then both add 
and delete entry can be
+                        // removed because there won't be further operations 
on this file,
+                        // otherwise we have to keep the delete entry because 
the add entry must be
+                        // in the previous manifest files
+                        if (map.containsKey(identifier)) {
+                            map.remove(identifier);
+                        } else {
+                            map.put(identifier, entry);
+                        }
+                        break;
+                    default:
+                        throw new UnsupportedOperationException(
+                                "Unknown value kind " + entry.kind().name());
+                }
+            }
+        }
+        return manifestFile.write(new ArrayList<>(map.values()));
+    }
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java
index 73d9ac1..1b008dd 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java
@@ -31,9 +31,6 @@ import org.apache.flink.table.store.file.utils.FileUtils;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.util.List;
 
@@ -43,8 +40,6 @@ import java.util.List;
  */
 public class ManifestList {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(ManifestList.class);
-
     private final ManifestFileMetaSerializer serializer;
     private final BulkFormat<RowData, FileSourceSplit> readerFactory;
     private final BulkWriter.Factory<RowData> writerFactory;
@@ -59,9 +54,13 @@ public class ManifestList {
         this.pathFactory = pathFactory;
     }
 
-    public List<ManifestFileMeta> read(String fileName) throws IOException {
-        return FileUtils.readListFromFile(
-                pathFactory.toManifestListPath(fileName), serializer, 
readerFactory);
+    public List<ManifestFileMeta> read(String fileName) {
+        try {
+            return FileUtils.readListFromFile(
+                    pathFactory.toManifestListPath(fileName), serializer, 
readerFactory);
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to read manifest list " + 
fileName, e);
+        }
     }
 
     /**
@@ -69,7 +68,7 @@ public class ManifestList {
      *
      * <p>NOTE: This method is atomic.
      */
-    public String write(List<ManifestFileMeta> metas) throws IOException {
+    public String write(List<ManifestFileMeta> metas) {
         Preconditions.checkArgument(
                 metas.size() > 0, "Manifest file metas to write must not be 
empty.");
 
@@ -77,9 +76,9 @@ public class ManifestList {
         try {
             return write(metas, path);
         } catch (Throwable e) {
-            LOG.warn("Exception occurs when writing manifest list " + path + 
". Cleaning up.", e);
             FileUtils.deleteOrWarn(path);
-            throw e;
+            throw new RuntimeException(
+                    "Exception occurs when writing manifest list " + path + ". 
Clean up.", e);
         }
     }
 
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFile.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFile.java
index d73ff45..caae17b 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFile.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFile.java
@@ -222,11 +222,22 @@ public class SstFile {
             this.maxKey = null;
             this.minSequenceNumber = Long.MAX_VALUE;
             this.maxSequenceNumber = Long.MIN_VALUE;
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Creating new sst file " + path);
+            }
         }
 
         private void write(KeyValue kv) throws IOException {
-            writer.addElement(serializer.toRow(kv));
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(
+                        "Writing key-value to sst file "
+                                + path
+                                + ", kv: "
+                                + kv.toString(keyType, valueType));
+            }
 
+            writer.addElement(serializer.toRow(kv));
             rowCount++;
             if (minKey == null) {
                 minKey = keySerializer.toBinaryRow(kv.key()).copy();
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
new file mode 100644
index 0000000..8cb9fc0
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import 
org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFile;
+import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.FileUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Default implementation of {@link FileStoreCommit}.
+ *
+ * <p>This class provides an atomic commit method to the user.
+ *
+ * <ol>
+ *   <li>Before calling {@link FileStoreCommitImpl#commit}, if the user cannot determine whether
+ *       this commit has already been done, the user should first call
+ *       {@link FileStoreCommitImpl#filterCommitted}.
+ *   <li>Before committing, it will first check for conflicts by checking if all files to be
+ *       removed currently exist.
+ *   <li>After that it uses the external {@link FileStoreCommitImpl#lock} (if provided) or the
+ *       atomic rename of the file system to ensure atomicity.
+ *   <li>If the commit fails due to conflicts or an exception, it tries its best to clean up and
+ *       aborts.
+ *   <li>If the atomic rename fails, it tries again after reading the latest snapshot from step 2.
+ * </ol>
+ */
+public class FileStoreCommitImpl implements FileStoreCommit {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FileStoreCommitImpl.class);
+
+    private final String commitUser;
+    private final ManifestCommittableSerializer committableSerializer;
+
+    private final FileStorePathFactory pathFactory;
+    private final ManifestFile manifestFile;
+    private final ManifestList manifestList;
+    private final FileStoreOptions fileStoreOptions;
+    private final FileStoreScan scan;
+
+    @Nullable private Lock lock;
+
+    public FileStoreCommitImpl(
+            String commitUser,
+            ManifestCommittableSerializer committableSerializer,
+            FileStorePathFactory pathFactory,
+            ManifestFile manifestFile,
+            ManifestList manifestList,
+            FileStoreOptions fileStoreOptions,
+            FileStoreScan scan) {
+        this.commitUser = commitUser;
+        this.committableSerializer = committableSerializer;
+
+        this.pathFactory = pathFactory;
+        this.manifestFile = manifestFile;
+        this.manifestList = manifestList;
+        this.fileStoreOptions = fileStoreOptions;
+        this.scan = scan;
+
+        this.lock = null;
+    }
+
+    @Override
+    public FileStoreCommit withLock(Lock lock) {
+        this.lock = lock;
+        return this;
+    }
+
+    @Override
+    public List<ManifestCommittable> filterCommitted(List<ManifestCommittable> 
committableList) {
+        committableList = new ArrayList<>(committableList);
+
+        // filter out commits with no new files
+        committableList.removeIf(committable -> 
committable.newFiles().isEmpty());
+
+        // if there is no previous snapshots then nothing should be filtered
+        Long latestSnapshotId = pathFactory.latestSnapshotId();
+        if (latestSnapshotId == null) {
+            return committableList;
+        }
+
+        // check if a committable is already committed by its hash
+        Map<String, ManifestCommittable> digests = new LinkedHashMap<>();
+        for (ManifestCommittable committable : committableList) {
+            digests.put(digestManifestCommittable(committable), committable);
+        }
+
+        for (long id = latestSnapshotId; id >= Snapshot.FIRST_SNAPSHOT_ID; 
id--) {
+            Path snapshotPath = pathFactory.toSnapshotPath(id);
+            Snapshot snapshot = Snapshot.fromPath(snapshotPath);
+            if (commitUser.equals(snapshot.commitUser())) {
+                if (digests.containsKey(snapshot.commitDigest())) {
+                    digests.remove(snapshot.commitDigest());
+                } else {
+                    // early exit, because committableList must contain the latest
+                    // commits by this commit user
+                    break;
+                }
+            }
+        }
+
+        return new ArrayList<>(digests.values());
+    }
+
+    @Override
+    public void commit(ManifestCommittable committable, Map<String, String> 
properties) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Ready to commit\n" + committable.toString());
+        }
+
+        String hash = digestManifestCommittable(committable);
+
+        List<ManifestEntry> appendChanges = 
collectChanges(committable.newFiles(), ValueKind.ADD);
+        if (!appendChanges.isEmpty()) {
+            tryCommit(appendChanges, hash, Snapshot.CommitKind.APPEND);
+        }
+
+        List<ManifestEntry> compactChanges = new ArrayList<>();
+        compactChanges.addAll(collectChanges(committable.compactBefore(), 
ValueKind.DELETE));
+        compactChanges.addAll(collectChanges(committable.compactAfter(), 
ValueKind.ADD));
+        if (!compactChanges.isEmpty()) {
+            tryCommit(compactChanges, hash, Snapshot.CommitKind.COMPACT);
+        }
+    }
+
+    @Override
+    public void overwrite(
+            Map<String, String> partition,
+            ManifestCommittable committable,
+            Map<String, String> properties) {
+        throw new UnsupportedOperationException();
+    }
+
+    private String digestManifestCommittable(ManifestCommittable committable) {
+        try {
+            return new String(
+                    Base64.getEncoder()
+                            .encode(
+                                    MessageDigest.getInstance("MD5")
+                                            
.digest(committableSerializer.serialize(committable))));
+        } catch (NoSuchAlgorithmException e) {
+            throw new RuntimeException("MD5 algorithm not found. This is 
impossible.", e);
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    "Failed to serialize ManifestCommittable. This is 
unexpected.", e);
+        }
+    }
+
+    private List<ManifestEntry> collectChanges(
+            Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> map, ValueKind 
kind) {
+        List<ManifestEntry> changes = new ArrayList<>();
+        for (Map.Entry<BinaryRowData, Map<Integer, List<SstFileMeta>>> 
entryWithPartition :
+                map.entrySet()) {
+            for (Map.Entry<Integer, List<SstFileMeta>> entryWithBucket :
+                    entryWithPartition.getValue().entrySet()) {
+                changes.addAll(
+                        entryWithBucket.getValue().stream()
+                                .map(
+                                        file ->
+                                                new ManifestEntry(
+                                                        kind,
+                                                        
entryWithPartition.getKey(),
+                                                        
entryWithBucket.getKey(),
+                                                        
fileStoreOptions.bucket,
+                                                        file))
+                                .collect(Collectors.toList()));
+            }
+        }
+        return changes;
+    }
+
+    private void tryCommit(
+            List<ManifestEntry> changes, String hash, Snapshot.CommitKind 
commitKind) {
+        while (true) {
+            Long latestSnapshotId = pathFactory.latestSnapshotId();
+            long newSnapshotId =
+                    latestSnapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID : 
latestSnapshotId + 1;
+            Path newSnapshotPath = pathFactory.toSnapshotPath(newSnapshotId);
+            Path tmpSnapshotPath = 
pathFactory.toTmpSnapshotPath(newSnapshotId);
+
+            Snapshot latestSnapshot = null;
+            if (latestSnapshotId != null) {
+                noConflictsOrFail(latestSnapshotId, changes);
+                latestSnapshot = 
Snapshot.fromPath(pathFactory.toSnapshotPath(latestSnapshotId));
+            }
+
+            Snapshot newSnapshot;
+            String manifestListName = null;
+            List<ManifestFileMeta> oldMetas = new ArrayList<>();
+            List<ManifestFileMeta> newMetas = new ArrayList<>();
+            try {
+                if (latestSnapshot != null) {
+                    // read all previous manifest files
+                    
oldMetas.addAll(manifestList.read(latestSnapshot.manifestList()));
+                    // merge manifest files
+                    newMetas.addAll(
+                            ManifestFileMeta.merge(
+                                    oldMetas,
+                                    manifestFile,
+                                    
fileStoreOptions.manifestSuggestedSize.getBytes()));
+                }
+                // write all changes to manifest file
+                newMetas.add(manifestFile.write(changes));
+                // prepare snapshot file
+                manifestListName = manifestList.write(newMetas);
+                newSnapshot =
+                        new Snapshot(newSnapshotId, manifestListName, 
commitUser, hash, commitKind);
+                FileUtils.writeFileUtf8(tmpSnapshotPath, newSnapshot.toJson());
+            } catch (Throwable e) {
+                // fails when preparing for commit, we should clean up
+                cleanUpTmpSnapshot(tmpSnapshotPath, manifestListName, 
oldMetas, newMetas);
+                throw new RuntimeException(
+                        String.format(
+                                "Exception occurs when preparing snapshot #%d 
(path %s) by user %s "
+                                        + "with hash %s and kind %s. Clean 
up.",
+                                newSnapshotId,
+                                newSnapshotPath.toString(),
+                                commitUser,
+                                hash,
+                                commitKind.name()),
+                        e);
+            }
+
+            boolean success;
+            try {
+                FileSystem fs = tmpSnapshotPath.getFileSystem();
+                // atomic rename
+                if (lock != null) {
+                    success =
+                            lock.runWithLock(
+                                    () ->
+                                            // fs.rename may not return false if the target file
+                                            // already exists, or may not even be atomic.
+                                            // As we're relying on external locking, we can first
+                                            // check if the file exists and then rename to work
+                                            // around this case.
+                                            !fs.exists(newSnapshotPath)
+                                                    && 
fs.rename(tmpSnapshotPath, newSnapshotPath));
+                } else {
+                    success = fs.rename(tmpSnapshotPath, newSnapshotPath);
+                }
+            } catch (Throwable e) {
+                // exception when performing the atomic rename,
+                // we cannot clean up because we can't determine the success
+                throw new RuntimeException(
+                        String.format(
+                                "Exception occurs when committing snapshot #%d 
(path %s) by user %s "
+                                        + "with hash %s and kind %s. "
+                                        + "Cannot clean up because we can't 
determine the success.",
+                                newSnapshotId,
+                                newSnapshotPath.toString(),
+                                commitUser,
+                                hash,
+                                commitKind.name()),
+                        e);
+            }
+
+            if (success) {
+                return;
+            }
+
+            // atomic rename fails, clean up and try again
+            LOG.warn(
+                    String.format(
+                            "Atomic rename failed for snapshot #%d (path %s) 
by user %s "
+                                    + "with hash %s and kind %s. "
+                                    + "Clean up and try again.",
+                            newSnapshotId,
+                            newSnapshotPath.toString(),
+                            commitUser,
+                            hash,
+                            commitKind.name()));
+            cleanUpTmpSnapshot(tmpSnapshotPath, manifestListName, oldMetas, 
newMetas);
+        }
+    }
+
+    private void noConflictsOrFail(long snapshotId, List<ManifestEntry> 
changes) {
+        Set<ManifestEntry.Identifier> removedFiles =
+                changes.stream()
+                        .filter(e -> e.kind().equals(ValueKind.DELETE))
+                        .map(ManifestEntry::identifier)
+                        .collect(Collectors.toSet());
+        if (removedFiles.isEmpty()) {
+            // early exit for append-only changes
+            return;
+        }
+
+        try {
+            // TODO use partition filter of scan when implemented
+            for (ManifestEntry entry : 
scan.withSnapshot(snapshotId).plan().files()) {
+                removedFiles.remove(entry.identifier());
+            }
+        } catch (Throwable e) {
+            throw new RuntimeException("Cannot determine if conflicts exist.", 
e);
+        }
+
+        if (!removedFiles.isEmpty()) {
+            throw new RuntimeException(
+                    "Conflicts detected on:\n"
+                            + removedFiles.stream()
+                                    .map(
+                                            i ->
+                                                    
pathFactory.getPartitionString(i.partition)
+                                                            + ", bucket "
+                                                            + i.bucket
+                                                            + ", level "
+                                                            + i.level
+                                                            + ", file "
+                                                            + i.fileName)
+                                    .collect(Collectors.joining("\n")));
+        }
+    }
+
+    private void cleanUpTmpSnapshot(
+            Path tmpSnapshotPath,
+            String manifestListName,
+            List<ManifestFileMeta> oldMetas,
+            List<ManifestFileMeta> newMetas) {
+        // clean up tmp snapshot file
+        FileUtils.deleteOrWarn(tmpSnapshotPath);
+        // clean up newly created manifest list
+        if (manifestListName != null) {
+            manifestList.delete(manifestListName);
+        }
+        // clean up newly merged manifest files
+        Set<ManifestFileMeta> oldMetaSet = new HashSet<>(oldMetas); // for 
faster searching
+        for (ManifestFileMeta suspect : newMetas) {
+            if (!oldMetaSet.contains(suspect)) {
+                manifestList.delete(suspect.fileName());
+            }
+        }
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
new file mode 100644
index 0000000..da72dbf
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFile;
+import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Default implementation of {@link FileStoreScan}. */
+public class FileStoreScanImpl implements FileStoreScan {
+
+    private final FileStorePathFactory pathFactory;
+    private final ManifestFile manifestFile;
+    private final ManifestList manifestList;
+
+    private Long snapshotId;
+    private List<ManifestFileMeta> manifests;
+
+    public FileStoreScanImpl(
+            FileStorePathFactory pathFactory,
+            ManifestFile manifestFile,
+            ManifestList manifestList) {
+        this.pathFactory = pathFactory;
+        this.manifestFile = manifestFile;
+        this.manifestList = manifestList;
+
+        this.snapshotId = null;
+        this.manifests = new ArrayList<>();
+    }
+
+    @Override
+    public FileStoreScan withPartitionFilter(Predicate predicate) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public FileStoreScan withKeyFilter(Predicate predicate) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public FileStoreScan withValueFilter(Predicate predicate) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public FileStoreScan withBucket(int bucket) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public FileStoreScan withSnapshot(long snapshotId) {
+        this.snapshotId = snapshotId;
+        Snapshot snapshot = 
Snapshot.fromPath(pathFactory.toSnapshotPath(snapshotId));
+        this.manifests = manifestList.read(snapshot.manifestList());
+        return this;
+    }
+
+    @Override
+    public FileStoreScan withManifestList(List<ManifestFileMeta> manifests) {
+        this.manifests = manifests;
+        return this;
+    }
+
+    @Override
+    public Plan plan() {
+        List<ManifestEntry> files = scan();
+
+        return new Plan() {
+            @Nullable
+            @Override
+            public Long snapshotId() {
+                return snapshotId;
+            }
+
+            @Override
+            public List<ManifestEntry> files() {
+                return files;
+            }
+        };
+    }
+
+    private List<ManifestEntry> scan() {
+        Map<ManifestEntry.Identifier, ManifestEntry> map = new 
LinkedHashMap<>();
+        for (ManifestFileMeta manifest : manifests) {
+            // TODO read each manifest file concurrently
+            for (ManifestEntry entry : manifestFile.read(manifest.fileName())) 
{
+                ManifestEntry.Identifier identifier = entry.identifier();
+                switch (entry.kind()) {
+                    case ADD:
+                        Preconditions.checkState(
+                                !map.containsKey(identifier),
+                                "Trying to add file %s which is already added. 
"
+                                        + "Manifest might be corrupted.",
+                                identifier);
+                        map.put(identifier, entry);
+                        break;
+                    case DELETE:
+                        Preconditions.checkState(
+                                map.containsKey(identifier),
+                                "Trying to delete file %s which is not 
previously added. "
+                                        + "Manifest might be corrupted.",
+                                identifier);
+                        map.remove(identifier);
+                        break;
+                    default:
+                        throw new UnsupportedOperationException(
+                                "Unknown value kind " + entry.kind().name());
+                }
+            }
+        }
+        return new ArrayList<>(map.values());
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
index 0aaef19..9aa9844 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
@@ -21,8 +21,11 @@ package org.apache.flink.table.store.file.utils;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
 import org.apache.flink.connector.file.table.RowDataPartitionComputer;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.mergetree.sst.SstPathFactory;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
@@ -30,11 +33,20 @@ import 
org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter;
 import org.apache.flink.table.utils.PartitionPathUtils;
 import org.apache.flink.util.Preconditions;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
 import java.util.UUID;
 
 /** Factory which produces {@link Path}s for each type of files. */
 public class FileStorePathFactory {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(FileStorePathFactory.class);
+    private static final String SNAPSHOT_PREFIX = "snapshot-";
+
     private final Path root;
     private final String uuid;
     private final RowDataPartitionComputer partitionComputer;
@@ -82,7 +94,11 @@ public class FileStorePathFactory {
     }
 
     public Path toSnapshotPath(long id) {
-        return new Path(root + "/snapshot/snapshot-" + id);
+        return new Path(root + "/snapshot/" + SNAPSHOT_PREFIX + id);
+    }
+
+    public Path toTmpSnapshotPath(long id) {
+        return new Path(root + "/snapshot/." + SNAPSHOT_PREFIX + id + "-" + 
UUID.randomUUID());
     }
 
     public SstPathFactory createSstPathFactory(BinaryRowData partition, int 
bucket) {
@@ -96,6 +112,32 @@ public class FileStorePathFactory {
                                 partition, "Partition row data is null. This 
is unexpected.")));
     }
 
+    @Nullable
+    public Long latestSnapshotId() {
+        // TODO add a `bestEffort` argument and read from a best-effort 
CURRENT file if true
+        try {
+            Path snapshotDir = new Path(root + "/snapshot");
+            FileSystem fs = snapshotDir.getFileSystem();
+            FileStatus[] statuses = fs.listStatus(snapshotDir);
+
+            long latestId = Snapshot.FIRST_SNAPSHOT_ID - 1;
+            for (FileStatus status : statuses) {
+                String fileName = status.getPath().getName();
+                if (fileName.startsWith(SNAPSHOT_PREFIX)) {
+                    try {
+                        long id = 
Long.parseLong(fileName.substring(SNAPSHOT_PREFIX.length()));
+                        latestId = Math.max(latestId, id);
+                    } catch (NumberFormatException e) {
+                        LOG.warn("Invalid snapshot file name found " + 
fileName, e);
+                    }
+                }
+            }
+            return latestId < Snapshot.FIRST_SNAPSHOT_ID ? null : latestId;
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to find latest snapshot id", e);
+        }
+    }
+
     @VisibleForTesting
     public String uuid() {
         return uuid;
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java
index b787b9a..33f78b5 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java
@@ -23,6 +23,8 @@ import 
org.apache.flink.connector.base.source.reader.SourceReaderOptions;
 import org.apache.flink.connector.file.src.FileSourceSplit;
 import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.connector.file.src.util.Utils;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.RowData;
@@ -30,7 +32,11 @@ import org.apache.flink.table.data.RowData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -63,6 +69,28 @@ public class FileUtils {
         return path.getFileSystem().getFileStatus(path).getLen();
     }
 
+    public static String readFileUtf8(Path file) throws IOException {
+        try (FSDataInputStream in = file.getFileSystem().open(file)) {
+            BufferedReader reader =
+                    new BufferedReader(new InputStreamReader(in, 
StandardCharsets.UTF_8));
+            StringBuilder builder = new StringBuilder();
+            String line;
+            while ((line = reader.readLine()) != null) {
+                builder.append(line);
+            }
+            return builder.toString();
+        }
+    }
+
+    public static void writeFileUtf8(Path file, String content) throws 
IOException {
+        try (FSDataOutputStream out =
+                file.getFileSystem().create(file, 
FileSystem.WriteMode.NO_OVERWRITE)) {
+            OutputStreamWriter writer = new OutputStreamWriter(out, 
StandardCharsets.UTF_8);
+            writer.write(content);
+            writer.close();
+        }
+    }
+
     public static void deleteOrWarn(Path file) {
         try {
             FileSystem fs = file.getFileSystem();
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java
index 6a39165..3f6ad2f 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java
@@ -78,7 +78,7 @@ public class TestKeyValueGenerator {
     private static final RowDataSerializer PARTITION_SERIALIZER =
             new RowDataSerializer(PARTITION_TYPE);
     public static final RowDataSerializer KEY_SERIALIZER = new 
RowDataSerializer(KEY_TYPE);
-    private static final RecordComparator KEY_COMPARATOR =
+    public static final RecordComparator KEY_COMPARATOR =
             (a, b) -> {
                 int firstResult = a.getInt(0) - b.getInt(0);
                 if (firstResult != 0) {
@@ -169,10 +169,6 @@ public class TestKeyValueGenerator {
                 });
     }
 
-    public int compareKeys(BinaryRowData a, BinaryRowData b) {
-        return KEY_COMPARATOR.compare(a, b);
-    }
-
     private Order pick(List<Order> list) {
         int idx = random.nextInt(list.size());
         Order tmp = list.get(idx);
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
new file mode 100644
index 0000000..027dda5
--- /dev/null
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.manifest;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.writer.BinaryRowWriter;
+import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link ManifestFileMeta}. */
/** Tests for {@link ManifestFileMeta}. */
public class ManifestFileMetaTest {

    // Minimal schemas: single int partition, single int key, single long value.
    private static final RowType PARTITION_TYPE = RowType.of(new IntType());
    private static final RowType KEY_TYPE = RowType.of(new IntType());
    private static final RowType ROW_TYPE = RowType.of(new BigIntType());

    private final FileFormat avro;

    @TempDir java.nio.file.Path tempDir;
    private ManifestFile manifestFile;

    public ManifestFileMetaTest() {
        this.avro =
                FileFormat.fromIdentifier(
                        ManifestFileMetaTest.class.getClassLoader(), "avro", new Configuration());
    }

    @BeforeEach
    public void beforeEach() {
        // fresh manifest file writer/reader rooted at the per-test temp directory
        manifestFile = createManifestFile(tempDir.toString());
    }

    /**
     * Checks that {@link ManifestFileMeta#merge} combines small manifest files up to the
     * suggested size (500 here, see {@link #createData}) while leaving already-large
     * manifest files untouched.
     */
    @Test
    public void testMerge() {
        List<ManifestFileMeta> input = new ArrayList<>();
        List<ManifestFileMeta> expected = new ArrayList<>();
        createData(input, expected);

        List<ManifestFileMeta> actual = ManifestFileMeta.merge(input, manifestFile, 500);
        assertThat(actual).hasSameSizeAs(expected);

        // these three manifest files are merged from the input
        assertSameContent(expected.get(0), actual.get(0), manifestFile);
        assertSameContent(expected.get(1), actual.get(1), manifestFile);
        assertSameContent(expected.get(4), actual.get(4), manifestFile);

        // these two manifest files should be kept without modification
        assertThat(actual.get(2)).isEqualTo(input.get(5));
        assertThat(actual.get(3)).isEqualTo(input.get(6));
    }

    /**
     * Asserts that two manifest files agree both on their metadata counters/stats and on the
     * actual entries stored in the underlying files. File names themselves are NOT compared,
     * as merged files get newly generated names.
     */
    private void assertSameContent(
            ManifestFileMeta expected, ManifestFileMeta actual, ManifestFile manifestFile) {
        // check meta
        assertThat(actual.numAddedFiles()).isEqualTo(expected.numAddedFiles());
        assertThat(actual.numDeletedFiles()).isEqualTo(expected.numDeletedFiles());
        assertThat(actual.partitionStats()).isEqualTo(expected.partitionStats());

        // check content
        assertThat(manifestFile.read(actual.fileName()))
                .isEqualTo(manifestFile.read(expected.fileName()));
    }

    /**
     * Runs merge against a file system that randomly fails atomic renames. If a failure is
     * injected, merge must clean up any newly written manifest files and keep the old ones
     * untouched. NOTE: with fail possibility 1/10 a run may complete without any injected
     * failure, in which case this test intentionally asserts nothing (hence {@code
     * @RepeatedTest} to get good coverage).
     */
    @RepeatedTest(10)
    public void testCleanUpForException() throws IOException {
        FailingAtomicRenameFileSystem.resetFailCounter(1);
        FailingAtomicRenameFileSystem.setFailPossibility(10);

        List<ManifestFileMeta> input = new ArrayList<>();
        createData(input, null);
        ManifestFile failingManifestFile =
                createManifestFile(
                        FailingAtomicRenameFileSystem.SCHEME + "://" + tempDir.toString());

        try {
            ManifestFileMeta.merge(input, failingManifestFile, 500);
        } catch (Throwable e) {
            assertThat(e)
                    .hasRootCauseExactlyInstanceOf(
                            FailingAtomicRenameFileSystem.ArtificialException.class);
            // old files should be kept untouched, while new files should be cleaned up
            Path manifestDir = new Path(tempDir.toString() + "/manifest");
            FileSystem fs = manifestDir.getFileSystem();
            assertThat(
                            new TreeSet<>(
                                    Arrays.stream(fs.listStatus(manifestDir))
                                            .map(s -> s.getPath().getName())
                                            .collect(Collectors.toList())))
                    .isEqualTo(
                            new TreeSet<>(
                                    input.stream()
                                            .map(ManifestFileMeta::fileName)
                                            .collect(Collectors.toList())));
        }
    }

    /** Creates a {@link ManifestFile} rooted at the given path (scheme decides the FS used). */
    private ManifestFile createManifestFile(String path) {
        return new ManifestFile(
                PARTITION_TYPE,
                KEY_TYPE,
                ROW_TYPE,
                avro,
                new FileStorePathFactory(new Path(path), PARTITION_TYPE, "default"));
    }

    /**
     * Builds the merge fixture. {@code input} receives 9 manifest files whose (faked) sizes
     * are listed below; if {@code expected} is non-null it receives the 5 manifest files that
     * merging with suggested size 500 should produce.
     */
    private void createData(List<ManifestFileMeta> input, List<ManifestFileMeta> expected) {
        // suggested size 500
        // file sizes:
        // 200, 300, -- multiple files exactly the suggested size
        // 100, 200, 300, -- multiple files exceeding the suggested size
        // 500, -- single file exactly the suggested size
        // 600, -- single file exceeding the suggested size
        // 100, 200 -- not enough sizes, but the last bit

        input.add(makeManifest(makeEntry(true, "A"), makeEntry(true, "B")));
        input.add(makeManifest(makeEntry(true, "C"), makeEntry(false, "B"), makeEntry(true, "D")));

        input.add(makeManifest(makeEntry(false, "A")));
        input.add(makeManifest(makeEntry(true, "E"), makeEntry(true, "F")));
        input.add(makeManifest(makeEntry(true, "G"), makeEntry(false, "E"), makeEntry(false, "G")));

        input.add(
                makeManifest(
                        makeEntry(false, "C"),
                        makeEntry(false, "F"),
                        makeEntry(true, "H"),
                        makeEntry(true, "I"),
                        makeEntry(false, "H")));

        input.add(
                makeManifest(
                        makeEntry(false, "I"),
                        makeEntry(true, "J"),
                        makeEntry(true, "K"),
                        makeEntry(false, "J"),
                        makeEntry(false, "K"),
                        makeEntry(true, "L")));

        input.add(makeManifest(makeEntry(true, "M")));
        input.add(makeManifest(makeEntry(false, "M"), makeEntry(true, "N")));

        if (expected == null) {
            return;
        }

        // ADD/DELETE pairs for the same file cancel out when manifests are merged
        expected.add(
                makeManifest(makeEntry(true, "A"), makeEntry(true, "C"), makeEntry(true, "D")));
        expected.add(makeManifest(makeEntry(false, "A"), makeEntry(true, "F")));
        expected.add(input.get(5));
        expected.add(input.get(6));
        expected.add(makeManifest(makeEntry(true, "N")));
    }

    /**
     * Writes the given entries into a real manifest file, then overrides the reported file
     * size with {@code entries.length * 100} so tests can control merge grouping precisely.
     */
    private ManifestFileMeta makeManifest(ManifestEntry... entries) {
        ManifestFileMeta writtenMeta = manifestFile.write(Arrays.asList(entries));
        return new ManifestFileMeta(
                writtenMeta.fileName(),
                entries.length * 100, // for testing purpose
                writtenMeta.numAddedFiles(),
                writtenMeta.numDeletedFiles(),
                writtenMeta.partitionStats());
    }

    /** Creates an ADD or DELETE entry for {@code fileName}; all other fields are dummies. */
    private ManifestEntry makeEntry(boolean isAdd, String fileName) {
        BinaryRowData binaryRowData = new BinaryRowData(1);
        BinaryRowWriter writer = new BinaryRowWriter(binaryRowData);
        writer.writeInt(0, 0);
        writer.complete();

        return new ManifestEntry(
                isAdd ? ValueKind.ADD : ValueKind.DELETE,
                binaryRowData, // not used
                0, // not used
                0, // not used
                new SstFileMeta(
                        fileName,
                        0, // not used
                        0, // not used
                        binaryRowData, // not used
                        binaryRowData, // not used
                        new FieldStats[] {new FieldStats(null, null, 0)}, // not used
                        0, // not used
                        0, // not used
                        0 // not used
                        ));
    }
}
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
index b9f768a..ee09d9a 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
@@ -46,7 +46,7 @@ public class ManifestFileTest {
     @TempDir java.nio.file.Path tempDir;
 
     @RepeatedTest(10)
-    public void testWriteAndReadManifestFile() throws IOException {
+    public void testWriteAndReadManifestFile() {
         List<ManifestEntry> entries = generateData();
         ManifestFileMeta meta = gen.createManifestFileMeta(entries);
         ManifestFile manifestFile = createManifestFile(tempDir.toString());
@@ -71,7 +71,8 @@ public class ManifestFileTest {
             manifestFile.write(entries);
         } catch (Throwable e) {
             assertThat(e)
-                    
.isExactlyInstanceOf(FailingAtomicRenameFileSystem.ArtificialException.class);
+                    .hasRootCauseExactlyInstanceOf(
+                            
FailingAtomicRenameFileSystem.ArtificialException.class);
             Path manifestDir = new Path(tempDir.toString() + "/manifest");
             FileSystem fs = manifestDir.getFileSystem();
             assertThat(fs.listStatus(manifestDir)).isEmpty();
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
index c9117a1..e9df1c1 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
@@ -47,7 +47,7 @@ public class ManifestListTest {
     @TempDir java.nio.file.Path tempDir;
 
     @RepeatedTest(10)
-    public void testWriteAndReadManifestList() throws IOException {
+    public void testWriteAndReadManifestList() {
         List<ManifestFileMeta> metas = generateData();
         ManifestList manifestList = createManifestList(tempDir.toString());
 
@@ -69,7 +69,8 @@ public class ManifestListTest {
             manifestList.write(metas);
         } catch (Throwable e) {
             assertThat(e)
-                    
.isExactlyInstanceOf(FailingAtomicRenameFileSystem.ArtificialException.class);
+                    .hasRootCauseExactlyInstanceOf(
+                            
FailingAtomicRenameFileSystem.ArtificialException.class);
             Path manifestDir = new Path(tempDir.toString() + "/manifest");
             FileSystem fs = manifestDir.getFileSystem();
             assertThat(fs.listStatus(manifestDir)).isEmpty();
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstTestDataGenerator.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstTestDataGenerator.java
index f1ef606..bb39d7d 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstTestDataGenerator.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstTestDataGenerator.java
@@ -109,10 +109,10 @@ public class SstTestDataGenerator {
             BinaryRowData value = (BinaryRowData) kv.value();
             totalSize += key.getSizeInBytes() + value.getSizeInBytes();
             collector.collect(value);
-            if (minKey == null || gen.compareKeys(key, minKey) < 0) {
+            if (minKey == null || 
TestKeyValueGenerator.KEY_COMPARATOR.compare(key, minKey) < 0) {
                 minKey = key;
             }
-            if (maxKey == null || gen.compareKeys(key, maxKey) > 0) {
+            if (maxKey == null || 
TestKeyValueGenerator.KEY_COMPARATOR.compare(key, maxKey) > 0) {
                 maxKey = key;
             }
             minSequenceNumber = Math.min(minSequenceNumber, 
kv.sequenceNumber());
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTestBase.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTestBase.java
new file mode 100644
index 0000000..0c8f678
--- /dev/null
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTestBase.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFile;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.mergetree.sst.SstFile;
+import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FileStoreCommitImpl}. */
+public abstract class FileStoreCommitTestBase {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FileStoreCommitTestBase.class);
+
+    private final FileFormat avro =
+            FileFormat.fromIdentifier(
+                    FileStoreCommitTestBase.class.getClassLoader(), "avro", 
new Configuration());
+
+    private TestKeyValueGenerator gen;
+    @TempDir java.nio.file.Path tempDir;
+
+    @BeforeEach
+    public void beforeEach() throws IOException {
+        gen = new TestKeyValueGenerator();
+        Path root = new Path(tempDir.toString());
+        root.getFileSystem().mkdirs(new Path(root + "/snapshot"));
+    }
+
+    protected abstract String getSchema();
+
+    @RepeatedTest(10)
+    public void testSingleCommitUser() throws Exception {
+        testRandomConcurrentNoConflict(1);
+    }
+
+    @RepeatedTest(10)
+    public void testManyCommitUsersNoConflict() throws Exception {
+        testRandomConcurrentNoConflict(ThreadLocalRandom.current().nextInt(3) 
+ 2);
+    }
+
+    protected void testRandomConcurrentNoConflict(int numThreads) throws 
Exception {
+        // prepare test data
+        Map<BinaryRowData, List<KeyValue>> data =
+                generateData(ThreadLocalRandom.current().nextInt(1000) + 1);
+        logData(
+                () ->
+                        data.values().stream()
+                                .flatMap(Collection::stream)
+                                .collect(Collectors.toList()),
+                "input");
+        Map<BinaryRowData, BinaryRowData> expected =
+                toKvMap(
+                        data.values().stream()
+                                .flatMap(Collection::stream)
+                                .collect(Collectors.toList()));
+
+        List<Map<BinaryRowData, List<KeyValue>>> dataPerThread = new 
ArrayList<>();
+        for (int i = 0; i < numThreads; i++) {
+            dataPerThread.add(new HashMap<>());
+        }
+        for (Map.Entry<BinaryRowData, List<KeyValue>> entry : data.entrySet()) 
{
+            dataPerThread
+                    .get(ThreadLocalRandom.current().nextInt(numThreads))
+                    .put(entry.getKey(), entry.getValue());
+        }
+
+        // concurrent commits
+        List<TestCommitThread> threads = new ArrayList<>();
+        for (int i = 0; i < numThreads; i++) {
+            TestCommitThread thread =
+                    new TestCommitThread(
+                            dataPerThread.get(i), createTestPathFactory(), 
createSafePathFactory());
+            thread.start();
+            threads.add(thread);
+        }
+        for (TestCommitThread thread : threads) {
+            thread.join();
+        }
+
+        // read actual data and compare
+        Map<BinaryRowData, BinaryRowData> actual = 
toKvMap(readKvsFromLatestSnapshot());
+        logData(() -> kvMapToKvList(expected), "expected");
+        logData(() -> kvMapToKvList(actual), "actual");
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    private Map<BinaryRowData, List<KeyValue>> generateData(int numRecords) {
+        Map<BinaryRowData, List<KeyValue>> data = new HashMap<>();
+        for (int i = 0; i < numRecords; i++) {
+            KeyValue kv = gen.next();
+            data.compute(gen.getPartition(kv), (p, l) -> l == null ? new 
ArrayList<>() : l).add(kv);
+        }
+        return data;
+    }
+
+    private List<KeyValue> readKvsFromLatestSnapshot() throws IOException {
+        FileStorePathFactory pathFactory = createSafePathFactory();
+        Long latestSnapshotId = pathFactory.latestSnapshotId();
+        assertThat(latestSnapshotId).isNotNull();
+
+        ManifestFile manifestFile =
+                new ManifestFile(
+                        TestKeyValueGenerator.PARTITION_TYPE,
+                        TestKeyValueGenerator.KEY_TYPE,
+                        TestKeyValueGenerator.ROW_TYPE,
+                        avro,
+                        pathFactory);
+        ManifestList manifestList =
+                new ManifestList(TestKeyValueGenerator.PARTITION_TYPE, avro, 
pathFactory);
+
+        List<KeyValue> kvs = new ArrayList<>();
+        List<ManifestEntry> entries =
+                new FileStoreScanImpl(pathFactory, manifestFile, manifestList)
+                        .withSnapshot(latestSnapshotId)
+                        .plan()
+                        .files();
+        for (ManifestEntry entry : entries) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Reading actual key-values from file " + 
entry.file().fileName());
+            }
+            SstFile sstFile =
+                    new SstFile(
+                            TestKeyValueGenerator.KEY_TYPE,
+                            TestKeyValueGenerator.ROW_TYPE,
+                            avro,
+                            
pathFactory.createSstPathFactory(entry.partition(), 0),
+                            1024 * 1024 // not used
+                            );
+            RecordReaderIterator iterator =
+                    new 
RecordReaderIterator(sstFile.read(entry.file().fileName()));
+            while (iterator.hasNext()) {
+                kvs.add(
+                        iterator.next()
+                                .copy(
+                                        TestKeyValueGenerator.KEY_SERIALIZER,
+                                        TestKeyValueGenerator.ROW_SERIALIZER));
+            }
+        }
+
+        gen.sort(kvs);
+        logData(() -> kvs, "raw read results");
+        return kvs;
+    }
+
+    private Map<BinaryRowData, BinaryRowData> toKvMap(List<KeyValue> kvs) {
+        Map<BinaryRowData, BinaryRowData> result = new HashMap<>();
+        for (KeyValue kv : kvs) {
+            BinaryRowData key = 
TestKeyValueGenerator.KEY_SERIALIZER.toBinaryRow(kv.key()).copy();
+            BinaryRowData value =
+                    
TestKeyValueGenerator.ROW_SERIALIZER.toBinaryRow(kv.value()).copy();
+            switch (kv.valueKind()) {
+                case ADD:
+                    result.put(key, value);
+                    break;
+                case DELETE:
+                    result.remove(key);
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unknown value kind " + kv.valueKind().name());
+            }
+        }
+        return result;
+    }
+
+    private FileStorePathFactory createTestPathFactory() {
+        return new FileStorePathFactory(
+                new Path(getSchema() + "://" + tempDir.toString()),
+                TestKeyValueGenerator.PARTITION_TYPE,
+                "default");
+    }
+
+    private FileStorePathFactory createSafePathFactory() {
+        return new FileStorePathFactory(
+                new Path(TestAtomicRenameFileSystem.SCHEME + "://" + 
tempDir.toString()),
+                TestKeyValueGenerator.PARTITION_TYPE,
+                "default");
+    }
+
+    private List<KeyValue> kvMapToKvList(Map<BinaryRowData, BinaryRowData> 
map) {
+        return map.entrySet().stream()
+                .map(e -> new KeyValue().replace(e.getKey(), -1, 
ValueKind.ADD, e.getValue()))
+                .collect(Collectors.toList());
+    }
+
+    private void logData(Supplier<List<KeyValue>> supplier, String name) {
+        if (!LOG.isDebugEnabled()) {
+            return;
+        }
+
+        LOG.debug("========== Beginning of " + name + " ==========");
+        for (KeyValue kv : supplier.get()) {
+            LOG.debug(kv.toString(TestKeyValueGenerator.KEY_TYPE, 
TestKeyValueGenerator.ROW_TYPE));
+        }
+        LOG.debug("========== End of " + name + " ==========");
+    }
+
+    /** Tests for {@link FileStoreCommitImpl} with {@link 
TestAtomicRenameFileSystem}. */
+    public static class WithTestAtomicRenameFileSystem extends 
FileStoreCommitTestBase {
+
+        @Override
+        protected String getSchema() {
+            return TestAtomicRenameFileSystem.SCHEME;
+        }
+    }
+
+    /** Tests for {@link FileStoreCommitImpl} with {@link 
FailingAtomicRenameFileSystem}. */
+    public static class WithFailingAtomicRenameFileSystem extends 
FileStoreCommitTestBase {
+
+        @BeforeEach
+        @Override
+        public void beforeEach() throws IOException {
+            super.beforeEach();
+            FailingAtomicRenameFileSystem.resetFailCounter(100);
+            FailingAtomicRenameFileSystem.setFailPossibility(5000);
+        }
+
+        @Override
+        protected String getSchema() {
+            return FailingAtomicRenameFileSystem.SCHEME;
+        }
+    }
+}
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
new file mode 100644
index 0000000..f7f4c86
--- /dev/null
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import 
org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFile;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.mergetree.MergeTree;
+import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
+import org.apache.flink.table.store.file.mergetree.MergeTreeWriter;
+import 
org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
+import org.apache.flink.table.store.file.mergetree.sst.SstFile;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
+/** Testing {@link Thread}s to perform concurrent commits. */
+public class TestCommitThread extends Thread {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(TestCommitThread.class);
+
+    private static final MergeTreeOptions MERGE_TREE_OPTIONS;
+    private static final long SUGGESTED_SST_FILE_SIZE = 1024;
+
+    static {
+        Configuration mergeTreeConf = new Configuration();
+        mergeTreeConf.set(MergeTreeOptions.WRITE_BUFFER_SIZE, 
MemorySize.parse("16 kb"));
+        mergeTreeConf.set(MergeTreeOptions.PAGE_SIZE, MemorySize.parse("4 
kb"));
+        MERGE_TREE_OPTIONS = new MergeTreeOptions(mergeTreeConf);
+    }
+
+    private final FileFormat avro =
+            FileFormat.fromIdentifier(
+                    FileStoreCommitTestBase.class.getClassLoader(), "avro", 
new Configuration());
+
+    private final Map<BinaryRowData, List<KeyValue>> data;
+    private final FileStorePathFactory safePathFactory;
+
+    private final Map<BinaryRowData, MergeTreeWriter> writers;
+
+    private final FileStoreScan scan;
+    private final FileStoreCommit commit;
+
+    public TestCommitThread(
+            Map<BinaryRowData, List<KeyValue>> data,
+            FileStorePathFactory testPathFactory,
+            FileStorePathFactory safePathFactory) {
+        this.data = data;
+        this.safePathFactory = safePathFactory;
+
+        this.writers = new HashMap<>();
+
+        this.scan =
+                new FileStoreScanImpl(
+                        safePathFactory,
+                        createManifestFile(safePathFactory),
+                        createManifestList(safePathFactory));
+
+        ManifestCommittableSerializer serializer =
+                new ManifestCommittableSerializer(
+                        TestKeyValueGenerator.PARTITION_TYPE,
+                        TestKeyValueGenerator.KEY_TYPE,
+                        TestKeyValueGenerator.ROW_TYPE);
+        ManifestFile testManifestFile = createManifestFile(testPathFactory);
+        ManifestList testManifestList = createManifestList(testPathFactory);
+        Configuration fileStoreConf = new Configuration();
+        fileStoreConf.set(FileStoreOptions.BUCKET, 1);
+        fileStoreConf.set(
+                FileStoreOptions.MANIFEST_TARGET_FILE_SIZE,
+                MemorySize.parse((ThreadLocalRandom.current().nextInt(16) + 1) 
+ "kb"));
+        FileStoreOptions fileStoreOptions = new 
FileStoreOptions(fileStoreConf);
+        FileStoreScanImpl testScan =
+                new FileStoreScanImpl(testPathFactory, testManifestFile, 
testManifestList);
+        this.commit =
+                new FileStoreCommitImpl(
+                        UUID.randomUUID().toString(),
+                        serializer,
+                        testPathFactory,
+                        testManifestFile,
+                        testManifestList,
+                        fileStoreOptions,
+                        testScan);
+    }
+
+    private ManifestFile createManifestFile(FileStorePathFactory pathFactory) {
+        return new ManifestFile(
+                TestKeyValueGenerator.PARTITION_TYPE,
+                TestKeyValueGenerator.KEY_TYPE,
+                TestKeyValueGenerator.ROW_TYPE,
+                avro,
+                pathFactory);
+    }
+
+    private ManifestList createManifestList(FileStorePathFactory pathFactory) {
+        return new ManifestList(TestKeyValueGenerator.PARTITION_TYPE, avro, 
pathFactory);
+    }
+
+    @Override
+    public void run() {
+        while (!data.isEmpty()) {
+            ManifestCommittable committable;
+            try {
+                committable = createCommittable();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+
+            boolean shouldCheckFilter = false;
+            while (true) {
+                try {
+                    if (shouldCheckFilter) {
+                        if 
(commit.filterCommitted(Collections.singletonList(committable))
+                                .isEmpty()) {
+                            break;
+                        }
+                    }
+                    commit.commit(committable, Collections.emptyMap());
+                    break;
+                } catch (Throwable e) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.warn(
+                                "["
+                                        + Thread.currentThread().getName()
+                                        + "] Failed to commit because of 
exception, try again",
+                                e);
+                    }
+                    writers.clear();
+                    shouldCheckFilter = true;
+                }
+            }
+        }
+
+        for (MergeTreeWriter writer : writers.values()) {
+            try {
+                writer.sync();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            writer.close();
+        }
+    }
+
+    private ManifestCommittable createCommittable() throws Exception {
+        int numWrites = ThreadLocalRandom.current().nextInt(3) + 1;
+        for (int i = 0; i < numWrites && !data.isEmpty(); i++) {
+            writeData();
+        }
+
+        ManifestCommittable committable = new ManifestCommittable();
+        for (Map.Entry<BinaryRowData, MergeTreeWriter> entry : 
writers.entrySet()) {
+            committable.add(entry.getKey(), 0, 
entry.getValue().prepareCommit());
+        }
+        return committable;
+    }
+
+    private void writeData() throws Exception {
+        List<KeyValue> changes = new ArrayList<>();
+        BinaryRowData partition = pickData(changes);
+        MergeTreeWriter writer =
+                writers.compute(partition, (k, v) -> v == null ? 
createWriter(k) : v);
+        for (KeyValue kv : changes) {
+            writer.write(kv.valueKind(), kv.key(), kv.value());
+        }
+    }
+
+    private BinaryRowData pickData(List<KeyValue> changes) {
+        List<BinaryRowData> keys = new ArrayList<>(data.keySet());
+        BinaryRowData partition = 
keys.get(ThreadLocalRandom.current().nextInt(keys.size()));
+        List<KeyValue> remaining = data.get(partition);
+        int numChanges = ThreadLocalRandom.current().nextInt(Math.min(100, 
remaining.size() + 1));
+        changes.addAll(remaining.subList(0, numChanges));
+        if (numChanges == remaining.size()) {
+            data.remove(partition);
+        } else {
+            remaining.subList(0, numChanges).clear();
+        }
+        return partition;
+    }
+
+    private MergeTreeWriter createWriter(BinaryRowData partition) {
+        SstFile sstFile =
+                new SstFile(
+                        TestKeyValueGenerator.KEY_TYPE,
+                        TestKeyValueGenerator.ROW_TYPE,
+                        avro,
+                        safePathFactory.createSstPathFactory(partition, 0),
+                        SUGGESTED_SST_FILE_SIZE);
+        ExecutorService service =
+                Executors.newSingleThreadExecutor(
+                        r -> {
+                            Thread t = new Thread(r);
+                            t.setName(Thread.currentThread().getName() + 
"-writer-service-pool");
+                            return t;
+                        });
+        MergeTree mergeTree =
+                new MergeTree(
+                        MERGE_TREE_OPTIONS,
+                        sstFile,
+                        TestKeyValueGenerator.KEY_COMPARATOR,
+                        service,
+                        new DeduplicateAccumulator());
+        Long latestSnapshotId = safePathFactory.latestSnapshotId();
+        if (latestSnapshotId == null) {
+            return (MergeTreeWriter) 
mergeTree.createWriter(Collections.emptyList());
+        } else {
+            return (MergeTreeWriter)
+                    mergeTree.createWriter(
+                            
scan.withSnapshot(latestSnapshotId).plan().files().stream()
+                                    .filter(e -> 
partition.equals(e.partition()))
+                                    .map(ManifestEntry::file)
+                                    .collect(Collectors.toList()));
+        }
+    }
+}

Reply via email to