JingsongLi commented on a change in pull request #12:
URL: https://github.com/apache/flink-table-store/pull/12#discussion_r787408231



##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.ReadableConfig;
+
+/** Options for {@link FileStore}. */
+public class FileStoreOptions {
+
+    public static final ConfigOption<Integer> BUCKET =
+            ConfigOptions.key("bucket")
+                    .intType()
+                    .defaultValue(1)
+                    .withDescription(
+                            "Bucket number for file store and partition number 
for Kafka.");

Review comment:
       Remove `and partition number for Kafka`; the log system is an abstraction.

##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
##########
@@ -118,4 +121,94 @@ public String toString() {
                 numDeletedFiles,
                 Arrays.toString(partitionStats));
     }
+
+    /**
+     * Merge several {@link ManifestFileMeta}s. {@link ManifestEntry}s 
representing first adding and
+     * then deleting the same sst file will cancel each other.
+     *
+     * <p>NOTE: This method is atomic.
+     */
+    public static List<ManifestFileMeta> merge(
+            List<ManifestFileMeta> metas, ManifestFile manifestFile, long 
suggestedMetaSize) {
+        List<ManifestFileMeta> result = new ArrayList<>();
+        // these are the newly created manifest files, clean them up if 
exception occurs
+        List<ManifestFileMeta> newMetas = new ArrayList<>();
+        List<ManifestFileMeta> candidate = new ArrayList<>();
+        long totalSize = 0;
+
+        try {
+            for (ManifestFileMeta manifest : metas) {
+                totalSize += manifest.fileSize;
+                candidate.add(manifest);
+                if (totalSize >= suggestedMetaSize) {
+                    // reach suggested file size, perform merging and produce 
new file
+                    merge(candidate, manifestFile, result, newMetas);
+                    candidate.clear();
+                    totalSize = 0;
+                }
+            }
+            if (!candidate.isEmpty()) {
+                // merge the last bit of metas
+                merge(candidate, manifestFile, result, newMetas);
+            }
+        } catch (Throwable e) {
+            // exception occurs, clean up and rethrow
+            for (ManifestFileMeta manifest : newMetas) {
+                manifestFile.delete(manifest.fileName);
+            }
+            throw e;
+        }
+
+        return result;
+    }
+
+    private static void merge(

Review comment:
       Inline this method? It is simple and takes too many arguments.

##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import 
org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFile;
+import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.FileUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Default implementation of {@link FileStoreCommit}.
+ *
+ * <p>This class provides an atomic commit method to the user.
+ *
+ * <ol>
+ *   <li>Before calling {@link FileStoreCommitImpl#commit}, user should first 
call {@link
+ *       FileStoreCommitImpl#filterCommitted} to make sure this commit is not 
done before.
+ *   <li>Before committing, it will first check for conflicts by checking if 
all files to be removed
+ *       currently exists.
+ *   <li>After that it use the external {@link FileStoreCommitImpl#lock} (if 
provided) or the atomic
+ *       rename of the file system to ensure atomicity.
+ *   <li>If commit fails due to conflicts or exception it tries its best to 
clean up and aborts.
+ *   <li>If atomic rename fails it tries again after reading the latest 
snapshot from step 2.
+ * </ol>
+ */
+public class FileStoreCommitImpl implements FileStoreCommit {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FileStoreCommitImpl.class);
+
+    private final String committer;
+    private final ManifestCommittableSerializer committableSerializer;
+
+    private final FileStorePathFactory pathFactory;
+    private final ManifestFile manifestFile;
+    private final ManifestList manifestList;
+    private final FileStoreOptions fileStoreOptions;
+    private final FileStoreScan scan;
+
+    @Nullable private Lock lock;
+
+    public FileStoreCommitImpl(
+            String committer,
+            ManifestCommittableSerializer committableSerializer,
+            FileStorePathFactory pathFactory,
+            ManifestFile manifestFile,
+            ManifestList manifestList,
+            FileStoreOptions fileStoreOptions,
+            FileStoreScan scan) {
+        this.committer = committer;
+        this.committableSerializer = committableSerializer;
+
+        this.pathFactory = pathFactory;
+        this.manifestFile = manifestFile;
+        this.manifestList = manifestList;
+        this.fileStoreOptions = fileStoreOptions;
+        this.scan = scan;
+
+        this.lock = null;
+    }
+
+    @Override
+    public FileStoreCommit withLock(Lock lock) {
+        this.lock = lock;
+        return this;
+    }
+
+    @Override
+    public List<ManifestCommittable> filterCommitted(List<ManifestCommittable> 
committableList) {
+        committableList = new ArrayList<>(committableList);
+
+        // filter out commits with no new files
+        committableList.removeIf(committable -> 
committable.newFiles().isEmpty());
+
+        // if there is no previous snapshots then nothing should be filtered
+        Long latestSnapshotId = pathFactory.latestSnapshotId();
+        if (latestSnapshotId == null) {
+            return committableList;
+        }
+
+        // check if a committable is already committed by its hash
+        Map<String, ManifestCommittable> hashes = new LinkedHashMap<>();
+        for (ManifestCommittable committable : committableList) {
+            hashes.put(digestManifestCommittable(committable), committable);
+        }
+
+        for (long id = latestSnapshotId; id >= Snapshot.FIRST_SNAPSHOT_ID; 
id--) {
+            Path snapshotPath = pathFactory.toSnapshotPath(id);
+            Snapshot snapshot = Snapshot.fromPath(snapshotPath);
+            if (committer.equals(snapshot.committer())) {
+                if (hashes.containsKey(snapshot.hash())) {
+                    hashes.remove(snapshot.hash());
+                } else {
+                    // early exit, because committableList must be the latest 
commits by this
+                    // committer
+                    break;
+                }
+            }
+        }
+
+        return new ArrayList<>(hashes.values());
+    }
+
+    @Override
+    public void commit(ManifestCommittable committable, Map<String, String> 
properties) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Ready to commit\n" + committable.toString());
+        }
+
+        String hash = digestManifestCommittable(committable);
+
+        List<ManifestEntry> appendChanges = 
collectChanges(committable.newFiles(), ValueKind.ADD);
+        if (!appendChanges.isEmpty()) {
+            tryCommit(appendChanges, hash, Snapshot.Type.APPEND);
+        }
+
+        List<ManifestEntry> compactChanges = new ArrayList<>();
+        compactChanges.addAll(collectChanges(committable.compactBefore(), 
ValueKind.DELETE));
+        compactChanges.addAll(collectChanges(committable.compactAfter(), 
ValueKind.ADD));
+        if (!compactChanges.isEmpty()) {
+            tryCommit(compactChanges, hash, Snapshot.Type.COMPACT);
+        }
+    }
+
+    @Override
+    public void overwrite(
+            Map<String, String> partition,
+            ManifestCommittable committable,
+            Map<String, String> properties) {
+        throw new UnsupportedOperationException();
+    }
+
+    private String digestManifestCommittable(ManifestCommittable committable) {
+        try {
+            return new String(
+                    Base64.getEncoder()
+                            .encode(
+                                    MessageDigest.getInstance("MD5")
+                                            
.digest(committableSerializer.serialize(committable))));
+        } catch (NoSuchAlgorithmException e) {
+            throw new RuntimeException("MD5 algorithm not found. This is 
impossible.", e);
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    "Failed to serialize ManifestCommittable. This is 
unexpected.", e);
+        }
+    }
+
+    private List<ManifestEntry> collectChanges(
+            Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> map, ValueKind 
kind) {
+        List<ManifestEntry> changes = new ArrayList<>();
+        for (Map.Entry<BinaryRowData, Map<Integer, List<SstFileMeta>>> 
entryWithPartition :
+                map.entrySet()) {
+            for (Map.Entry<Integer, List<SstFileMeta>> entryWithBucket :
+                    entryWithPartition.getValue().entrySet()) {
+                changes.addAll(
+                        entryWithBucket.getValue().stream()
+                                .map(
+                                        file ->
+                                                new ManifestEntry(
+                                                        kind,
+                                                        
entryWithPartition.getKey(),
+                                                        
entryWithBucket.getKey(),
+                                                        
fileStoreOptions.bucket,
+                                                        file))
+                                .collect(Collectors.toList()));
+            }
+        }
+        return changes;
+    }
+
+    private void tryCommit(List<ManifestEntry> changes, String hash, 
Snapshot.Type type) {
+        while (true) {
+            Long latestSnapshotId = pathFactory.latestSnapshotId();
+            long newSnapshotId =
+                    latestSnapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID : 
latestSnapshotId + 1;
+            Path newSnapshotPath = pathFactory.toSnapshotPath(newSnapshotId);
+            Path tmpSnapshotPath =

Review comment:
       Add a `toTmpSnapshotPath` to pathFactory?

##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.utils.FileUtils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+
+/** This file is the entrance to all data committed at some specific time 
point. */
+public class Snapshot {
+
+    public static final long FIRST_SNAPSHOT_ID = 1;
+
+    private static final String FIELD_ID = "id";
+    private static final String FIELD_MANIFEST_LIST = "manifestList";
+    private static final String FIELD_COMMITTER = "committer";
+    private static final String FIELD_HASH = "hash";
+    private static final String FIELD_TYPE = "type";
+
+    @JsonProperty(FIELD_ID)
+    private final long id;
+
+    @JsonProperty(FIELD_MANIFEST_LIST)
+    private final String manifestList;
+
+    @JsonProperty(FIELD_COMMITTER)
+    private final String committer;
+
+    // for deduplication
+    @JsonProperty(FIELD_HASH)
+    private final String hash;
+
+    @JsonProperty(FIELD_TYPE)
+    private final Type type;

Review comment:
       `CommitKind`?

##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import 
org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFile;
+import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.FileUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Default implementation of {@link FileStoreCommit}.
+ *
+ * <p>This class provides an atomic commit method to the user.
+ *
+ * <ol>
+ *   <li>Before calling {@link FileStoreCommitImpl#commit}, user should first 
call {@link
+ *       FileStoreCommitImpl#filterCommitted} to make sure this commit is not 
done before.
+ *   <li>Before committing, it will first check for conflicts by checking if 
all files to be removed
+ *       currently exists.
+ *   <li>After that it use the external {@link FileStoreCommitImpl#lock} (if 
provided) or the atomic
+ *       rename of the file system to ensure atomicity.
+ *   <li>If commit fails due to conflicts or exception it tries its best to 
clean up and aborts.
+ *   <li>If atomic rename fails it tries again after reading the latest 
snapshot from step 2.
+ * </ol>
+ */
+public class FileStoreCommitImpl implements FileStoreCommit {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FileStoreCommitImpl.class);
+
+    private final String committer;
+    private final ManifestCommittableSerializer committableSerializer;
+
+    private final FileStorePathFactory pathFactory;
+    private final ManifestFile manifestFile;
+    private final ManifestList manifestList;
+    private final FileStoreOptions fileStoreOptions;
+    private final FileStoreScan scan;
+
+    @Nullable private Lock lock;
+
+    public FileStoreCommitImpl(
+            String committer,
+            ManifestCommittableSerializer committableSerializer,
+            FileStorePathFactory pathFactory,
+            ManifestFile manifestFile,
+            ManifestList manifestList,
+            FileStoreOptions fileStoreOptions,
+            FileStoreScan scan) {
+        this.committer = committer;
+        this.committableSerializer = committableSerializer;
+
+        this.pathFactory = pathFactory;
+        this.manifestFile = manifestFile;
+        this.manifestList = manifestList;
+        this.fileStoreOptions = fileStoreOptions;
+        this.scan = scan;
+
+        this.lock = null;
+    }
+
+    @Override
+    public FileStoreCommit withLock(Lock lock) {
+        this.lock = lock;
+        return this;
+    }
+
+    @Override
+    public List<ManifestCommittable> filterCommitted(List<ManifestCommittable> 
committableList) {
+        committableList = new ArrayList<>(committableList);
+
+        // filter out commits with no new files
+        committableList.removeIf(committable -> 
committable.newFiles().isEmpty());
+
+        // if there is no previous snapshots then nothing should be filtered
+        Long latestSnapshotId = pathFactory.latestSnapshotId();
+        if (latestSnapshotId == null) {
+            return committableList;
+        }
+
+        // check if a committable is already committed by its hash
+        Map<String, ManifestCommittable> hashes = new LinkedHashMap<>();
+        for (ManifestCommittable committable : committableList) {
+            hashes.put(digestManifestCommittable(committable), committable);
+        }
+
+        for (long id = latestSnapshotId; id >= Snapshot.FIRST_SNAPSHOT_ID; 
id--) {
+            Path snapshotPath = pathFactory.toSnapshotPath(id);
+            Snapshot snapshot = Snapshot.fromPath(snapshotPath);
+            if (committer.equals(snapshot.committer())) {
+                if (hashes.containsKey(snapshot.hash())) {
+                    hashes.remove(snapshot.hash());
+                } else {
+                    // early exit, because committableList must be the latest 
commits by this
+                    // committer
+                    break;
+                }
+            }
+        }
+
+        return new ArrayList<>(hashes.values());
+    }
+
+    @Override
+    public void commit(ManifestCommittable committable, Map<String, String> 
properties) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Ready to commit\n" + committable.toString());
+        }
+
+        String hash = digestManifestCommittable(committable);
+
+        List<ManifestEntry> appendChanges = 
collectChanges(committable.newFiles(), ValueKind.ADD);
+        if (!appendChanges.isEmpty()) {
+            tryCommit(appendChanges, hash, Snapshot.Type.APPEND);
+        }
+
+        List<ManifestEntry> compactChanges = new ArrayList<>();
+        compactChanges.addAll(collectChanges(committable.compactBefore(), 
ValueKind.DELETE));
+        compactChanges.addAll(collectChanges(committable.compactAfter(), 
ValueKind.ADD));
+        if (!compactChanges.isEmpty()) {
+            tryCommit(compactChanges, hash, Snapshot.Type.COMPACT);
+        }
+    }
+
+    @Override
+    public void overwrite(
+            Map<String, String> partition,
+            ManifestCommittable committable,
+            Map<String, String> properties) {
+        throw new UnsupportedOperationException();
+    }
+
+    private String digestManifestCommittable(ManifestCommittable committable) {
+        try {
+            return new String(
+                    Base64.getEncoder()
+                            .encode(
+                                    MessageDigest.getInstance("MD5")
+                                            
.digest(committableSerializer.serialize(committable))));
+        } catch (NoSuchAlgorithmException e) {
+            throw new RuntimeException("MD5 algorithm not found. This is 
impossible.", e);
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    "Failed to serialize ManifestCommittable. This is 
unexpected.", e);
+        }
+    }
+
+    private List<ManifestEntry> collectChanges(
+            Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> map, ValueKind 
kind) {
+        List<ManifestEntry> changes = new ArrayList<>();
+        for (Map.Entry<BinaryRowData, Map<Integer, List<SstFileMeta>>> 
entryWithPartition :
+                map.entrySet()) {
+            for (Map.Entry<Integer, List<SstFileMeta>> entryWithBucket :
+                    entryWithPartition.getValue().entrySet()) {
+                changes.addAll(
+                        entryWithBucket.getValue().stream()
+                                .map(
+                                        file ->
+                                                new ManifestEntry(
+                                                        kind,
+                                                        
entryWithPartition.getKey(),
+                                                        
entryWithBucket.getKey(),
+                                                        
fileStoreOptions.bucket,
+                                                        file))
+                                .collect(Collectors.toList()));
+            }
+        }
+        return changes;
+    }
+
+    private void tryCommit(List<ManifestEntry> changes, String hash, 
Snapshot.Type type) {
+        while (true) {
+            Long latestSnapshotId = pathFactory.latestSnapshotId();
+            long newSnapshotId =
+                    latestSnapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID : 
latestSnapshotId + 1;
+            Path newSnapshotPath = pathFactory.toSnapshotPath(newSnapshotId);
+            Path tmpSnapshotPath =
+                    new Path(
+                            newSnapshotPath.getParent()
+                                    + "/."
+                                    + newSnapshotPath.getName()
+                                    + UUID.randomUUID());
+
+            Snapshot latestSnapshot = null;
+            if (latestSnapshotId != null) {
+                detectConflicts(latestSnapshotId, changes);

Review comment:
       Add a comment here explaining that the commit fails if a conflict is detected.

##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import 
org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFile;
+import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.FileUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Default implementation of {@link FileStoreCommit}.
+ *
+ * <p>This class provides an atomic commit method to the user.
+ *
+ * <ol>
+ *   <li>Before calling {@link FileStoreCommitImpl#commit}, user should first 
call {@link
+ *       FileStoreCommitImpl#filterCommitted} to make sure this commit is not 
done before.
+ *   <li>Before committing, it will first check for conflicts by checking if 
all files to be removed
+ *       currently exists.
+ *   <li>After that it use the external {@link FileStoreCommitImpl#lock} (if 
provided) or the atomic
+ *       rename of the file system to ensure atomicity.
+ *   <li>If commit fails due to conflicts or exception it tries its best to 
clean up and aborts.
+ *   <li>If atomic rename fails it tries again after reading the latest 
snapshot from step 2.
+ * </ol>
+ */
+public class FileStoreCommitImpl implements FileStoreCommit {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FileStoreCommitImpl.class);
+
+    private final String committer;
+    private final ManifestCommittableSerializer committableSerializer;
+
+    private final FileStorePathFactory pathFactory;
+    private final ManifestFile manifestFile;
+    private final ManifestList manifestList;
+    private final FileStoreOptions fileStoreOptions;
+    private final FileStoreScan scan;
+
+    @Nullable private Lock lock;
+
+    public FileStoreCommitImpl(
+            String committer,
+            ManifestCommittableSerializer committableSerializer,
+            FileStorePathFactory pathFactory,
+            ManifestFile manifestFile,
+            ManifestList manifestList,
+            FileStoreOptions fileStoreOptions,
+            FileStoreScan scan) {
+        this.committer = committer;
+        this.committableSerializer = committableSerializer;
+
+        this.pathFactory = pathFactory;
+        this.manifestFile = manifestFile;
+        this.manifestList = manifestList;
+        this.fileStoreOptions = fileStoreOptions;
+        this.scan = scan;
+
+        this.lock = null;
+    }
+
+    @Override
+    public FileStoreCommit withLock(Lock lock) {
+        this.lock = lock;
+        return this;
+    }
+
+    @Override
+    public List<ManifestCommittable> filterCommitted(List<ManifestCommittable> 
committableList) {
+        committableList = new ArrayList<>(committableList);
+
+        // filter out commits with no new files
+        committableList.removeIf(committable -> 
committable.newFiles().isEmpty());
+
+        // if there is no previous snapshots then nothing should be filtered
+        Long latestSnapshotId = pathFactory.latestSnapshotId();
+        if (latestSnapshotId == null) {
+            return committableList;
+        }
+
+        // check if a committable is already committed by its hash
+        Map<String, ManifestCommittable> hashes = new LinkedHashMap<>();
+        for (ManifestCommittable committable : committableList) {
+            hashes.put(digestManifestCommittable(committable), committable);
+        }
+
+        for (long id = latestSnapshotId; id >= Snapshot.FIRST_SNAPSHOT_ID; 
id--) {
+            Path snapshotPath = pathFactory.toSnapshotPath(id);
+            Snapshot snapshot = Snapshot.fromPath(snapshotPath);
+            if (committer.equals(snapshot.committer())) {
+                if (hashes.containsKey(snapshot.hash())) {
+                    hashes.remove(snapshot.hash());
+                } else {
+                    // early exit, because committableList must be the latest 
commits by this
+                    // committer
+                    break;
+                }
+            }
+        }
+
+        return new ArrayList<>(hashes.values());
+    }
+
+    @Override
+    public void commit(ManifestCommittable committable, Map<String, String> 
properties) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Ready to commit\n" + committable.toString());
+        }
+
+        String hash = digestManifestCommittable(committable);
+
+        List<ManifestEntry> appendChanges = 
collectChanges(committable.newFiles(), ValueKind.ADD);
+        if (!appendChanges.isEmpty()) {
+            tryCommit(appendChanges, hash, Snapshot.Type.APPEND);
+        }
+
+        List<ManifestEntry> compactChanges = new ArrayList<>();
+        compactChanges.addAll(collectChanges(committable.compactBefore(), 
ValueKind.DELETE));
+        compactChanges.addAll(collectChanges(committable.compactAfter(), 
ValueKind.ADD));
+        if (!compactChanges.isEmpty()) {
+            tryCommit(compactChanges, hash, Snapshot.Type.COMPACT);
+        }
+    }
+
+    @Override
+    public void overwrite(
+            Map<String, String> partition,
+            ManifestCommittable committable,
+            Map<String, String> properties) {
+        throw new UnsupportedOperationException();
+    }
+
+    private String digestManifestCommittable(ManifestCommittable committable) {
+        try {
+            return new String(
+                    Base64.getEncoder()
+                            .encode(
+                                    MessageDigest.getInstance("MD5")
+                                            
.digest(committableSerializer.serialize(committable))));
+        } catch (NoSuchAlgorithmException e) {
+            throw new RuntimeException("MD5 algorithm not found. This is 
impossible.", e);
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    "Failed to serialize ManifestCommittable. This is 
unexpected.", e);
+        }
+    }
+
+    private List<ManifestEntry> collectChanges(
+            Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> map, ValueKind 
kind) {
+        List<ManifestEntry> changes = new ArrayList<>();
+        for (Map.Entry<BinaryRowData, Map<Integer, List<SstFileMeta>>> 
entryWithPartition :
+                map.entrySet()) {
+            for (Map.Entry<Integer, List<SstFileMeta>> entryWithBucket :
+                    entryWithPartition.getValue().entrySet()) {
+                changes.addAll(
+                        entryWithBucket.getValue().stream()
+                                .map(
+                                        file ->
+                                                new ManifestEntry(
+                                                        kind,
+                                                        
entryWithPartition.getKey(),
+                                                        
entryWithBucket.getKey(),
+                                                        
fileStoreOptions.bucket,
+                                                        file))
+                                .collect(Collectors.toList()));
+            }
+        }
+        return changes;
+    }
+
+    private void tryCommit(List<ManifestEntry> changes, String hash, 
Snapshot.Type type) {
+        while (true) {
+            Long latestSnapshotId = pathFactory.latestSnapshotId();
+            long newSnapshotId =
+                    latestSnapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID : 
latestSnapshotId + 1;
+            Path newSnapshotPath = pathFactory.toSnapshotPath(newSnapshotId);
+            Path tmpSnapshotPath =
+                    new Path(
+                            newSnapshotPath.getParent()
+                                    + "/."
+                                    + newSnapshotPath.getName()
+                                    + UUID.randomUUID());
+
+            Snapshot latestSnapshot = null;
+            if (latestSnapshotId != null) {
+                detectConflicts(latestSnapshotId, changes);
+                latestSnapshot = 
Snapshot.fromPath(pathFactory.toSnapshotPath(latestSnapshotId));
+            }
+
+            Snapshot newSnapshot;
+            String manifestListName = null;
+            List<ManifestFileMeta> oldMetas = new ArrayList<>();
+            List<ManifestFileMeta> newMetas = new ArrayList<>();
+            try {
+                if (latestSnapshot != null) {
+                    // read all previous manifest files
+                    
oldMetas.addAll(manifestList.read(latestSnapshot.manifestList()));
+                    // merge manifest files
+                    newMetas.addAll(
+                            ManifestFileMeta.merge(
+                                    oldMetas,
+                                    manifestFile,
+                                    
fileStoreOptions.manifestSuggestedSize.getBytes()));
+                }
+                // write all changes to manifest file
+                newMetas.add(manifestFile.write(changes));
+                // prepare snapshot file
+                manifestListName = manifestList.write(newMetas);
+                newSnapshot = new Snapshot(newSnapshotId, manifestListName, 
committer, hash, type);
+                FileUtils.writeFileUtf8(tmpSnapshotPath, newSnapshot.toJson());
+            } catch (Throwable e) {
+                // fails when preparing for commit, we should clean up
+                cleanUpManifests(tmpSnapshotPath, manifestListName, oldMetas, 
newMetas);
+                throw new RuntimeException(
+                        String.format(
+                                "Exception occurs when preparing snapshot #%d 
(path %s) by committer %s "
+                                        + "with hash %s and type %s. Clean 
up.",
+                                newSnapshotId,
+                                newSnapshotPath.toString(),
+                                committer,
+                                hash,
+                                type.name()),
+                        e);
+            }
+
+            boolean success;
+            try {
+                FileSystem fs = tmpSnapshotPath.getFileSystem();
+                // atomic rename
+                if (lock != null) {
+                    success =
+                            lock.runWithLock(
+                                    () ->
+                                            // fs.rename may not returns false 
if target file
+                                            // already exists, or even not 
atomic
+                                            // as we're relying on external 
locking, we can first
+                                            // check if file exist then rename 
to work around this
+                                            // case
+                                            !fs.exists(newSnapshotPath)
+                                                    && 
fs.rename(tmpSnapshotPath, newSnapshotPath));
+                } else {
+                    success = fs.rename(tmpSnapshotPath, newSnapshotPath);
+                }
+            } catch (Throwable e) {
+                // exception when performing the atomic rename,
+                // we cannot clean up because we can't determine the success
+                throw new RuntimeException(
+                        String.format(
+                                "Exception occurs when committing snapshot #%d 
(path %s) by committer %s "
+                                        + "with hash %s and type %s. "
+                                        + "Cannot clean up because we can't 
determine the success.",
+                                newSnapshotId,
+                                newSnapshotPath.toString(),
+                                committer,
+                                hash,
+                                type.name()),
+                        e);
+            }
+
+            if (success) {
+                return;
+            }
+
+            // atomic rename fails, clean up and try again
+            LOG.warn(
+                    String.format(
+                            "Atomic rename failed for snapshot #%d (path %s) 
by committer %s "
+                                    + "with hash %s and type %s. "
+                                    + "Clean up and try again.",
+                            newSnapshotId,
+                            newSnapshotPath.toString(),
+                            committer,
+                            hash,
+                            type.name()));
+            cleanUpManifests(tmpSnapshotPath, manifestListName, oldMetas, 
newMetas);
+        }
+    }
+
+    private void detectConflicts(long snapshotId, List<ManifestEntry> changes) 
{

Review comment:
       validateConflicts?

##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.ReadableConfig;
+
+/** Options for {@link FileStore}. */
+public class FileStoreOptions {
+
+    public static final ConfigOption<Integer> BUCKET =
+            ConfigOptions.key("bucket")
+                    .intType()
+                    .defaultValue(1)
+                    .withDescription(
+                            "Bucket number for file store and partition number 
for Kafka.");
+
+    public static final ConfigOption<MemorySize> MANIFEST_SUGGESTED_SIZE =
+            ConfigOptions.key("manifest.suggested-size")

Review comment:
       manifest.target-file-size

##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.utils.FileUtils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+
+/** This file is the entrance to all data committed at some specific time 
point. */
+public class Snapshot {
+
+    public static final long FIRST_SNAPSHOT_ID = 1;
+
+    private static final String FIELD_ID = "id";
+    private static final String FIELD_MANIFEST_LIST = "manifestList";
+    private static final String FIELD_COMMITTER = "committer";
+    private static final String FIELD_HASH = "hash";
+    private static final String FIELD_TYPE = "type";
+
+    @JsonProperty(FIELD_ID)

Review comment:
       Why do we need to use `JsonProperty`? We need to serialize all fields 
anyway.

##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.utils.FileUtils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+
+/** This file is the entrance to all data committed at some specific time 
point. */
+public class Snapshot {
+
+    public static final long FIRST_SNAPSHOT_ID = 1;
+
+    private static final String FIELD_ID = "id";
+    private static final String FIELD_MANIFEST_LIST = "manifestList";
+    private static final String FIELD_COMMITTER = "committer";
+    private static final String FIELD_HASH = "hash";
+    private static final String FIELD_TYPE = "type";
+
+    @JsonProperty(FIELD_ID)
+    private final long id;
+
+    @JsonProperty(FIELD_MANIFEST_LIST)
+    private final String manifestList;
+
+    @JsonProperty(FIELD_COMMITTER)
+    private final String committer;

Review comment:
       `commitUser`? A committer is something that helps to commit.

##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
##########
@@ -96,6 +108,31 @@ public String getPartitionString(BinaryRowData partition) {
                                 partition, "Partition row data is null. This 
is unexpected.")));
     }
 
+    @Nullable
+    public Long latestSnapshotId() {

Review comment:
       `currentSnapshotId`?

##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
##########
@@ -118,4 +121,94 @@ public String toString() {
                 numDeletedFiles,
                 Arrays.toString(partitionStats));
     }
+
+    /**
+     * Merge several {@link ManifestFileMeta}s. {@link ManifestEntry}s 
representing first adding and
+     * then deleting the same sst file will cancel each other.
+     *
+     * <p>NOTE: This method is atomic.
+     */
+    public static List<ManifestFileMeta> merge(
+            List<ManifestFileMeta> metas, ManifestFile manifestFile, long 
suggestedMetaSize) {
+        List<ManifestFileMeta> result = new ArrayList<>();
+        // these are the newly created manifest files, clean them up if 
exception occurs
+        List<ManifestFileMeta> newMetas = new ArrayList<>();
+        List<ManifestFileMeta> candidate = new ArrayList<>();
+        long totalSize = 0;
+
+        try {
+            for (ManifestFileMeta manifest : metas) {
+                totalSize += manifest.fileSize;
+                candidate.add(manifest);
+                if (totalSize >= suggestedMetaSize) {
+                    // reach suggested file size, perform merging and produce 
new file
+                    merge(candidate, manifestFile, result, newMetas);
+                    candidate.clear();
+                    totalSize = 0;
+                }
+            }
+            if (!candidate.isEmpty()) {
+                // merge the last bit of metas
+                merge(candidate, manifestFile, result, newMetas);
+            }
+        } catch (Throwable e) {
+            // exception occurs, clean up and rethrow
+            for (ManifestFileMeta manifest : newMetas) {
+                manifestFile.delete(manifest.fileName);
+            }
+            throw e;
+        }
+
+        return result;
+    }
+
+    private static void merge(
+            List<ManifestFileMeta> metas,
+            ManifestFile manifestFile,
+            List<ManifestFileMeta> result,
+            List<ManifestFileMeta> newMetas) {
+        if (metas.size() > 1) {
+            ManifestFileMeta newMeta = merge(metas, manifestFile);
+            result.add(newMeta);
+            newMetas.add(newMeta);
+        } else {
+            result.addAll(metas);
+        }
+    }
+
+    private static ManifestFileMeta merge(List<ManifestFileMeta> metas, 
ManifestFile manifestFile) {
+        Preconditions.checkArgument(
+                metas.size() > 1, "Number of ManifestFileMeta <= 1. This is a 
bug.");
+
+        Map<ManifestEntry.Identifier, ManifestEntry> map = new 
LinkedHashMap<>();
+        for (ManifestFileMeta manifest : metas) {
+            for (ManifestEntry entry : manifestFile.read(manifest.fileName)) {
+                switch (entry.kind()) {
+                    case ADD:
+                        Preconditions.checkState(
+                                !map.containsKey(entry.identifier()),
+                                "Trying to add file %s which is already added. 
Manifest might be corrupted.",
+                                entry.identifier());
+                        map.put(entry.identifier(), entry);

Review comment:
       extract `entry.identifier()` to a field

##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
##########
@@ -118,4 +121,94 @@ public String toString() {
                 numDeletedFiles,
                 Arrays.toString(partitionStats));
     }
+
+    /**
+     * Merge several {@link ManifestFileMeta}s. {@link ManifestEntry}s 
representing first adding and
+     * then deleting the same sst file will cancel each other.
+     *
+     * <p>NOTE: This method is atomic.
+     */
+    public static List<ManifestFileMeta> merge(
+            List<ManifestFileMeta> metas, ManifestFile manifestFile, long 
suggestedMetaSize) {
+        List<ManifestFileMeta> result = new ArrayList<>();
+        // these are the newly created manifest files, clean them up if 
exception occurs
+        List<ManifestFileMeta> newMetas = new ArrayList<>();
+        List<ManifestFileMeta> candidate = new ArrayList<>();
+        long totalSize = 0;
+
+        try {
+            for (ManifestFileMeta manifest : metas) {
+                totalSize += manifest.fileSize;
+                candidate.add(manifest);
+                if (totalSize >= suggestedMetaSize) {
+                    // reach suggested file size, perform merging and produce 
new file
+                    merge(candidate, manifestFile, result, newMetas);
+                    candidate.clear();
+                    totalSize = 0;
+                }
+            }
+            if (!candidate.isEmpty()) {
+                // merge the last bit of metas
+                merge(candidate, manifestFile, result, newMetas);
+            }
+        } catch (Throwable e) {
+            // exception occurs, clean up and rethrow
+            for (ManifestFileMeta manifest : newMetas) {
+                manifestFile.delete(manifest.fileName);
+            }
+            throw e;
+        }
+
+        return result;
+    }
+
+    private static void merge(
+            List<ManifestFileMeta> metas,
+            ManifestFile manifestFile,
+            List<ManifestFileMeta> result,
+            List<ManifestFileMeta> newMetas) {
+        if (metas.size() > 1) {
+            ManifestFileMeta newMeta = merge(metas, manifestFile);
+            result.add(newMeta);
+            newMetas.add(newMeta);
+        } else {
+            result.addAll(metas);
+        }
+    }
+
+    private static ManifestFileMeta merge(List<ManifestFileMeta> metas, 
ManifestFile manifestFile) {
+        Preconditions.checkArgument(
+                metas.size() > 1, "Number of ManifestFileMeta <= 1. This is a 
bug.");
+
+        Map<ManifestEntry.Identifier, ManifestEntry> map = new 
LinkedHashMap<>();

Review comment:
       Have you considered the upgrade case?
   For example, a file being upgraded from level 1 to level 2?

##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFile;
+import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Default implementation of {@link FileStoreScan}. */
+public class FileStoreScanImpl implements FileStoreScan {
+
+    private final FileStorePathFactory pathFactory;
+    private final ManifestFile manifestFile;
+    private final ManifestList manifestList;
+
+    private Long snapshotId;
+    private List<ManifestFileMeta> manifests;
+
+    public FileStoreScanImpl(
+            FileStorePathFactory pathFactory,
+            ManifestFile manifestFile,
+            ManifestList manifestList) {
+        this.pathFactory = pathFactory;
+        this.manifestFile = manifestFile;
+        this.manifestList = manifestList;
+
+        this.snapshotId = null;
+        this.manifests = new ArrayList<>();
+    }
+
+    @Override
+    public FileStoreScan withPartitionFilter(Predicate predicate) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public FileStoreScan withKeyFilter(Predicate predicate) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public FileStoreScan withValueFilter(Predicate predicate) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public FileStoreScan withBucket(int bucket) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public FileStoreScan withSnapshot(long snapshotId) {
+        this.snapshotId = snapshotId;
+        Snapshot snapshot = 
Snapshot.fromPath(pathFactory.toSnapshotPath(snapshotId));
+        this.manifests = manifestList.read(snapshot.manifestList());
+        return this;
+    }
+
+    @Override
+    public FileStoreScan withManifestList(List<ManifestFileMeta> manifests) {
+        this.manifests = manifests;
+        return this;
+    }
+
+    @Override
+    public Plan plan() {
+        List<ManifestEntry> files = scan();
+
+        return new Plan() {
+            @Nullable
+            @Override
+            public Long snapshotId() {
+                return snapshotId;
+            }
+
+            @Override
+            public List<ManifestEntry> files() {
+                return files;
+            }
+        };
+    }
+
+    private List<ManifestEntry> scan() {
+        Map<ManifestEntry.Identifier, ManifestEntry> map = new 
LinkedHashMap<>();
+        for (ManifestFileMeta manifest : manifests) {
+            for (ManifestEntry entry : manifestFile.read(manifest.fileName())) 
{

Review comment:
       Add a TODO here for concurrent reading.

##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.utils.FileUtils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+
+/** This file is the entrance to all data committed at some specific time 
point. */
+public class Snapshot {
+
+    public static final long FIRST_SNAPSHOT_ID = 1;
+
+    private static final String FIELD_ID = "id";
+    private static final String FIELD_MANIFEST_LIST = "manifestList";
+    private static final String FIELD_COMMITTER = "committer";
+    private static final String FIELD_HASH = "hash";
+    private static final String FIELD_TYPE = "type";
+
+    @JsonProperty(FIELD_ID)
+    private final long id;
+
+    @JsonProperty(FIELD_MANIFEST_LIST)
+    private final String manifestList;
+
+    @JsonProperty(FIELD_COMMITTER)
+    private final String committer;
+
+    // for deduplication
+    @JsonProperty(FIELD_HASH)
+    private final String hash;
+
+    @JsonProperty(FIELD_TYPE)
+    private final Type type;
+
+    @JsonCreator
+    public Snapshot(
+            @JsonProperty(FIELD_ID) long id,
+            @JsonProperty(FIELD_MANIFEST_LIST) String manifestList,
+            @JsonProperty(FIELD_COMMITTER) String committer,
+            @JsonProperty(FIELD_HASH) String hash,
+            @JsonProperty(FIELD_TYPE) Type type) {
+        this.id = id;
+        this.manifestList = manifestList;
+        this.committer = committer;
+        this.hash = hash;
+        this.type = type;
+    }
+
+    @JsonGetter(FIELD_ID)
+    public long id() {
+        return id;
+    }
+
+    @JsonGetter(FIELD_MANIFEST_LIST)
+    public String manifestList() {
+        return manifestList;
+    }
+
+    @JsonGetter(FIELD_COMMITTER)
+    public String committer() {
+        return committer;
+    }
+
+    @JsonGetter(FIELD_HASH)
+    public String hash() {
+        return hash;

Review comment:
       commitDigest?

##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import 
org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFile;
+import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.FileUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Default implementation of {@link FileStoreCommit}.
+ *
+ * <p>This class provides an atomic commit method to the user.
+ *
+ * <ol>
+ *   <li>Before calling {@link FileStoreCommitImpl#commit}, user should first 
call {@link
+ *       FileStoreCommitImpl#filterCommitted} to make sure this commit is not 
done before.
+ *   <li>Before committing, it will first check for conflicts by checking if 
all files to be removed
+ *       currently exists.
+ *   <li>After that it use the external {@link FileStoreCommitImpl#lock} (if 
provided) or the atomic
+ *       rename of the file system to ensure atomicity.
+ *   <li>If commit fails due to conflicts or exception it tries its best to 
clean up and aborts.
+ *   <li>If atomic rename fails it tries again after reading the latest 
snapshot from step 2.
+ * </ol>
+ */
+public class FileStoreCommitImpl implements FileStoreCommit {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FileStoreCommitImpl.class);
+
+    private final String committer;
+    private final ManifestCommittableSerializer committableSerializer;
+
+    private final FileStorePathFactory pathFactory;
+    private final ManifestFile manifestFile;
+    private final ManifestList manifestList;
+    private final FileStoreOptions fileStoreOptions;
+    private final FileStoreScan scan;
+
+    @Nullable private Lock lock;
+
+    public FileStoreCommitImpl(
+            String committer,
+            ManifestCommittableSerializer committableSerializer,
+            FileStorePathFactory pathFactory,
+            ManifestFile manifestFile,
+            ManifestList manifestList,
+            FileStoreOptions fileStoreOptions,
+            FileStoreScan scan) {
+        this.committer = committer;
+        this.committableSerializer = committableSerializer;
+
+        this.pathFactory = pathFactory;
+        this.manifestFile = manifestFile;
+        this.manifestList = manifestList;
+        this.fileStoreOptions = fileStoreOptions;
+        this.scan = scan;
+
+        this.lock = null;
+    }
+
+    @Override
+    public FileStoreCommit withLock(Lock lock) {
+        this.lock = lock;
+        return this;
+    }
+
+    @Override
+    public List<ManifestCommittable> filterCommitted(List<ManifestCommittable> 
committableList) {
+        committableList = new ArrayList<>(committableList);
+
+        // filter out commits with no new files
+        committableList.removeIf(committable -> 
committable.newFiles().isEmpty());

Review comment:
       I am thinking maybe we need to support pure compaction too.

##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
##########
@@ -96,6 +108,31 @@ public String getPartitionString(BinaryRowData partition) {
                                 partition, "Partition row data is null. This 
is unexpected.")));
     }
 
+    @Nullable
+    public Long latestSnapshotId() {
+        try {
+            Path snapshotDir = new Path(root + "/snapshot");

Review comment:
       Add a TODO here: we can write a `CURRENT` file for better lookup in the future.

##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import 
org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFile;
+import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.FileUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Default implementation of {@link FileStoreCommit}.
+ *
+ * <p>This class provides an atomic commit method to the user.
+ *
+ * <ol>
+ *   <li>Before calling {@link FileStoreCommitImpl#commit}, user should first 
call {@link
+ *       FileStoreCommitImpl#filterCommitted} to make sure this commit is not 
done before.
+ *   <li>Before committing, it will first check for conflicts by checking if 
all files to be removed
+ *       currently exists.
+ *   <li>After that it use the external {@link FileStoreCommitImpl#lock} (if 
provided) or the atomic
+ *       rename of the file system to ensure atomicity.
+ *   <li>If commit fails due to conflicts or exception it tries its best to 
clean up and aborts.
+ *   <li>If atomic rename fails it tries again after reading the latest 
snapshot from step 2.
+ * </ol>
+ */
+public class FileStoreCommitImpl implements FileStoreCommit {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FileStoreCommitImpl.class);
+
+    private final String committer;
+    private final ManifestCommittableSerializer committableSerializer;
+
+    private final FileStorePathFactory pathFactory;
+    private final ManifestFile manifestFile;
+    private final ManifestList manifestList;
+    private final FileStoreOptions fileStoreOptions;
+    private final FileStoreScan scan;
+
+    @Nullable private Lock lock;
+
+    public FileStoreCommitImpl(
+            String committer,
+            ManifestCommittableSerializer committableSerializer,
+            FileStorePathFactory pathFactory,
+            ManifestFile manifestFile,
+            ManifestList manifestList,
+            FileStoreOptions fileStoreOptions,
+            FileStoreScan scan) {
+        this.committer = committer;
+        this.committableSerializer = committableSerializer;
+
+        this.pathFactory = pathFactory;
+        this.manifestFile = manifestFile;
+        this.manifestList = manifestList;
+        this.fileStoreOptions = fileStoreOptions;
+        this.scan = scan;
+
+        this.lock = null;
+    }
+
+    @Override
+    public FileStoreCommit withLock(Lock lock) {
+        this.lock = lock;
+        return this;
+    }
+
+    @Override
+    public List<ManifestCommittable> filterCommitted(List<ManifestCommittable> 
committableList) {
+        committableList = new ArrayList<>(committableList);
+
+        // filter out commits with no new files
+        committableList.removeIf(committable -> 
committable.newFiles().isEmpty());
+
+        // if there is no previous snapshots then nothing should be filtered
+        Long latestSnapshotId = pathFactory.latestSnapshotId();
+        if (latestSnapshotId == null) {
+            return committableList;
+        }
+
+        // check if a committable is already committed by its hash
+        Map<String, ManifestCommittable> hashes = new LinkedHashMap<>();
+        for (ManifestCommittable committable : committableList) {
+            hashes.put(digestManifestCommittable(committable), committable);
+        }
+
+        for (long id = latestSnapshotId; id >= Snapshot.FIRST_SNAPSHOT_ID; 
id--) {
+            Path snapshotPath = pathFactory.toSnapshotPath(id);
+            Snapshot snapshot = Snapshot.fromPath(snapshotPath);
+            if (committer.equals(snapshot.committer())) {
+                if (hashes.containsKey(snapshot.hash())) {
+                    hashes.remove(snapshot.hash());
+                } else {
+                    // early exit, because committableList must be the latest 
commits by this
+                    // committer
+                    break;
+                }
+            }
+        }
+
+        return new ArrayList<>(hashes.values());
+    }
+
+    @Override
+    public void commit(ManifestCommittable committable, Map<String, String> 
properties) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Ready to commit\n" + committable.toString());
+        }
+
+        String hash = digestManifestCommittable(committable);
+
+        List<ManifestEntry> appendChanges = 
collectChanges(committable.newFiles(), ValueKind.ADD);
+        if (!appendChanges.isEmpty()) {
+            tryCommit(appendChanges, hash, Snapshot.Type.APPEND);
+        }
+
+        List<ManifestEntry> compactChanges = new ArrayList<>();
+        compactChanges.addAll(collectChanges(committable.compactBefore(), 
ValueKind.DELETE));
+        compactChanges.addAll(collectChanges(committable.compactAfter(), 
ValueKind.ADD));
+        if (!compactChanges.isEmpty()) {
+            tryCommit(compactChanges, hash, Snapshot.Type.COMPACT);
+        }
+    }
+
+    @Override
+    public void overwrite(
+            Map<String, String> partition,
+            ManifestCommittable committable,
+            Map<String, String> properties) {
+        throw new UnsupportedOperationException();
+    }
+
+    private String digestManifestCommittable(ManifestCommittable committable) {
+        try {
+            return new String(
+                    Base64.getEncoder()
+                            .encode(
+                                    MessageDigest.getInstance("MD5")
+                                            
.digest(committableSerializer.serialize(committable))));
+        } catch (NoSuchAlgorithmException e) {
+            throw new RuntimeException("MD5 algorithm not found. This is 
impossible.", e);
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    "Failed to serialize ManifestCommittable. This is 
unexpected.", e);
+        }
+    }
+
+    private List<ManifestEntry> collectChanges(
+            Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> map, ValueKind 
kind) {
+        List<ManifestEntry> changes = new ArrayList<>();
+        for (Map.Entry<BinaryRowData, Map<Integer, List<SstFileMeta>>> 
entryWithPartition :
+                map.entrySet()) {
+            for (Map.Entry<Integer, List<SstFileMeta>> entryWithBucket :
+                    entryWithPartition.getValue().entrySet()) {
+                changes.addAll(
+                        entryWithBucket.getValue().stream()
+                                .map(
+                                        file ->
+                                                new ManifestEntry(
+                                                        kind,
+                                                        
entryWithPartition.getKey(),
+                                                        
entryWithBucket.getKey(),
+                                                        
fileStoreOptions.bucket,
+                                                        file))
+                                .collect(Collectors.toList()));
+            }
+        }
+        return changes;
+    }
+
+    private void tryCommit(List<ManifestEntry> changes, String hash, 
Snapshot.Type type) {
+        while (true) {
+            Long latestSnapshotId = pathFactory.latestSnapshotId();
+            long newSnapshotId =
+                    latestSnapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID : 
latestSnapshotId + 1;
+            Path newSnapshotPath = pathFactory.toSnapshotPath(newSnapshotId);
+            Path tmpSnapshotPath =
+                    new Path(
+                            newSnapshotPath.getParent()
+                                    + "/."
+                                    + newSnapshotPath.getName()
+                                    + UUID.randomUUID());
+
+            Snapshot latestSnapshot = null;
+            if (latestSnapshotId != null) {
+                detectConflicts(latestSnapshotId, changes);
+                latestSnapshot = 
Snapshot.fromPath(pathFactory.toSnapshotPath(latestSnapshotId));
+            }
+
+            Snapshot newSnapshot;
+            String manifestListName = null;
+            List<ManifestFileMeta> oldMetas = new ArrayList<>();
+            List<ManifestFileMeta> newMetas = new ArrayList<>();
+            try {
+                if (latestSnapshot != null) {
+                    // read all previous manifest files
+                    
oldMetas.addAll(manifestList.read(latestSnapshot.manifestList()));
+                    // merge manifest files
+                    newMetas.addAll(
+                            ManifestFileMeta.merge(
+                                    oldMetas,
+                                    manifestFile,
+                                    
fileStoreOptions.manifestSuggestedSize.getBytes()));
+                }
+                // write all changes to manifest file
+                newMetas.add(manifestFile.write(changes));
+                // prepare snapshot file
+                manifestListName = manifestList.write(newMetas);
+                newSnapshot = new Snapshot(newSnapshotId, manifestListName, 
committer, hash, type);
+                FileUtils.writeFileUtf8(tmpSnapshotPath, newSnapshot.toJson());
+            } catch (Throwable e) {
+                // fails when preparing for commit, we should clean up
+                cleanUpManifests(tmpSnapshotPath, manifestListName, oldMetas, 
newMetas);
+                throw new RuntimeException(
+                        String.format(
+                                "Exception occurs when preparing snapshot #%d 
(path %s) by committer %s "
+                                        + "with hash %s and type %s. Clean 
up.",
+                                newSnapshotId,
+                                newSnapshotPath.toString(),
+                                committer,
+                                hash,
+                                type.name()),
+                        e);
+            }
+
+            boolean success;
+            try {
+                FileSystem fs = tmpSnapshotPath.getFileSystem();
+                // atomic rename
+                if (lock != null) {
+                    success =
+                            lock.runWithLock(
+                                    () ->
+                                            // fs.rename may not returns false 
if target file
+                                            // already exists, or even not 
atomic
+                                            // as we're relying on external 
locking, we can first
+                                            // check if file exist then rename 
to work around this
+                                            // case
+                                            !fs.exists(newSnapshotPath)
+                                                    && 
fs.rename(tmpSnapshotPath, newSnapshotPath));
+                } else {
+                    success = fs.rename(tmpSnapshotPath, newSnapshotPath);
+                }
+            } catch (Throwable e) {
+                // exception when performing the atomic rename,
+                // we cannot clean up because we can't determine the success
+                throw new RuntimeException(
+                        String.format(
+                                "Exception occurs when committing snapshot #%d 
(path %s) by committer %s "
+                                        + "with hash %s and type %s. "
+                                        + "Cannot clean up because we can't 
determine the success.",
+                                newSnapshotId,
+                                newSnapshotPath.toString(),
+                                committer,
+                                hash,
+                                type.name()),
+                        e);
+            }
+
+            if (success) {
+                return;
+            }
+
+            // atomic rename fails, clean up and try again
+            LOG.warn(
+                    String.format(
+                            "Atomic rename failed for snapshot #%d (path %s) 
by committer %s "
+                                    + "with hash %s and type %s. "
+                                    + "Clean up and try again.",
+                            newSnapshotId,
+                            newSnapshotPath.toString(),
+                            committer,
+                            hash,
+                            type.name()));
+            cleanUpManifests(tmpSnapshotPath, manifestListName, oldMetas, 
newMetas);
+        }
+    }
+
+    private void detectConflicts(long snapshotId, List<ManifestEntry> changes) 
{
+        Set<ManifestEntry.Identifier> removedFiles =
+                changes.stream()
+                        .filter(e -> e.kind().equals(ValueKind.DELETE))
+                        .map(ManifestEntry::identifier)
+                        .collect(Collectors.toSet());
+        if (removedFiles.isEmpty()) {
+            // early exit for append only changes
+            return;
+        }
+
+        try {
+            for (ManifestEntry entry : 
scan.withSnapshot(snapshotId).plan().files()) {
+                removedFiles.remove(entry.identifier());
+            }
+        } catch (Throwable e) {
+            throw new RuntimeException("Cannot determine if conflicts exist.", 
e);
+        }
+
+        if (!removedFiles.isEmpty()) {
+            throw new RuntimeException(
+                    "Conflicts detected on:\n"
+                            + removedFiles.stream()
+                                    .map(
+                                            i ->
+                                                    
pathFactory.getPartitionString(i.partition)
+                                                            + ", bucket "
+                                                            + i.bucket
+                                                            + ", file "
+                                                            + i.fileName)
+                                    .collect(Collectors.joining("\n")));
+        }
+    }
+
+    private void cleanUpManifests(

Review comment:
       cleanUpTempSnapshot?

##########
File path: 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import 
org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFile;
+import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.FileUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Default implementation of {@link FileStoreCommit}.
+ *
+ * <p>This class provides an atomic commit method to the user.
+ *
+ * <ol>
+ *   <li>Before calling {@link FileStoreCommitImpl#commit}, user should first 
call {@link
+ *       FileStoreCommitImpl#filterCommitted} to make sure this commit is not 
done before.
+ *   <li>Before committing, it will first check for conflicts by checking if 
all files to be removed
+ *       currently exists.
+ *   <li>After that it use the external {@link FileStoreCommitImpl#lock} (if 
provided) or the atomic
+ *       rename of the file system to ensure atomicity.
+ *   <li>If commit fails due to conflicts or exception it tries its best to 
clean up and aborts.
+ *   <li>If atomic rename fails it tries again after reading the latest 
snapshot from step 2.
+ * </ol>
+ */
+public class FileStoreCommitImpl implements FileStoreCommit {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FileStoreCommitImpl.class);
+
+    private final String committer;
+    private final ManifestCommittableSerializer committableSerializer;
+
+    private final FileStorePathFactory pathFactory;
+    private final ManifestFile manifestFile;
+    private final ManifestList manifestList;
+    private final FileStoreOptions fileStoreOptions;
+    private final FileStoreScan scan;
+
+    @Nullable private Lock lock;
+
+    public FileStoreCommitImpl(
+            String committer,
+            ManifestCommittableSerializer committableSerializer,
+            FileStorePathFactory pathFactory,
+            ManifestFile manifestFile,
+            ManifestList manifestList,
+            FileStoreOptions fileStoreOptions,
+            FileStoreScan scan) {
+        this.committer = committer;
+        this.committableSerializer = committableSerializer;
+
+        this.pathFactory = pathFactory;
+        this.manifestFile = manifestFile;
+        this.manifestList = manifestList;
+        this.fileStoreOptions = fileStoreOptions;
+        this.scan = scan;
+
+        this.lock = null;
+    }
+
+    @Override
+    public FileStoreCommit withLock(Lock lock) {
+        this.lock = lock;
+        return this;
+    }
+
+    @Override
+    public List<ManifestCommittable> filterCommitted(List<ManifestCommittable> 
committableList) {
+        committableList = new ArrayList<>(committableList);
+
+        // filter out commits with no new files
+        committableList.removeIf(committable -> 
committable.newFiles().isEmpty());
+
+        // if there is no previous snapshots then nothing should be filtered
+        Long latestSnapshotId = pathFactory.latestSnapshotId();
+        if (latestSnapshotId == null) {
+            return committableList;
+        }
+
+        // check if a committable is already committed by its hash
+        Map<String, ManifestCommittable> hashes = new LinkedHashMap<>();
+        for (ManifestCommittable committable : committableList) {
+            hashes.put(digestManifestCommittable(committable), committable);
+        }
+
+        for (long id = latestSnapshotId; id >= Snapshot.FIRST_SNAPSHOT_ID; 
id--) {
+            Path snapshotPath = pathFactory.toSnapshotPath(id);
+            Snapshot snapshot = Snapshot.fromPath(snapshotPath);
+            if (committer.equals(snapshot.committer())) {
+                if (hashes.containsKey(snapshot.hash())) {
+                    hashes.remove(snapshot.hash());
+                } else {
+                    // early exit, because committableList must be the latest 
commits by this
+                    // committer
+                    break;
+                }
+            }
+        }
+
+        return new ArrayList<>(hashes.values());
+    }
+
+    /**
+     * Commits a committable as up to two snapshots that share one hash: an APPEND
+     * snapshot for its new files, followed by a COMPACT snapshot for its compaction
+     * changes. A snapshot is only written when it has at least one change.
+     *
+     * @param committable the changes to commit
+     * @param properties not read by this method
+     */
+    @Override
+    public void commit(ManifestCommittable committable, Map<String, String> properties) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Ready to commit\n" + committable.toString());
+        }
+
+        final String digest = digestManifestCommittable(committable);
+
+        // first snapshot: files newly added by this committable
+        List<ManifestEntry> added = collectChanges(committable.newFiles(), ValueKind.ADD);
+        if (!added.isEmpty()) {
+            tryCommit(added, digest, Snapshot.Type.APPEND);
+        }
+
+        // second snapshot: files removed by compaction first, then the files it produced
+        List<ManifestEntry> compactedAway =
+                collectChanges(committable.compactBefore(), ValueKind.DELETE);
+        List<ManifestEntry> compactedInto =
+                collectChanges(committable.compactAfter(), ValueKind.ADD);
+        if (compactedAway.isEmpty() && compactedInto.isEmpty()) {
+            return;
+        }
+
+        List<ManifestEntry> compacted = new ArrayList<>(compactedAway);
+        compacted.addAll(compactedInto);
+        tryCommit(compacted, digest, Snapshot.Type.COMPACT);
+    }
+
+    @Override
+    public void overwrite(
+            Map<String, String> partition,
+            ManifestCommittable committable,
+            Map<String, String> properties) {
+        throw new UnsupportedOperationException(); // no argument is read: every call is rejected
+    }
+
+    /**
+     * Computes the hash that identifies a committable: the Base64 text of the MD5 digest
+     * of its serialized bytes.
+     *
+     * @param committable the committable to digest
+     * @return the Base64-encoded MD5 digest
+     * @throws RuntimeException if the MD5 algorithm is unavailable or the committable
+     *     cannot be serialized
+     */
+    private String digestManifestCommittable(ManifestCommittable committable) {
+        try {
+            MessageDigest md5 = MessageDigest.getInstance("MD5");
+            byte[] digest = md5.digest(committableSerializer.serialize(committable));
+            // encodeToString does not depend on the platform default charset, unlike
+            // new String(byte[]), so the hash text is the same on every JVM
+            return Base64.getEncoder().encodeToString(digest);
+        } catch (NoSuchAlgorithmException e) {
+            throw new RuntimeException("MD5 algorithm not found. This is impossible.", e);
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    "Failed to serialize ManifestCommittable. This is unexpected.", e);
+        }
+    }
+
+    /**
+     * Flattens a partition -> bucket -> files map into a list of manifest entries, all of
+     * the given kind. Entries follow the iteration order of the maps and file lists.
+     *
+     * @param map files grouped by partition, then by bucket
+     * @param kind the value kind assigned to every produced entry
+     * @return one entry per file; empty when {@code map} holds no files
+     */
+    private List<ManifestEntry> collectChanges(
+            Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> map, ValueKind kind) {
+        List<ManifestEntry> result = new ArrayList<>();
+        for (Map.Entry<BinaryRowData, Map<Integer, List<SstFileMeta>>> byPartition :
+                map.entrySet()) {
+            for (Map.Entry<Integer, List<SstFileMeta>> byBucket :
+                    byPartition.getValue().entrySet()) {
+                for (SstFileMeta file : byBucket.getValue()) {
+                    result.add(
+                            new ManifestEntry(
+                                    kind,
+                                    byPartition.getKey(),
+                                    byBucket.getKey(),
+                                    fileStoreOptions.bucket,
+                                    file));
+                }
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Publishes {@code changes} as the next snapshot, looping until the snapshot file has
+     * been renamed into place or an exception is thrown.
+     *
+     * <p>Each attempt looks up the latest snapshot id, checks the changes for conflicts
+     * against that snapshot, merges its manifests, writes a manifest for the changes plus
+     * a new manifest list, writes the snapshot JSON to a temporary file and finally
+     * renames that file to the snapshot path. When the rename reports failure the written
+     * files are cleaned up and the attempt is repeated.
+     *
+     * @param changes manifest entries to record in the new snapshot
+     * @param hash hash stored in the new snapshot
+     * @param type type stored in the new snapshot
+     * @throws RuntimeException if preparing the snapshot fails (after cleaning up), or if
+     *     the rename itself throws (without cleaning up)
+     */
+    private void tryCommit(List<ManifestEntry> changes, String hash, Snapshot.Type type) {
+        for (; ; ) {
+            Long latestId = pathFactory.latestSnapshotId();
+            long nextId;
+            if (latestId == null) {
+                nextId = Snapshot.FIRST_SNAPSHOT_ID;
+            } else {
+                nextId = latestId + 1;
+            }
+
+            Path targetPath = pathFactory.toSnapshotPath(nextId);
+            // hidden file next to the target, made unique by a random UUID
+            String tmpLocation =
+                    targetPath.getParent() + "/." + targetPath.getName() + UUID.randomUUID();
+            Path tmpPath = new Path(tmpLocation);
+
+            Snapshot latest = null;
+            if (latestId != null) {
+                detectConflicts(latestId, changes);
+                latest = Snapshot.fromPath(pathFactory.toSnapshotPath(latestId));
+            }
+
+            String manifestListName = null;
+            List<ManifestFileMeta> previousMetas = new ArrayList<>();
+            List<ManifestFileMeta> nextMetas = new ArrayList<>();
+            try {
+                if (latest != null) {
+                    // carry over the manifests of the latest snapshot, merged
+                    previousMetas.addAll(manifestList.read(latest.manifestList()));
+                    nextMetas.addAll(
+                            ManifestFileMeta.merge(
+                                    previousMetas,
+                                    manifestFile,
+                                    fileStoreOptions.manifestSuggestedSize.getBytes()));
+                }
+                // the changes of this commit go into a manifest file of their own
+                nextMetas.add(manifestFile.write(changes));
+                // write the manifest list, then the snapshot file under its temporary name
+                manifestListName = manifestList.write(nextMetas);
+                Snapshot next = new Snapshot(nextId, manifestListName, committer, hash, type);
+                FileUtils.writeFileUtf8(tmpPath, next.toJson());
+            } catch (Throwable e) {
+                // nothing has been published yet, so the written files can be removed
+                cleanUpManifests(tmpPath, manifestListName, previousMetas, nextMetas);
+                String prepareFailure =
+                        String.format(
+                                "Exception occurs when preparing snapshot #%d (path %s) by committer %s "
+                                        + "with hash %s and type %s. Clean up.",
+                                nextId, targetPath.toString(), committer, hash, type.name());
+                throw new RuntimeException(prepareFailure, e);
+            }
+
+            boolean renamed;
+            try {
+                FileSystem fs = tmpPath.getFileSystem();
+                if (lock == null) {
+                    renamed = fs.rename(tmpPath, targetPath);
+                } else {
+                    // rename may neither be atomic nor return false for an existing
+                    // target; under the external lock, check for the target first and
+                    // rename only when it is absent
+                    renamed =
+                            lock.runWithLock(
+                                    () ->
+                                            !fs.exists(targetPath)
+                                                    && fs.rename(tmpPath, targetPath));
+                }
+            } catch (Throwable e) {
+                // the rename may or may not have happened, so nothing is cleaned up
+                String commitFailure =
+                        String.format(
+                                "Exception occurs when committing snapshot #%d (path %s) by committer %s "
+                                        + "with hash %s and type %s. "
+                                        + "Cannot clean up because we can't determine the success.",
+                                nextId, targetPath.toString(), committer, hash, type.name());
+                throw new RuntimeException(commitFailure, e);
+            }
+
+            if (renamed) {
+                return;
+            }
+
+            // the rename reported failure: discard this attempt's files and start over
+            String retryNotice =
+                    String.format(
+                            "Atomic rename failed for snapshot #%d (path %s) by committer %s "
+                                    + "with hash %s and type %s. "
+                                    + "Clean up and try again.",
+                            nextId, targetPath.toString(), committer, hash, type.name());
+            LOG.warn(retryNotice);
+            cleanUpManifests(tmpPath, manifestListName, previousMetas, nextMetas);
+        }
+    }
+
+    private void detectConflicts(long snapshotId, List<ManifestEntry> changes) 
{
+        Set<ManifestEntry.Identifier> removedFiles =
+                changes.stream()
+                        .filter(e -> e.kind().equals(ValueKind.DELETE))
+                        .map(ManifestEntry::identifier)
+                        .collect(Collectors.toSet());
+        if (removedFiles.isEmpty()) {
+            // early exit for append only changes
+            return;
+        }
+
+        try {
+            for (ManifestEntry entry : 
scan.withSnapshot(snapshotId).plan().files()) {

Review comment:
       Add a TODO here; the partition filter is important.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to