This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 7599a41  [FLINK-26184] Migrate StoreSink to Sink V2
7599a41 is described below

commit 7599a41955bf4931f3dc35273336829e2cc942f4
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Feb 17 19:34:42 2022 +0800

    [FLINK-26184] Migrate StoreSink to Sink V2
    
    This closes #22
---
 flink-table-store-connector/pom.xml                |   8 +
 .../table/store/connector/sink/Committable.java    |  79 ++++++
 .../connector/sink/CommittableSerializer.java      |  54 ++++
 ...{LocalCommittable.java => FileCommittable.java} |   8 +-
 ...ializer.java => FileCommittableSerializer.java} |  12 +-
 .../store/connector/sink/GlobalCommittable.java    |  44 ++++
 .../sink/GlobalCommittableSerializer.java          |  94 +++++++
 .../store/connector/sink/LogOffsetCommittable.java |  51 ++++
 .../store/connector/sink/StoreGlobalCommitter.java |  94 ++++---
 .../table/store/connector/sink/StoreSink.java      |  79 +++---
 .../store/connector/sink/StoreSinkWriter.java      |  63 +++--
 .../connector/sink/global/GlobalCommitter.java     |  45 ++++
 .../sink/global/GlobalCommitterOperator.java       | 152 +++++++++++
 .../sink/global/GlobalCommittingSink.java          |  56 +++++
 .../global/GlobalCommittingSinkTranslator.java     |  60 +++++
 .../connector/sink/CommittableSerializerTest.java  |  38 +++
 ...est.java => FileCommittableSerializerTest.java} |  12 +-
 .../sink/GlobalCommittableSerializerTest.java      |  69 +++++
 .../connector/sink/LogOffsetCommittableTest.java   |  35 +++
 .../table/store/connector/sink/StoreSinkTest.java  |  35 ++-
 .../table/store/connector/sink/TestFileStore.java  |  18 +-
 .../sink/global/GlobalCommitterOperatorTest.java   | 278 +++++++++++++++++++++
 .../apache/flink/table/store/file/Snapshot.java    |  16 +-
 .../store/file/manifest/ManifestCommittable.java   |  58 +++--
 .../manifest/ManifestCommittableSerializer.java    |  22 +-
 .../store/file/operation/FileStoreCommitImpl.java  |  42 ++--
 .../ManifestCommittableSerializerTest.java         |  34 ++-
 .../store/file/operation/OperationTestUtils.java   |   7 +-
 .../store/file/operation/TestCommitThread.java     |  11 +-
 29 files changed, 1394 insertions(+), 180 deletions(-)

diff --git a/flink-table-store-connector/pom.xml b/flink-table-store-connector/pom.xml
index a578b80..70ccea3 100644
--- a/flink-table-store-connector/pom.xml
+++ b/flink-table-store-connector/pom.xml
@@ -80,6 +80,14 @@ under the License.
 
         <dependency>
             <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-store-core</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/Committable.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/Committable.java
new file mode 100644
index 0000000..b4ba992
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/Committable.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+/** Committable produced by {@link StoreSinkWriter}. */
+public class Committable {
+
+    private final Kind kind;
+
+    private final byte[] wrappedCommittable;
+
+    private final int serializerVersion;
+
+    public Committable(Kind kind, byte[] wrappedCommittable, int serializerVersion) {
+        this.kind = kind;
+        this.wrappedCommittable = wrappedCommittable;
+        this.serializerVersion = serializerVersion;
+    }
+
+    public Kind kind() {
+        return kind;
+    }
+
+    public byte[] wrappedCommittable() {
+        return wrappedCommittable;
+    }
+
+    public int serializerVersion() {
+        return serializerVersion;
+    }
+
+    enum Kind {
+        FILE((byte) 0),
+
+        LOG((byte) 1),
+
+        LOG_OFFSET((byte) 2);
+
+        private final byte value;
+
+        Kind(byte value) {
+            this.value = value;
+        }
+
+        public byte toByteValue() {
+            return value;
+        }
+
+        public static Kind fromByteValue(byte value) {
+            switch (value) {
+                case 0:
+                    return FILE;
+                case 1:
+                    return LOG;
+                case 2:
+                    return LOG_OFFSET;
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unsupported byte value '" + value + "' for value kind.");
+            }
+        }
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableSerializer.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableSerializer.java
new file mode 100644
index 0000000..82627ea
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableSerializer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.nio.ByteBuffer;
+
+/** {@link SimpleVersionedSerializer} for {@link Committable}. */
+public class CommittableSerializer implements SimpleVersionedSerializer<Committable> {
+
+    public static final CommittableSerializer INSTANCE = new CommittableSerializer();
+
+    @Override
+    public int getVersion() {
+        return 1;
+    }
+
+    @Override
+    public byte[] serialize(Committable committable) {
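+        // Layout: a 1-byte kind tag, the wrapped committable bytes, then a trailing 4-byte serializer version.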
+        byte[] wrapped = committable.wrappedCommittable();
+        return ByteBuffer.allocate(1 + wrapped.length + 4)
+                .put(committable.kind().toByteValue())
+                .put(wrapped)
+                .putInt(committable.serializerVersion())
+                .array();
+    }
+
+    @Override
+    public Committable deserialize(int i, byte[] bytes) {
+        ByteBuffer buffer = ByteBuffer.wrap(bytes);
+        Committable.Kind kind = Committable.Kind.fromByteValue(buffer.get());
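+        // bytes.length - 5 strips the 1-byte kind tag already read and the trailing 4-byte version int.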
+        byte[] wrapped = new byte[bytes.length - 5];
+        buffer.get(wrapped);
+        int version = buffer.getInt();
+        return new Committable(kind, wrapped, version);
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/LocalCommittable.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileCommittable.java
similarity index 90%
rename from flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/LocalCommittable.java
rename to flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileCommittable.java
index c6bbeca..e088d1d 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/LocalCommittable.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileCommittable.java
@@ -23,8 +23,8 @@ import org.apache.flink.table.store.file.mergetree.Increment;
 
 import java.util.Objects;
 
-/** Local committable for sink. */
-public class LocalCommittable {
+/** File committable for sink. */
+public class FileCommittable {
 
     private final BinaryRowData partition;
 
@@ -32,7 +32,7 @@ public class LocalCommittable {
 
     private final Increment increment;
 
-    public LocalCommittable(BinaryRowData partition, int bucket, Increment increment) {
+    public FileCommittable(BinaryRowData partition, int bucket, Increment increment) {
         this.partition = partition;
         this.bucket = bucket;
         this.increment = increment;
@@ -58,7 +58,7 @@ public class LocalCommittable {
         if (o == null || getClass() != o.getClass()) {
             return false;
         }
-        LocalCommittable that = (LocalCommittable) o;
+        FileCommittable that = (FileCommittable) o;
         return bucket == that.bucket
                 && Objects.equals(partition, that.partition)
                 && Objects.equals(increment, that.increment);
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/LocalCommittableSerializer.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializer.java
similarity index 85%
rename from flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/LocalCommittableSerializer.java
rename to flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializer.java
index 6bf19ca..69bb44b 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/LocalCommittableSerializer.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializer.java
@@ -29,13 +29,13 @@ import org.apache.flink.table.types.logical.RowType;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 
-/** {@link SimpleVersionedSerializer} for {@link LocalCommittable}. */
-public class LocalCommittableSerializer implements SimpleVersionedSerializer<LocalCommittable> {
+/** {@link SimpleVersionedSerializer} for {@link FileCommittable}. */
+public class FileCommittableSerializer implements SimpleVersionedSerializer<FileCommittable> {
 
     private final BinaryRowDataSerializer partSerializer;
     private final SstFileMetaSerializer sstSerializer;
 
-    public LocalCommittableSerializer(RowType partitionType, RowType keyType, RowType rowType) {
+    public FileCommittableSerializer(RowType partitionType, RowType keyType, RowType rowType) {
         this.partSerializer = new BinaryRowDataSerializer(partitionType.getFieldCount());
         this.sstSerializer = new SstFileMetaSerializer(keyType, rowType);
     }
@@ -46,7 +46,7 @@ public class LocalCommittableSerializer implements SimpleVersionedSerializer<Loc
     }
 
     @Override
-    public byte[] serialize(LocalCommittable obj) throws IOException {
+    public byte[] serialize(FileCommittable obj) throws IOException {
         ByteArrayOutputStream out = new ByteArrayOutputStream();
         DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
         partSerializer.serialize(obj.partition(), view);
@@ -58,9 +58,9 @@ public class LocalCommittableSerializer implements SimpleVersionedSerializer<Loc
     }
 
     @Override
-    public LocalCommittable deserialize(int version, byte[] serialized) throws IOException {
+    public FileCommittable deserialize(int version, byte[] serialized) throws IOException {
         DataInputDeserializer view = new DataInputDeserializer(serialized);
-        return new LocalCommittable(
+        return new FileCommittable(
                 partSerializer.deserialize(view),
                 view.readInt(),
                 new Increment(
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/GlobalCommittable.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/GlobalCommittable.java
new file mode 100644
index 0000000..ac49d15
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/GlobalCommittable.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+
+import java.util.List;
+
+/** Global aggregated committable for {@link StoreGlobalCommitter}. */
+public class GlobalCommittable<LogCommT> {
+
+    private final List<LogCommT> logCommittables;
+
+    private final ManifestCommittable fileCommittable;
+
+    public GlobalCommittable(List<LogCommT> logCommittables, ManifestCommittable fileCommittable) {
+        this.logCommittables = logCommittables;
+        this.fileCommittable = fileCommittable;
+    }
+
+    public List<LogCommT> logCommittables() {
+        return logCommittables;
+    }
+
+    public ManifestCommittable fileCommittable() {
+        return fileCommittable;
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/GlobalCommittableSerializer.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/GlobalCommittableSerializer.java
new file mode 100644
index 0000000..ea180d2
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/GlobalCommittableSerializer.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** {@link SimpleVersionedSerializer} for {@link GlobalCommittable}. */
+public class GlobalCommittableSerializer<LogCommT>
+        implements SimpleVersionedSerializer<GlobalCommittable<LogCommT>> {
+
+    private final SimpleVersionedSerializer<LogCommT> logSerializer;
+
+    private final ManifestCommittableSerializer fileSerializer;
+
+    public GlobalCommittableSerializer(
+            SimpleVersionedSerializer<LogCommT> logSerializer,
+            ManifestCommittableSerializer fileSerializer) {
+        this.logSerializer = logSerializer;
+        this.fileSerializer = fileSerializer;
+    }
+
+    @Override
+    public int getVersion() {
+        return 1;
+    }
+
+    @Override
+    public byte[] serialize(GlobalCommittable<LogCommT> committable) throws IOException {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
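+        // Wire format: log serializer version, log count, length-prefixed log
+        // committables, then file serializer version and length-prefixed manifest bytes.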
+
+        view.writeInt(logSerializer.getVersion());
+        view.writeInt(committable.logCommittables().size());
+        for (LogCommT commT : committable.logCommittables()) {
+            byte[] bytes = logSerializer.serialize(commT);
+            view.writeInt(bytes.length);
+            view.write(bytes);
+        }
+
+        view.writeInt(fileSerializer.getVersion());
+        byte[] bytes = fileSerializer.serialize(committable.fileCommittable());
+        view.writeInt(bytes.length);
+        view.write(bytes);
+
+        return out.toByteArray();
+    }
+
+    @Override
+    public GlobalCommittable<LogCommT> deserialize(int version, byte[] serialized)
+            throws IOException {
+        DataInputDeserializer view = new DataInputDeserializer(serialized);
+
+        int logVersion = view.readInt();
+        int logSize = view.readInt();
+        List<LogCommT> logCommTList = new ArrayList<>();
+        for (int i = 0; i < logSize; i++) {
+            byte[] bytes = new byte[view.readInt()];
+            view.read(bytes);
+            logCommTList.add(logSerializer.deserialize(logVersion, bytes));
+        }
+
+        int fileVersion = view.readInt();
+        byte[] bytes = new byte[view.readInt()];
+        view.read(bytes);
+        ManifestCommittable file = fileSerializer.deserialize(fileVersion, bytes);
+
+        return new GlobalCommittable<>(logCommTList, file);
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/LogOffsetCommittable.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/LogOffsetCommittable.java
new file mode 100644
index 0000000..68b62d8
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/LogOffsetCommittable.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import java.nio.ByteBuffer;
+
+/** Log offset committable for a bucket. */
+public class LogOffsetCommittable {
+
+    private final int bucket;
+
+    private final long offset;
+
+    public LogOffsetCommittable(int bucket, long offset) {
+        this.bucket = bucket;
+        this.offset = offset;
+    }
+
+    public int bucket() {
+        return bucket;
+    }
+
+    public long offset() {
+        return offset;
+    }
+
+    public byte[] toBytes() {
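+        // 12 bytes: a 4-byte int bucket followed by an 8-byte long offset.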
+        return ByteBuffer.allocate(12).putInt(bucket).putLong(offset).array();
+    }
+
+    public static LogOffsetCommittable fromBytes(byte[] bytes) {
+        ByteBuffer buffer = ByteBuffer.wrap(bytes);
+        return new LogOffsetCommittable(buffer.getInt(), buffer.getLong());
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java
index e5dde92..a70b3b1 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java
@@ -18,29 +18,31 @@
 
 package org.apache.flink.table.store.connector.sink;
 
-import org.apache.flink.api.connector.sink.GlobalCommitter;
 import org.apache.flink.table.catalog.CatalogLock;
+import org.apache.flink.table.store.connector.sink.global.GlobalCommitter;
 import org.apache.flink.table.store.file.manifest.ManifestCommittable;
 import org.apache.flink.table.store.file.operation.FileStoreCommit;
 import org.apache.flink.table.store.file.operation.FileStoreExpire;
 
 import javax.annotation.Nullable;
 
-import java.util.Collections;
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
+import java.util.stream.Collectors;
 
 /** {@link GlobalCommitter} for dynamic store. */
-public class StoreGlobalCommitter
-        implements GlobalCommitter<LocalCommittable, ManifestCommittable> {
+public class StoreGlobalCommitter<LogCommT>
+        implements GlobalCommitter<Committable, GlobalCommittable<LogCommT>> {
 
     private final FileStoreCommit fileStoreCommit;
 
     private final FileStoreExpire fileStoreExpire;
 
+    private final FileCommittableSerializer fileCommitSerializer;
+
     @Nullable private final CatalogLock lock;
 
     @Nullable private final Map<String, String> overwritePartition;
@@ -48,55 +50,77 @@ public class StoreGlobalCommitter
     public StoreGlobalCommitter(
             FileStoreCommit fileStoreCommit,
             FileStoreExpire fileStoreExpire,
+            FileCommittableSerializer fileCommitSerializer,
             @Nullable CatalogLock lock,
             @Nullable Map<String, String> overwritePartition) {
         this.fileStoreCommit = fileStoreCommit;
         this.fileStoreExpire = fileStoreExpire;
+        this.fileCommitSerializer = fileCommitSerializer;
         this.lock = lock;
         this.overwritePartition = overwritePartition;
     }
 
     @Override
-    public List<ManifestCommittable> filterRecoveredCommittables(
-            List<ManifestCommittable> globalCommittables) {
-        return fileStoreCommit.filterCommitted(globalCommittables);
+    public void close() throws Exception {
+        if (lock != null) {
+            lock.close();
+        }
     }
 
     @Override
-    public ManifestCommittable combine(List<LocalCommittable> committables) {
-        ManifestCommittable globalCommittable = new ManifestCommittable();
-        committables.forEach(
-                committable ->
-                        globalCommittable.add(
-                                committable.partition(),
-                                committable.bucket(),
-                                committable.increment()));
-        return globalCommittable;
+    public List<GlobalCommittable<LogCommT>> filterRecoveredCommittables(
+            List<GlobalCommittable<LogCommT>> globalCommittables) {
+        List<ManifestCommittable> filtered =
+                fileStoreCommit.filterCommitted(
+                        globalCommittables.stream()
+                                .map(GlobalCommittable::fileCommittable)
+                                .collect(Collectors.toList()));
+        return globalCommittables.stream()
+                .filter(c -> filtered.contains(c.fileCommittable()))
+                .collect(Collectors.toList());
     }
 
     @Override
-    public List<ManifestCommittable> commit(List<ManifestCommittable> globalCommittables) {
-        Map<String, String> properties = new HashMap<>();
-        if (overwritePartition == null) {
-            for (ManifestCommittable committable : globalCommittables) {
-                fileStoreCommit.commit(committable, properties);
+    public GlobalCommittable<LogCommT> combine(long checkpointId, List<Committable> committables)
+            throws IOException {
+        List<LogCommT> logCommittables = new ArrayList<>();
+        ManifestCommittable fileCommittable = new ManifestCommittable(String.valueOf(checkpointId));
+        for (Committable committable : committables) {
+            switch (committable.kind()) {
+                case FILE:
+                    FileCommittable file =
+                            fileCommitSerializer.deserialize(
+                                    committable.serializerVersion(),
+                                    committable.wrappedCommittable());
+                    fileCommittable.addFileCommittable(
+                            file.partition(), file.bucket(), file.increment());
+                    break;
+                case LOG_OFFSET:
+                    LogOffsetCommittable offset =
+                            LogOffsetCommittable.fromBytes(committable.wrappedCommittable());
+                    fileCommittable.addLogOffset(offset.bucket(), offset.offset());
+                    break;
+                case LOG:
+                    throw new UnsupportedOperationException();
             }
-        } else {
-            checkArgument(
-                    globalCommittables.size() == 1, "overwrite is only supported in batch mode.");
-            fileStoreCommit.overwrite(overwritePartition, globalCommittables.get(0), properties);
         }
-        fileStoreExpire.expire();
-        return Collections.emptyList();
+        return new GlobalCommittable<>(logCommittables, fileCommittable);
     }
 
     @Override
-    public void endOfInput() {}
-
-    @Override
-    public void close() throws Exception {
-        if (lock != null) {
-            lock.close();
+    public void commit(List<GlobalCommittable<LogCommT>> committables) {
+        if (overwritePartition == null) {
+            for (GlobalCommittable<LogCommT> committable : committables) {
+                fileStoreCommit.commit(committable.fileCommittable(), new HashMap<>());
+            }
+        } else {
+            for (GlobalCommittable<LogCommT> committable : committables) {
+                fileStoreCommit.overwrite(
+                        overwritePartition, committable.fileCommittable(), new HashMap<>());
+            }
         }
+
+        // TODO introduce check interval
+        fileStoreExpire.expire();
     }
 }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index 8e7652d..c61ce0c 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -18,15 +18,14 @@
 
 package org.apache.flink.table.store.connector.sink;
 
-import org.apache.flink.api.connector.sink.Committer;
-import org.apache.flink.api.connector.sink.GlobalCommitter;
-import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.StatefulSink;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.table.catalog.CatalogLock;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.connector.sink.global.GlobalCommittingSink;
 import org.apache.flink.table.store.file.FileStore;
-import org.apache.flink.table.store.file.manifest.ManifestCommittable;
 import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
 import org.apache.flink.table.store.file.operation.FileStoreCommit;
 import org.apache.flink.table.store.file.operation.Lock;
@@ -35,15 +34,17 @@ import org.apache.flink.table.types.logical.RowType;
 
 import javax.annotation.Nullable;
 
-import java.util.List;
+import java.io.IOException;
+import java.util.Collection;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.Callable;
 
 import static org.apache.flink.table.store.utils.ProjectionUtils.project;
 
 /** {@link Sink} of dynamic store. */
-public class StoreSink implements Sink<RowData, LocalCommittable, Void, ManifestCommittable> {
+public class StoreSink<WriterStateT, LogCommT>
+        implements StatefulSink<RowData, WriterStateT>,
+                GlobalCommittingSink<RowData, Committable, GlobalCommittable<LogCommT>> {
 
     private static final long serialVersionUID = 1L;
 
@@ -83,26 +84,27 @@ public class StoreSink implements Sink<RowData, LocalCommittable, Void, Manifest
     }
 
     @Override
-    public StoreSinkWriter createWriter(InitContext initContext, List<Void> list) {
-        SinkRecordConverter recordConverter =
-                new SinkRecordConverter(numBucket, rowType, partitions, keys);
-        return new StoreSinkWriter(
-                fileStore.newWrite(), recordConverter, overwritePartition != null);
+    public StoreSinkWriter<WriterStateT> createWriter(InitContext initContext) throws IOException {
+        return restoreWriter(initContext, null);
     }
 
     @Override
-    public Optional<SimpleVersionedSerializer<Void>> getWriterStateSerializer() {
-        return Optional.empty();
+    public StoreSinkWriter<WriterStateT> restoreWriter(
+            InitContext initContext, Collection<WriterStateT> states) throws IOException {
+        return new StoreSinkWriter<>(
+                fileStore.newWrite(),
+                new SinkRecordConverter(numBucket, rowType, partitions, keys),
+                fileCommitSerializer(),
+                overwritePartition != null);
     }
 
     @Override
-    public Optional<Committer<LocalCommittable>> createCommitter() {
-        return Optional.empty();
+    public SimpleVersionedSerializer<WriterStateT> getWriterStateSerializer() {
+        return new NoOutputSerializer<>();
     }
 
     @Override
-    public Optional<GlobalCommitter<LocalCommittable, ManifestCommittable>>
-            createGlobalCommitter() {
+    public StoreGlobalCommitter<LogCommT> createCommitter() throws IOException {
         FileStoreCommit commit = fileStore.newCommit();
         CatalogLock lock;
         if (lockFactory == null) {
@@ -120,22 +122,43 @@ public class StoreSink implements Sink<RowData, LocalCommittable, Void, Manifest
                         }
                     });
         }
-        return Optional.of(
-                new StoreGlobalCommitter(commit, fileStore.newExpire(), lock, overwritePartition));
+
+        return new StoreGlobalCommitter<>(
+                commit, fileStore.newExpire(), fileCommitSerializer(), lock, overwritePartition);
     }
 
     @Override
-    public Optional<SimpleVersionedSerializer<LocalCommittable>> getCommittableSerializer() {
-        return Optional.of(
-                new LocalCommittableSerializer(
-                        project(rowType, partitions), project(rowType, keys), rowType));
+    public SimpleVersionedSerializer<Committable> getCommittableSerializer() {
+        return CommittableSerializer.INSTANCE;
     }
 
     @Override
-    public Optional<SimpleVersionedSerializer<ManifestCommittable>>
-            getGlobalCommittableSerializer() {
-        return Optional.of(
+    public GlobalCommittableSerializer<LogCommT> getGlobalCommittableSerializer() {
+        ManifestCommittableSerializer fileCommSerializer =
                 new ManifestCommittableSerializer(
-                        project(rowType, partitions), project(rowType, keys), rowType));
+                        project(rowType, partitions), project(rowType, keys), rowType);
+        SimpleVersionedSerializer<LogCommT> logCommitSerializer = new NoOutputSerializer<>();
+        return new GlobalCommittableSerializer<>(logCommitSerializer, fileCommSerializer);
+    }
+
+    private FileCommittableSerializer fileCommitSerializer() {
+        return new FileCommittableSerializer(
+                project(rowType, partitions), project(rowType, keys), rowType);
+    }
+
+    private static class NoOutputSerializer<T> implements SimpleVersionedSerializer<T> {
+        private NoOutputSerializer() {}
+
+        public int getVersion() {
+            return 1;
+        }
+
+        public byte[] serialize(T obj) {
+            throw new IllegalStateException("Should not serialize anything");
+        }
+
+        public T deserialize(int version, byte[] serialized) {
+            throw new IllegalStateException("Should not deserialize anything");
+        }
     }
 }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
index f30a88b..8cc88a1 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
@@ -19,7 +19,9 @@
 package org.apache.flink.table.store.connector.sink;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
@@ -28,10 +30,10 @@ import org.apache.flink.table.store.file.operation.FileStoreWrite;
 import org.apache.flink.table.store.file.utils.RecordWriter;
 import org.apache.flink.table.store.sink.SinkRecord;
 import org.apache.flink.table.store.sink.SinkRecordConverter;
-import org.apache.flink.types.RowKind;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -39,12 +41,16 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 /** A {@link SinkWriter} for dynamic store. */
-public class StoreSinkWriter implements SinkWriter<RowData, LocalCommittable, Void> {
+public class StoreSinkWriter<WriterStateT>
+        implements StatefulSinkWriter<RowData, WriterStateT>,
+                PrecommittingSinkWriter<RowData, Committable> {
 
     private final FileStoreWrite fileStoreWrite;
 
     private final SinkRecordConverter recordConverter;
 
+    private final FileCommittableSerializer fileCommitSerializer;
+
     private final boolean overwrite;
 
     private final ExecutorService compactExecutor;
@@ -52,9 +58,13 @@ public class StoreSinkWriter implements SinkWriter<RowData, LocalCommittable, Vo
     private final Map<BinaryRowData, Map<Integer, RecordWriter>> writers;
 
     public StoreSinkWriter(
-            FileStoreWrite fileStoreWrite, SinkRecordConverter recordConverter, boolean overwrite) {
+            FileStoreWrite fileStoreWrite,
+            SinkRecordConverter recordConverter,
+            FileCommittableSerializer fileCommitSerializer,
+            boolean overwrite) {
         this.fileStoreWrite = fileStoreWrite;
         this.recordConverter = recordConverter;
+        this.fileCommitSerializer = fileCommitSerializer;
         this.overwrite = overwrite;
         this.compactExecutor = Executors.newSingleThreadScheduledExecutor();
         this.writers = new HashMap<>();
@@ -76,8 +86,7 @@ public class StoreSinkWriter implements SinkWriter<RowData, LocalCommittable, Vo
     }
 
     @Override
-    public void write(RowData rowData, Context context) throws IOException {
-        RowKind rowKind = rowData.getRowKind();
+    public void write(RowData rowData, Context context) throws IOException, InterruptedException {
         SinkRecord record = recordConverter.convert(rowData);
         RecordWriter writer = getWriter(record.partition(), record.bucket());
         try {
@@ -85,7 +94,6 @@ public class StoreSinkWriter implements SinkWriter<RowData, LocalCommittable, Vo
         } catch (Exception e) {
             throw new IOException(e);
         }
-        rowData.setRowKind(rowKind);
     }
 
     private void writeToFileStore(RecordWriter writer, SinkRecord record) throws Exception {
@@ -110,23 +118,31 @@ public class StoreSinkWriter implements SinkWriter<RowData, LocalCommittable, Vo
     }
 
     @Override
-    public List<LocalCommittable> prepareCommit(boolean flush) throws IOException {
-        try {
-            return prepareCommit();
-        } catch (Exception e) {
-            throw new IOException(e);
-        }
+    public void flush(boolean endOfInput) {}
+
+    @Override
+    public List<WriterStateT> snapshotState(long checkpointId) throws IOException {
+        return Collections.emptyList();
     }
 
-    private List<LocalCommittable> prepareCommit() throws Exception {
-        List<LocalCommittable> committables = new ArrayList<>();
+    @Override
+    public List<Committable> prepareCommit() throws IOException {
+        List<Committable> committables = new ArrayList<>();
         for (BinaryRowData partition : writers.keySet()) {
             Map<Integer, RecordWriter> buckets = writers.get(partition);
             for (Integer bucket : buckets.keySet()) {
                 RecordWriter writer = buckets.get(bucket);
-                LocalCommittable committable =
-                        new LocalCommittable(partition, bucket, writer.prepareCommit());
-                committables.add(committable);
+                FileCommittable committable;
+                try {
+                    committable = new FileCommittable(partition, bucket, writer.prepareCommit());
+                } catch (Exception e) {
+                    throw new IOException(e);
+                }
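+                // Wrap the serialized FileCommittable bytes so the committer can dispatch on Kind.FILE.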
+                committables.add(
+                        new Committable(
+                                Committable.Kind.FILE,
+                                fileCommitSerializer.serialize(committable),
+                                fileCommitSerializer.getVersion()));
 
                 // clear if no update
                 // we need a mechanism to clear writers, otherwise there will be more and more
@@ -141,12 +157,17 @@ public class StoreSinkWriter implements SinkWriter<RowData, LocalCommittable, Vo
                 writers.remove(partition);
             }
         }
+
         return committables;
     }
 
-    private void closeWriter(RecordWriter writer) throws Exception {
-        writer.sync();
-        writer.close();
+    private void closeWriter(RecordWriter writer) throws IOException {
+        try {
+            writer.sync();
+            writer.close();
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
     }
 
     @Override
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitter.java
new file mode 100644
index 0000000..90eee7b
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.table.store.connector.sink.global;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * The {@code GlobalCommitter} is responsible for creating and committing an aggregated committable,
+ * which we call global committable (see {@link #combine}).
+ *
+ * <p>The {@code GlobalCommitter} runs with parallelism equal to 1.
+ *
+ * @param <CommT> The type of information needed to commit data staged by the sink
+ * @param <GlobalCommT> The type of the aggregated committable
+ */
+public interface GlobalCommitter<CommT, GlobalCommT> extends AutoCloseable {
+
+    /** Find out which global committables need to be retried when recovering from the failure. */
+    List<GlobalCommT> filterRecoveredCommittables(List<GlobalCommT> globalCommittables)
+            throws IOException;
+
+    /** Compute an aggregated committable from a list of committables. */
+    GlobalCommT combine(long checkpointId, List<CommT> committables) throws IOException;
+
+    /** Commits the given {@link GlobalCommT}. */
+    void commit(List<GlobalCommT> globalCommittables) throws IOException, InterruptedException;
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperator.java
new file mode 100644
index 0000000..ec48d27
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperator.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink.global;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** An operator that processes committables of a {@link Sink}. */
+public class GlobalCommitterOperator<CommT, GlobalCommT> extends AbstractStreamOperator<Void>
+        implements OneInputStreamOperator<CommittableMessage<CommT>, Void>, BoundedOneInput {
+
+    private static final Logger LOG = LoggerFactory.getLogger(GlobalCommitterOperator.class);
+
+    /** Record all the committables until commit. */
+    private final Deque<CommT> committables = new ArrayDeque<>();
+
+    /**
+     * Aggregate committables to global committables and commit the global committables to the
+     * external system.
+     */
+    private final GlobalCommitter<CommT, GlobalCommT> globalCommitter;
+
+    /** The operator's state descriptor. */
+    private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC =
+            new ListStateDescriptor<>(
+                    "streaming_committer_raw_states", BytePrimitiveArraySerializer.INSTANCE);
+
+    /** Group the committable by the checkpoint id. */
+    private final NavigableMap<Long, GlobalCommT> committablesPerCheckpoint;
+
+    /** The committable's serializer. */
+    private final SimpleVersionedSerializer<GlobalCommT> committableSerializer;
+
+    /** The operator's state. */
+    private ListState<GlobalCommT> streamingCommitterState;
+
+    public GlobalCommitterOperator(
+            GlobalCommitter<CommT, GlobalCommT> globalCommitter,
+            SimpleVersionedSerializer<GlobalCommT> committableSerializer) {
+        this.globalCommitter = checkNotNull(globalCommitter);
+        this.committableSerializer = committableSerializer;
+        this.committablesPerCheckpoint = new TreeMap<>();
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws Exception {
+        super.initializeState(context);
+        streamingCommitterState =
+                new SimpleVersionedListState<>(
+                        context.getOperatorStateStore()
+                                .getListState(STREAMING_COMMITTER_RAW_STATES_DESC),
+                        committableSerializer);
+        List<GlobalCommT> restored = new ArrayList<>();
+        streamingCommitterState.get().forEach(restored::add);
+        streamingCommitterState.clear();
+        globalCommitter.commit(globalCommitter.filterRecoveredCommittables(restored));
+    }
+
+    @Override
+    public void snapshotState(StateSnapshotContext context) throws Exception {
+        super.snapshotState(context);
+        List<CommT> committables = pollCommittables();
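+        // Fold this checkpoint's committables into one global committable; the
+        // actual commit happens later in notifyCheckpointComplete.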
+        if (committables.size() > 0) {
+            committablesPerCheckpoint.put(
+                    context.getCheckpointId(),
+                    globalCommitter.combine(context.getCheckpointId(), committables));
+        }
+        streamingCommitterState.update(new ArrayList<>(committablesPerCheckpoint.values()));
+    }
+
+    @Override
+    public void endInput() throws Exception {
+        List<CommT> allCommittables = pollCommittables();
+        if (!allCommittables.isEmpty()) {
+            globalCommitter.commit(
+                    Collections.singletonList(
+                            globalCommitter.combine(Long.MAX_VALUE, allCommittables)));
+        }
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        super.notifyCheckpointComplete(checkpointId);
+        LOG.info("Committing the state for checkpoint {}", checkpointId);
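+        // Commit every pending global committable up to and including this checkpoint id.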
+        NavigableMap<Long, GlobalCommT> headMap =
+                committablesPerCheckpoint.headMap(checkpointId, true);
+        globalCommitter.commit(new ArrayList<>(headMap.values()));
+        headMap.clear();
+    }
+
+    @Override
+    public void processElement(StreamRecord<CommittableMessage<CommT>> element) {
+        CommittableMessage<CommT> message = element.getValue();
+        if (message instanceof CommittableWithLineage) {
+            this.committables.add(((CommittableWithLineage<CommT>) message).getCommittable());
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        globalCommitter.close();
+        committablesPerCheckpoint.clear();
+        committables.clear();
+        super.close();
+    }
+
+    private List<CommT> pollCommittables() {
+        List<CommT> committables = new ArrayList<>(this.committables);
+        this.committables.clear();
+        return committables;
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSink.java
new file mode 100644
index 0000000..66b4cd6
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSink.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink.global;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+
+/**
+ * A {@link Sink} for exactly-once semantics using a two-phase commit protocol. The {@link Sink}
+ * consists of a {@link SinkWriter} that performs the precommits and a {@link GlobalCommitter} that
+ * actually commits the data.
+ *
+ * @param <InputT> The type of the sink's input
+ * @param <CommT> The type of the committables.
+ * @param <GlobalCommT> The type of the aggregated committable.
+ */
+public interface GlobalCommittingSink<InputT, CommT, GlobalCommT> extends Sink<InputT> {
+
+    /**
+     * Creates a {@link PrecommittingSinkWriter} that creates committables on checkpoint or end of
+     * input.
+     */
+    PrecommittingSinkWriter<InputT, CommT> createWriter(InitContext context) throws IOException;
+
+    /**
+     * Creates a {@link GlobalCommitter} that permanently makes the previously written data visible
+     * through {@link GlobalCommitter#commit}.
+     */
+    GlobalCommitter<CommT, GlobalCommT> createCommitter() throws IOException;
+
+    /** Returns the serializer of the committable type. */
+    SimpleVersionedSerializer<CommT> getCommittableSerializer();
+
+    /** Returns the serializer of the global committable type. */
+    SimpleVersionedSerializer<GlobalCommT> getGlobalCommittableSerializer();
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSinkTranslator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSinkTranslator.java
new file mode 100644
index 0000000..4d83149
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSinkTranslator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink.global;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
+
+import java.io.IOException;
+
+/** A translator for the {@link GlobalCommittingSink}. */
+public class GlobalCommittingSinkTranslator {
+
+    private static final String GLOBAL_COMMITTER_NAME = "Global Committer";
+
+    private static final String WRITER_NAME = "Writer";
+
+    public static <T, CommT, GlobalCommT> DataStreamSink<?> translate(
+            DataStream<T> input, GlobalCommittingSink<T, CommT, GlobalCommT> sink)
+            throws IOException {
+        TypeInformation<CommittableMessage<CommT>> commitType =
+                CommittableMessageTypeInfo.of(sink::getCommittableSerializer);
+        SingleOutputStreamOperator<CommittableMessage<CommT>> written =
+                input.transform(WRITER_NAME, commitType, new SinkWriterOperatorFactory<>(sink));
+
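+        // global() funnels every committable message to the single committer subtask below.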
+        SingleOutputStreamOperator<Void> committed =
+                written.global()
+                        .transform(
+                                GLOBAL_COMMITTER_NAME,
+                                Types.VOID,
+                                new GlobalCommitterOperator<>(
+                                        sink.createCommitter(),
+                                        sink.getGlobalCommittableSerializer()))
+                        .setParallelism(1)
+                        .setMaxParallelism(1);
+        return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
+    }
+}
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java
new file mode 100644
index 0000000..2d59440
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link CommittableSerializer}. */
+public class CommittableSerializerTest {
+
+    @Test
+    public void test() {
+        byte[] bytes = new byte[] {4, 5, 1};
+        Committable committable = new Committable(Committable.Kind.LOG, bytes, 9);
+        byte[] serialized = CommittableSerializer.INSTANCE.serialize(committable);
+        Committable deser = CommittableSerializer.INSTANCE.deserialize(1, serialized);
+        assertThat(deser.kind()).isEqualTo(Committable.Kind.LOG);
+        assertThat(deser.serializerVersion()).isEqualTo(9);
+        assertThat(deser.wrappedCommittable()).isEqualTo(bytes);
+    }
+}
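
As the assertions show, a Committable is a thin envelope: a kind tag, the wrapped committable's bytes, and the version of the serializer that produced those bytes. A short decoding sketch using only the accessors exercised above (how the payload is decoded depends on the kind and is elided here):

    // Sketch: round-trip the envelope, then hand the payload to the matching serializer.
    Committable c = CommittableSerializer.INSTANCE.deserialize(1, bytes);
    byte[] payload = c.wrappedCommittable();
    int payloadVersion = c.serializerVersion(); // version for the payload's own serializer
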
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LocalCommittableSerializerTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializerTest.java
similarity index 84%
rename from flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LocalCommittableSerializerTest.java
rename to flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializerTest.java
index 6fd2ac3..5013529 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LocalCommittableSerializerTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializerTest.java
@@ -30,19 +30,19 @@ import static org.apache.flink.table.store.file.manifest.ManifestCommittableSeri
 import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Test for {@link LocalCommittableSerializer}. */
-public class LocalCommittableSerializerTest {
+/** Test for {@link FileCommittableSerializer}. */
+public class FileCommittableSerializerTest {
 
     @Test
     public void test() throws IOException {
-        LocalCommittableSerializer serializer =
-                new LocalCommittableSerializer(
+        FileCommittableSerializer serializer =
+                new FileCommittableSerializer(
                         RowType.of(new IntType()),
                         RowType.of(new IntType()),
                         RowType.of(new IntType()));
         Increment increment = randomIncrement();
-        LocalCommittable committable = new LocalCommittable(row(0), 1, increment);
-        LocalCommittable newCommittable =
+        FileCommittable committable = new FileCommittable(row(0), 1, increment);
+        FileCommittable newCommittable =
                 serializer.deserialize(1, serializer.serialize(committable));
         assertThat(newCommittable).isEqualTo(committable);
     }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/GlobalCommittableSerializerTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/GlobalCommittableSerializerTest.java
new file mode 100644
index 0000000..31753c3
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/GlobalCommittableSerializerTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializerTest;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link GlobalCommittableSerializer}. */
+public class GlobalCommittableSerializerTest {
+
+    @Test
+    public void test() throws IOException {
+        List<String> logs = Arrays.asList("1", "2");
+        GlobalCommittable<String> committable =
+                new GlobalCommittable<>(logs, ManifestCommittableSerializerTest.create());
+        GlobalCommittableSerializer<String> serializer =
+                new GlobalCommittableSerializer<>(
+                        new TestStringSerializer(), ManifestCommittableSerializerTest.serializer());
+        byte[] serialized = serializer.serialize(committable);
+        GlobalCommittable<String> deser = serializer.deserialize(1, serialized);
+        assertThat(deser.logCommittables()).isEqualTo(committable.logCommittables());
+        assertThat(deser.fileCommittable()).isEqualTo(committable.fileCommittable());
+    }
+
+    private static final class TestStringSerializer implements SimpleVersionedSerializer<String> {
+
+        private static final int VERSION = 1073741823;
+
+        private TestStringSerializer() {}
+
+        public int getVersion() {
+            return VERSION;
+        }
+
+        public byte[] serialize(String str) {
+            return str.getBytes(StandardCharsets.UTF_8);
+        }
+
+        public String deserialize(int version, byte[] serialized) {
+            assertThat(version).isEqualTo(VERSION);
+            return new String(serialized, StandardCharsets.UTF_8);
+        }
+    }
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogOffsetCommittableTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogOffsetCommittableTest.java
new file mode 100644
index 0000000..bf1330d
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogOffsetCommittableTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link LogOffsetCommittable}. */
+public class LogOffsetCommittableTest {
+
+    @Test
+    public void test() {
+        LogOffsetCommittable committable = new LogOffsetCommittable(5, 9);
+        LogOffsetCommittable deser = LogOffsetCommittable.fromBytes(committable.toBytes());
+        assertThat(deser.bucket()).isEqualTo(committable.bucket());
+        assertThat(deser.offset()).isEqualTo(committable.offset());
+    }
+}
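
The test only pins the round trip, but a LogOffsetCommittable is conceptually just a (bucket, offset) pair, so a fixed-width encoding suffices. The layout below is an assumption for illustration, not necessarily the encoding this commit uses:

    import java.nio.ByteBuffer;

    // Assumed layout: 4-byte bucket followed by 8-byte offset (12 bytes total).
    static byte[] encode(int bucket, long offset) {
        return ByteBuffer.allocate(Integer.BYTES + Long.BYTES).putInt(bucket).putLong(offset).array();
    }

    static long decodeOffset(byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        buffer.getInt(); // skip the bucket
        return buffer.getLong();
    }
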
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
index e69ec17..f469ce6 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.connector.sink.TestFileStore.TestRecordWriter;
-import org.apache.flink.table.store.file.manifest.ManifestCommittable;
 import org.apache.flink.table.store.file.utils.RecordWriter;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
@@ -56,7 +55,7 @@ public class StoreSinkTest {
 
     @Test
     public void testChangelogs() throws Exception {
-        StoreSink sink = newSink(null);
+        StoreSink<?, ?> sink = newSink(null);
         writeAndCommit(
                 sink,
                 GenericRowData.ofKind(RowKind.INSERT, 0, 0, 1),
@@ -73,8 +72,8 @@ public class StoreSinkTest {
 
     @Test
     public void testNoKeyChangelogs() throws Exception {
-        StoreSink sink =
-                new StoreSink(
+        StoreSink<?, ?> sink =
+                new StoreSink<>(
                         identifier,
                         fileStore,
                         rowType,
@@ -99,7 +98,7 @@ public class StoreSinkTest {
 
     @Test
     public void testAppend() throws Exception {
-        StoreSink sink = newSink(null);
+        StoreSink<?, ?> sink = newSink(null);
         writeAndAssert(sink);
 
         writeAndCommit(sink, GenericRowData.of(0, 8, 9), GenericRowData.of(1, 10, 11));
@@ -111,7 +110,7 @@ public class StoreSinkTest {
 
     @Test
     public void testOverwrite() throws Exception {
-        StoreSink sink = newSink(new HashMap<>());
+        StoreSink<?, ?> sink = newSink(new HashMap<>());
         writeAndAssert(sink);
 
         writeAndCommit(sink, GenericRowData.of(0, 8, 9), GenericRowData.of(1, 10, 11));
@@ -126,7 +125,7 @@ public class StoreSinkTest {
     public void testOverwritePartition() throws Exception {
         HashMap<String, String> partition = new HashMap<>();
         partition.put("part", "0");
-        StoreSink sink = newSink(partition);
+        StoreSink<?, ?> sink = newSink(partition);
         writeAndAssert(sink);
 
         writeAndCommit(sink, GenericRowData.of(0, 8, 9), GenericRowData.of(1, 10, 11));
@@ -138,7 +137,7 @@ public class StoreSinkTest {
                 .isEqualTo(Collections.singletonList("ADD-key-8-value-0/8/9"));
     }
 
-    private void writeAndAssert(StoreSink sink) throws Exception {
+    private void writeAndAssert(StoreSink<?, ?> sink) throws Exception {
         writeAndCommit(
                 sink,
                 GenericRowData.of(0, 0, 1),
@@ -153,17 +152,17 @@ public class StoreSinkTest {
                 .isEqualTo(Arrays.asList("ADD-key-0-value-0/0/1", "ADD-key-7-value-0/7/5"));
     }
 
-    private void writeAndCommit(StoreSink sink, RowData... rows) throws Exception {
+    private void writeAndCommit(StoreSink<?, ?> sink, RowData... rows) throws Exception {
         commit(sink, write(sink, rows));
     }
 
-    private List<LocalCommittable> write(StoreSink sink, RowData... rows) throws Exception {
-        StoreSinkWriter writer = sink.createWriter(null, null);
+    private List<Committable> write(StoreSink<?, ?> sink, RowData... rows) throws Exception {
+        StoreSinkWriter<?> writer = sink.createWriter(null);
         for (RowData row : rows) {
             writer.write(row, null);
         }
 
-        List<LocalCommittable> localCommittables = writer.prepareCommit(true);
+        List<Committable> committables = writer.prepareCommit();
         Map<BinaryRowData, Map<Integer, RecordWriter>> writers = new HashMap<>(writer.writers());
         assertThat(writers.size()).isGreaterThan(0);
 
@@ -176,12 +175,12 @@ public class StoreSinkTest {
                                     assertThat(testWriter.synced).isTrue();
                                     assertThat(testWriter.closed).isTrue();
                                 }));
-        return localCommittables;
+        return committables;
     }
 
-    private void commit(StoreSink sink, List<LocalCommittable> localCommittables) throws Exception {
-        StoreGlobalCommitter committer = (StoreGlobalCommitter) sink.createGlobalCommitter().get();
-        ManifestCommittable committable = committer.combine(localCommittables);
+    private void commit(StoreSink<?, ?> sink, List<Committable> fileCommittables) throws Exception {
+        StoreGlobalCommitter committer = sink.createCommitter();
+        GlobalCommittable<?> committable = committer.combine(0, fileCommittables);
 
         fileStore.expired = false;
         lock.locked = false;
@@ -200,8 +199,8 @@ public class StoreSinkTest {
         assertThat(lock.closed).isTrue();
     }
 
-    private StoreSink newSink(Map<String, String> overwritePartition) {
-        return new StoreSink(
+    private StoreSink<?, ?> newSink(Map<String, String> overwritePartition) {
+        return new StoreSink<>(
                 identifier,
                 fileStore,
                 rowType,
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
index 87eee11..e31ffdc 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
@@ -31,6 +31,7 @@ import org.apache.flink.table.store.file.operation.FileStoreRead;
 import org.apache.flink.table.store.file.operation.FileStoreScan;
 import org.apache.flink.table.store.file.operation.FileStoreWrite;
 import org.apache.flink.table.store.file.operation.Lock;
+import org.apache.flink.table.store.file.stats.FieldStats;
 import org.apache.flink.table.store.file.utils.RecordWriter;
 
 import java.util.ArrayList;
@@ -130,7 +131,22 @@ public class TestFileStore implements FileStore {
         public Increment prepareCommit() {
             List<SstFileMeta> newFiles =
                     records.stream()
-                            .map(s -> new SstFileMeta(s, 0, 0, null, null, null, 0, 0, 0))
+                            .map(
+                                    s ->
+                                            new SstFileMeta(
+                                                    s,
+                                                    0,
+                                                    0,
+                                                    null,
+                                                    null,
+                                                    new FieldStats[] {
+                                                        new FieldStats(null, null, 0),
+                                                        new FieldStats(null, null, 0),
+                                                        new FieldStats(null, null, 0)
+                                                    },
+                                                    0,
+                                                    0,
+                                                    0))
                             .collect(Collectors.toList());
             return new Increment(newFiles, Collections.emptyList(), Collections.emptyList());
         }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperatorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperatorTest.java
new file mode 100644
index 0000000..77176ef
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperatorTest.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink.global;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import org.apache.flink.streaming.runtime.operators.sink.TestSink.StringCommittableSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link GlobalCommitterOperator}. */
+public class GlobalCommitterOperatorTest {
+
+    @Test
+    public void closeCommitter() throws Exception {
+        final DefaultGlobalCommitter globalCommitter = new DefaultGlobalCommitter();
+        final OneInputStreamOperatorTestHarness<CommittableMessage<String>, Void> testHarness =
+                createTestHarness(globalCommitter);
+        testHarness.initializeEmptyState();
+        testHarness.open();
+        testHarness.close();
+        assertThat(globalCommitter.isClosed()).isTrue();
+    }
+
+    @Test
+    public void restoredFromMergedState() throws Exception {
+
+        final List<String> input1 = Arrays.asList("host", "drop");
+        final OperatorSubtaskState operatorSubtaskState1 =
+                buildSubtaskState(createTestHarness(), input1);
+
+        final List<String> input2 = Arrays.asList("future", "evil", "how");
+        final OperatorSubtaskState operatorSubtaskState2 =
+                buildSubtaskState(createTestHarness(), input2);
+
+        final DefaultGlobalCommitter globalCommitter = new DefaultGlobalCommitter();
+        final OneInputStreamOperatorTestHarness<CommittableMessage<String>, Void> testHarness =
+                createTestHarness(globalCommitter);
+
+        final OperatorSubtaskState mergedOperatorSubtaskState =
+                OneInputStreamOperatorTestHarness.repackageState(
+                        operatorSubtaskState1, operatorSubtaskState2);
+
+        testHarness.initializeState(
+                OneInputStreamOperatorTestHarness.repartitionOperatorState(
+                        mergedOperatorSubtaskState, 2, 2, 1, 0));
+        testHarness.open();
+
+        final List<String> expectedOutput = new ArrayList<>();
+        expectedOutput.add(DefaultGlobalCommitter.COMBINER.apply(input1));
+        expectedOutput.add(DefaultGlobalCommitter.COMBINER.apply(input2));
+
+        testHarness.snapshot(1L, 1L);
+        testHarness.notifyOfCompletedCheckpoint(1L);
+        testHarness.close();
+
+        assertThat(globalCommitter.getCommittedData())
+                .containsExactlyInAnyOrder(expectedOutput.toArray(new String[0]));
+    }
+
+    @Test
+    public void commitMultipleStagesTogether() throws Exception {
+
+        final DefaultGlobalCommitter globalCommitter = new DefaultGlobalCommitter();
+
+        final List<String> input1 = Arrays.asList("cautious", "nature");
+        final List<String> input2 = Arrays.asList("count", "over");
+        final List<String> input3 = Arrays.asList("lawyer", "grammar");
+
+        final List<String> expectedOutput = new ArrayList<>();
+
+        expectedOutput.add(DefaultGlobalCommitter.COMBINER.apply(input1));
+        expectedOutput.add(DefaultGlobalCommitter.COMBINER.apply(input2));
+        expectedOutput.add(DefaultGlobalCommitter.COMBINER.apply(input3));
+
+        final OneInputStreamOperatorTestHarness<CommittableMessage<String>, Void> testHarness =
+                createTestHarness(globalCommitter);
+        testHarness.initializeEmptyState();
+        testHarness.open();
+
+        testHarness.processElements(committableRecords(input1));
+        testHarness.snapshot(1L, 1L);
+        testHarness.processElements(committableRecords(input2));
+        testHarness.snapshot(2L, 2L);
+        testHarness.processElements(committableRecords(input3));
+        testHarness.snapshot(3L, 3L);
+
+        testHarness.notifyOfCompletedCheckpoint(3L);
+
+        testHarness.close();
+
+        assertThat(globalCommitter.getCommittedData())
+                .containsExactlyInAnyOrder(expectedOutput.toArray(new String[0]));
+    }
+
+    @Test
+    public void filterRecoveredCommittables() throws Exception {
+        final List<String> input = Arrays.asList("silent", "elder", "patience");
+        final String successCommittedCommittable = DefaultGlobalCommitter.COMBINER.apply(input);
+
+        final OperatorSubtaskState operatorSubtaskState =
+                buildSubtaskState(createTestHarness(), input);
+        final DefaultGlobalCommitter globalCommitter =
+                new DefaultGlobalCommitter(successCommittedCommittable);
+
+        final OneInputStreamOperatorTestHarness<CommittableMessage<String>, Void> testHarness =
+                createTestHarness(globalCommitter);
+
+        // all data from the previous checkpoint is expected to be committed,
+        // so no data should be re-committed.
+        testHarness.initializeState(operatorSubtaskState);
+        testHarness.open();
+        testHarness.snapshot(1L, 1L);
+        testHarness.notifyOfCompletedCheckpoint(1L);
+        assertThat(globalCommitter.getCommittedData().isEmpty()).isTrue();
+        testHarness.close();
+    }
+
+    @Test
+    public void endOfInput() throws Exception {
+        final DefaultGlobalCommitter globalCommitter = new DefaultGlobalCommitter();
+
+        final OneInputStreamOperatorTestHarness<CommittableMessage<String>, Void> testHarness =
+                createTestHarness(globalCommitter);
+        testHarness.initializeEmptyState();
+        testHarness.open();
+        List<String> input = Arrays.asList("silent", "elder", "patience");
+        testHarness.processElements(committableRecords(input));
+        testHarness.endInput();
+        testHarness.close();
+        assertThat(globalCommitter.getCommittedData()).contains("elder+patience+silent");
+    }
+
+    private OneInputStreamOperatorTestHarness<CommittableMessage<String>, Void> createTestHarness()
+            throws Exception {
+        return createTestHarness(new DefaultGlobalCommitter());
+    }
+
+    private OneInputStreamOperatorTestHarness<CommittableMessage<String>, Void> createTestHarness(
+            GlobalCommitter<String, String> globalCommitter) throws Exception {
+        return new OneInputStreamOperatorTestHarness<>(
+                new GlobalCommitterOperator<>(
+                        globalCommitter, StringCommittableSerializer.INSTANCE),
+                CommittableMessageTypeInfo.of(
+                                (SerializableSupplier<SimpleVersionedSerializer<String>>)
+                                        () -> StringCommittableSerializer.INSTANCE)
+                        .createSerializer(new ExecutionConfig()));
+    }
+
+    public static OperatorSubtaskState buildSubtaskState(
+            OneInputStreamOperatorTestHarness<CommittableMessage<String>, Void> testHarness,
+            List<String> input)
+            throws Exception {
+        testHarness.initializeEmptyState();
+        testHarness.open();
+        testHarness.processElements(
+                input.stream()
+                        .map(GlobalCommitterOperatorTest::toCommittableMessage)
+                        .map(StreamRecord::new)
+                        .collect(Collectors.toList()));
+        testHarness.prepareSnapshotPreBarrier(1L);
+        OperatorSubtaskState operatorSubtaskState = testHarness.snapshot(1L, 1L);
+        testHarness.close();
+        return operatorSubtaskState;
+    }
+
+    private static List<StreamRecord<CommittableMessage<String>>> committableRecords(
+            Collection<String> elements) {
+        return elements.stream()
+                .map(GlobalCommitterOperatorTest::toCommittableMessage)
+                .map(StreamRecord::new)
+                .collect(Collectors.toList());
+    }
+
+    private static CommittableMessage<String> toCommittableMessage(String input) {
+        return new CommittableWithLineage<>(input, null, -1);
+    }
+
+    /** A {@link GlobalCommitter} that always commits global committables successfully. */
+    private static class DefaultGlobalCommitter implements GlobalCommitter<String, String> {
+
+        private static final Function<List<String>, String> COMBINER =
+                strings -> {
+                    // sort so that the combined result is deterministic during the unit test
+                    Collections.sort(strings);
+                    return String.join("+", strings);
+                };
+
+        private final Queue<String> committedData;
+
+        private boolean isClosed;
+
+        private final String committedSuccessData;
+
+        DefaultGlobalCommitter() {
+            this("");
+        }
+
+        DefaultGlobalCommitter(String committedSuccessData) {
+            this.committedData = new ConcurrentLinkedQueue<>();
+            this.isClosed = false;
+            this.committedSuccessData = committedSuccessData;
+        }
+
+        @Override
+        public List<String> filterRecoveredCommittables(List<String> globalCommittables) {
+            if (committedSuccessData == null) {
+                return globalCommittables;
+            }
+            return globalCommittables.stream()
+                    .filter(s -> !s.equals(committedSuccessData))
+                    .collect(Collectors.toList());
+        }
+
+        @Override
+        public String combine(long checkpointId, List<String> committables) {
+            return COMBINER.apply(committables);
+        }
+
+        @Override
+        public void commit(List<String> committables) {
+            committedData.addAll(committables);
+        }
+
+        public List<String> getCommittedData() {
+            // committedData is final and never null, so it can be copied directly
+            return new ArrayList<>(committedData);
+        }
+
+        @Override
+        public void close() {
+            isClosed = true;
+        }
+
+        public boolean isClosed() {
+            return isClosed;
+        }
+    }
+}
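
DefaultGlobalCommitter above doubles as documentation of the GlobalCommitter contract the harness exercises: on restore, filterRecoveredCommittables drops whatever is already durable; per checkpoint, combine collapses that checkpoint's committables into one global committable; commit applies them; close releases resources. A minimal conforming implementation, for illustration only (imports as in the test above):

    /** Illustrative sketch; mirrors exactly the methods the test exercises. */
    class LoggingGlobalCommitter implements GlobalCommitter<String, String> {

        @Override
        public List<String> filterRecoveredCommittables(List<String> recovered) {
            return recovered; // nothing is known to be durable yet, so retry everything
        }

        @Override
        public String combine(long checkpointId, List<String> committables) {
            return checkpointId + ":" + String.join("+", committables);
        }

        @Override
        public void commit(List<String> globalCommittables) {
            globalCommittables.forEach(System.out::println); // an idempotent external commit goes here
        }

        @Override
        public void close() {}
    }
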
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
index 9b9085b..e30286d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
@@ -37,7 +37,7 @@ public class Snapshot {
     private static final String FIELD_ID = "id";
     private static final String FIELD_MANIFEST_LIST = "manifestList";
     private static final String FIELD_COMMIT_USER = "commitUser";
-    private static final String FIELD_COMMIT_UUID = "commitUuid";
+    private static final String FIELD_COMMIT_IDENTIFIER = "commitIdentifier";
     private static final String FIELD_COMMIT_KIND = "commitKind";
     private static final String FIELD_TIME_MILLIS = "timeMillis";
 
@@ -51,8 +51,8 @@ public class Snapshot {
     private final String commitUser;
 
     // for deduplication
-    @JsonProperty(FIELD_COMMIT_UUID)
-    private final String commitUuid;
+    @JsonProperty(FIELD_COMMIT_IDENTIFIER)
+    private final String commitIdentifier;
 
     @JsonProperty(FIELD_COMMIT_KIND)
     private final CommitKind commitKind;
@@ -65,13 +65,13 @@ public class Snapshot {
             @JsonProperty(FIELD_ID) long id,
             @JsonProperty(FIELD_MANIFEST_LIST) String manifestList,
             @JsonProperty(FIELD_COMMIT_USER) String commitUser,
-            @JsonProperty(FIELD_COMMIT_UUID) String commitUuid,
+            @JsonProperty(FIELD_COMMIT_IDENTIFIER) String commitIdentifier,
             @JsonProperty(FIELD_COMMIT_KIND) CommitKind commitKind,
             @JsonProperty(FIELD_TIME_MILLIS) long timeMillis) {
         this.id = id;
         this.manifestList = manifestList;
         this.commitUser = commitUser;
-        this.commitUuid = commitUuid;
+        this.commitIdentifier = commitIdentifier;
         this.commitKind = commitKind;
         this.timeMillis = timeMillis;
     }
@@ -91,9 +91,9 @@ public class Snapshot {
         return commitUser;
     }
 
-    @JsonGetter(FIELD_COMMIT_UUID)
-    public String commitUuid() {
-        return commitUuid;
+    @JsonGetter(FIELD_COMMIT_IDENTIFIER)
+    public String commitIdentifier() {
+        return commitIdentifier;
     }
 
     @JsonGetter(FIELD_COMMIT_KIND)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
index 765fe1a..a461c2d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
@@ -27,37 +27,52 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.UUID;
 
 /** Manifest commit message. */
 public class ManifestCommittable {
 
-    private final String uuid;
+    private final String identifier;
+    private final Map<Integer, Long> logOffsets;
     private final Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> newFiles;
     private final Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> compactBefore;
     private final Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> compactAfter;
 
-    public ManifestCommittable() {
-        this(UUID.randomUUID().toString(), new HashMap<>(), new HashMap<>(), new HashMap<>());
+    public ManifestCommittable(String identifier) {
+        this.identifier = identifier;
+        this.logOffsets = new HashMap<>();
+        this.newFiles = new HashMap<>();
+        this.compactBefore = new HashMap<>();
+        this.compactAfter = new HashMap<>();
     }
 
     public ManifestCommittable(
-            String uuid,
+            String identifier,
+            Map<Integer, Long> logOffsets,
             Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> newFiles,
             Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> compactBefore,
             Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> compactAfter) {
-        this.uuid = uuid;
+        this.identifier = identifier;
+        this.logOffsets = logOffsets;
         this.newFiles = newFiles;
         this.compactBefore = compactBefore;
         this.compactAfter = compactAfter;
     }
 
-    public void add(BinaryRowData partition, int bucket, Increment increment) {
+    public void addFileCommittable(BinaryRowData partition, int bucket, Increment increment) {
         addFiles(newFiles, partition, bucket, increment.newFiles());
         addFiles(compactBefore, partition, bucket, increment.compactBefore());
         addFiles(compactAfter, partition, bucket, increment.compactAfter());
     }
 
+    public void addLogOffset(int bucket, long offset) {
+        if (logOffsets.containsKey(bucket)) {
+            throw new RuntimeException(
+                    String.format(
+                            "bucket-%d appears multiple times, which should not happen.", bucket));
+        }
+        logOffsets.put(bucket, offset);
+    }
+
     private static void addFiles(
             Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> map,
             BinaryRowData partition,
@@ -68,8 +83,12 @@ public class ManifestCommittable {
                 .addAll(files);
     }
 
-    public String uuid() {
-        return uuid;
+    public String identifier() {
+        return identifier;
+    }
+
+    public Map<Integer, Long> logOffsets() {
+        return logOffsets;
     }
 
     public Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> newFiles() {
@@ -93,7 +112,8 @@ public class ManifestCommittable {
             return false;
         }
         ManifestCommittable that = (ManifestCommittable) o;
-        return Objects.equals(uuid, that.uuid)
+        return Objects.equals(identifier, that.identifier)
+                && Objects.equals(logOffsets, that.logOffsets)
                 && Objects.equals(newFiles, that.newFiles)
                 && Objects.equals(compactBefore, that.compactBefore)
                 && Objects.equals(compactAfter, that.compactAfter);
@@ -101,19 +121,23 @@ public class ManifestCommittable {
 
     @Override
     public int hashCode() {
-        return Objects.hash(uuid, newFiles, compactBefore, compactAfter);
+        return Objects.hash(identifier, logOffsets, newFiles, compactBefore, compactAfter);
     }
 
     @Override
     public String toString() {
-        return "uuid: "
-                + uuid
-                + "\nnew files:\n"
+        return "ManifestCommittable{"
+                + "identifier="
+                + identifier
+                + ", logOffsets="
+                + logOffsets
+                + ", newFiles="
                 + filesToString(newFiles)
-                + "compact before:\n"
+                + ", compactBefore="
                 + filesToString(compactBefore)
-                + "compact after:\n"
-                + filesToString(compactAfter);
+                + ", compactAfter="
+                + filesToString(compactAfter)
+                + '}';
     }
 
     private static String filesToString(Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> files) {
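
Putting the new API together: a committer now names each ManifestCommittable explicitly (previously a random UUID was generated internally) and may register at most one log offset per bucket, since a second addLogOffset for the same bucket throws. A short assembly sketch; `partition` (a BinaryRowData) and `increment` (an Increment) are assumed to exist:

    // Sketch only: the identifier would typically be derived from the checkpoint.
    ManifestCommittable committable = new ManifestCommittable("checkpoint-42");
    committable.addFileCommittable(partition, 0, increment);
    committable.addLogOffset(0, 1024L); // bucket 0 consumed the log up to offset 1024
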
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java
index 81e4e1f..7ceb607 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java
@@ -55,13 +55,32 @@ public class ManifestCommittableSerializer
     public byte[] serialize(ManifestCommittable obj) throws IOException {
         ByteArrayOutputStream out = new ByteArrayOutputStream();
         DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
-        view.writeUTF(obj.uuid());
+        view.writeUTF(obj.identifier());
+        serializeOffsets(view, obj.logOffsets());
         serializeFiles(view, obj.newFiles());
         serializeFiles(view, obj.compactBefore());
         serializeFiles(view, obj.compactAfter());
         return out.toByteArray();
     }
 
+    private void serializeOffsets(DataOutputViewStreamWrapper view, Map<Integer, Long> offsets)
+            throws IOException {
+        view.writeInt(offsets.size());
+        for (Map.Entry<Integer, Long> entry : offsets.entrySet()) {
+            view.writeInt(entry.getKey());
+            view.writeLong(entry.getValue());
+        }
+    }
+
+    private Map<Integer, Long> deserializeOffsets(DataInputDeserializer view) throws IOException {
+        int size = view.readInt();
+        Map<Integer, Long> offsets = new HashMap<>(size);
+        for (int i = 0; i < size; i++) {
+            offsets.put(view.readInt(), view.readLong());
+        }
+        return offsets;
+    }
+
     private void serializeFiles(
             DataOutputViewStreamWrapper view,
             Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> files)
@@ -107,6 +126,7 @@ public class ManifestCommittableSerializer
         DataInputDeserializer view = new DataInputDeserializer(serialized);
         return new ManifestCommittable(
                 view.readUTF(),
+                deserializeOffsets(view),
                 deserializeFiles(view),
                 deserializeFiles(view),
                 deserializeFiles(view));
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index cb854ef..15129b3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -115,18 +115,18 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             return committableList;
         }
 
-        // check if a committable is already committed by its uuid
-        Map<String, ManifestCommittable> uuids = new LinkedHashMap<>();
+        // check if a committable is already committed by its identifier
+        Map<String, ManifestCommittable> identifiers = new LinkedHashMap<>();
         for (ManifestCommittable committable : committableList) {
-            uuids.put(committable.uuid(), committable);
+            identifiers.put(committable.identifier(), committable);
         }
 
         for (long id = latestSnapshotId; id >= Snapshot.FIRST_SNAPSHOT_ID; id--) {
             Path snapshotPath = pathFactory.toSnapshotPath(id);
             Snapshot snapshot = Snapshot.fromPath(snapshotPath);
             if (commitUser.equals(snapshot.commitUser())) {
-                if (uuids.containsKey(snapshot.commitUuid())) {
-                    uuids.remove(snapshot.commitUuid());
+                if (identifiers.containsKey(snapshot.commitIdentifier())) {
+                    identifiers.remove(snapshot.commitIdentifier());
                 } else {
                     // early exit, because committableList must be the latest commits by this
                     // commit user
@@ -135,7 +135,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             }
         }
 
-        return new ArrayList<>(uuids.values());
+        return new ArrayList<>(identifiers.values());
     }
 
     @Override
@@ -145,13 +145,13 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         }
 
         List<ManifestEntry> appendChanges = collectChanges(committable.newFiles(), ValueKind.ADD);
-        tryCommit(appendChanges, committable.uuid(), Snapshot.CommitKind.APPEND, false);
+        tryCommit(appendChanges, committable.identifier(), Snapshot.CommitKind.APPEND, false);
 
         List<ManifestEntry> compactChanges = new ArrayList<>();
         compactChanges.addAll(collectChanges(committable.compactBefore(), ValueKind.DELETE));
         compactChanges.addAll(collectChanges(committable.compactAfter(), ValueKind.ADD));
         if (!compactChanges.isEmpty()) {
-            tryCommit(compactChanges, committable.uuid(), Snapshot.CommitKind.COMPACT, true);
+            tryCommit(compactChanges, committable.identifier(), Snapshot.CommitKind.COMPACT, true);
         }
     }
 
@@ -183,13 +183,16 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         }
         // overwrite new files
         tryOverwrite(
-                partitionFilter, appendChanges, committable.uuid(), Snapshot.CommitKind.APPEND);
+                partitionFilter,
+                appendChanges,
+                committable.identifier(),
+                Snapshot.CommitKind.APPEND);
 
         List<ManifestEntry> compactChanges = new ArrayList<>();
         compactChanges.addAll(collectChanges(committable.compactBefore(), ValueKind.DELETE));
         compactChanges.addAll(collectChanges(committable.compactAfter(), ValueKind.ADD));
         if (!compactChanges.isEmpty()) {
-            tryCommit(compactChanges, committable.uuid(), Snapshot.CommitKind.COMPACT, true);
+            tryCommit(compactChanges, committable.identifier(), Snapshot.CommitKind.COMPACT, true);
         }
     }
 
@@ -209,7 +212,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
     private void tryOverwrite(
             Predicate partitionFilter,
             List<ManifestEntry> changes,
-            String hash,
+            String identifier,
             Snapshot.CommitKind commitKind) {
         while (true) {
             Long latestSnapshotId = pathFactory.latestSnapshotId();
@@ -233,7 +236,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             }
             changesWithOverwrite.addAll(changes);
 
-            if (tryCommitOnce(changesWithOverwrite, hash, commitKind, latestSnapshotId, false)) {
+            if (tryCommitOnce(
+                    changesWithOverwrite, identifier, commitKind, latestSnapshotId, false)) {
                 break;
             }
         }
@@ -264,7 +268,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
 
     private boolean tryCommitOnce(
             List<ManifestEntry> changes,
-            String hash,
+            String identifier,
             Snapshot.CommitKind commitKind,
             Long latestSnapshotId,
             boolean checkDeletedFiles) {
@@ -311,7 +315,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                             newSnapshotId,
                             manifestListName,
                             commitUser,
-                            hash,
+                            identifier,
                             commitKind,
                             System.currentTimeMillis());
             FileUtils.writeFileUtf8(tmpSnapshotPath, newSnapshot.toJson());
@@ -325,7 +329,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                             newSnapshotId,
                             newSnapshotPath.toString(),
                             commitUser,
-                            hash,
+                            identifier,
                             commitKind.name()),
                     e);
         }
@@ -354,12 +358,12 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             throw new RuntimeException(
                     String.format(
                             "Exception occurs when committing snapshot #%d 
(path %s) by user %s "
-                                    + "with hash %s and kind %s. "
+                                    + "with identifier %s and kind %s. "
                                     + "Cannot clean up because we can't 
determine the success.",
                             newSnapshotId,
                             newSnapshotPath.toString(),
                             commitUser,
-                            hash,
+                            identifier,
                             commitKind.name()),
                     e);
         }
@@ -372,12 +376,12 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         LOG.warn(
                 String.format(
                         "Atomic rename failed for snapshot #%d (path %s) by 
user %s "
-                                + "with hash %s and kind %s. "
+                                + "with identifier %s and kind %s. "
                                 + "Clean up and try again.",
                         newSnapshotId,
                         newSnapshotPath.toString(),
                         commitUser,
-                        hash,
+                        identifier,
                         commitKind.name()));
         cleanUpTmpSnapshot(tmpSnapshotPath, manifestListName, oldMetas, newMetas);
         return false;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
index 4ebbf85..10b06c3 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
@@ -29,6 +29,7 @@ import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
@@ -41,30 +42,43 @@ public class ManifestCommittableSerializerTest {
 
     @Test
     public void testCommittableSerDe() throws IOException {
-        ManifestCommittableSerializer serializer =
-                new ManifestCommittableSerializer(
-                        RowType.of(new IntType()),
-                        RowType.of(new IntType()),
-                        RowType.of(new IntType()));
-        ManifestCommittable committable = new ManifestCommittable();
+        ManifestCommittableSerializer serializer = serializer();
+        ManifestCommittable committable = create();
+        byte[] serialized = serializer.serialize(committable);
+        assertThat(serializer.deserialize(1, serialized)).isEqualTo(committable);
+    }
+
+    public static ManifestCommittableSerializer serializer() {
+        return new ManifestCommittableSerializer(
+                RowType.of(new IntType()), RowType.of(new IntType()), RowType.of(new IntType()));
+    }
+
+    public static ManifestCommittable create() {
+        ManifestCommittable committable =
+                new ManifestCommittable(String.valueOf(new Random().nextLong()));
         addAndAssert(committable, row(0), 0);
         addAndAssert(committable, row(0), 1);
         addAndAssert(committable, row(1), 0);
         addAndAssert(committable, row(1), 1);
-        byte[] serialized = serializer.serialize(committable);
-        assertThat(serializer.deserialize(1, serialized)).isEqualTo(committable);
+        return committable;
     }
 
-    private void addAndAssert(
+    private static void addAndAssert(
             ManifestCommittable committable, BinaryRowData partition, int bucket) {
         Increment increment = randomIncrement();
-        committable.add(partition, bucket, increment);
+        committable.addFileCommittable(partition, bucket, increment);
         assertThat(committable.newFiles().get(partition).get(bucket))
                 .isEqualTo(increment.newFiles());
         assertThat(committable.compactBefore().get(partition).get(bucket))
                 .isEqualTo(increment.compactBefore());
         assertThat(committable.compactAfter().get(partition).get(bucket))
                 .isEqualTo(increment.compactAfter());
+
+        if (!committable.logOffsets().containsKey(bucket)) {
+            int offset = ID.incrementAndGet();
+            committable.addLogOffset(bucket, offset);
+            assertThat(committable.logOffsets().get(bucket)).isEqualTo(offset);
+        }
     }
 
     public static Increment randomIncrement() {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java
index 8b1941e..07a27d3 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java
@@ -46,6 +46,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -224,13 +225,15 @@ public class OperationTestUtils {
         }
 
         FileStoreCommit commit = createCommit(fileFormat, pathFactory);
-        ManifestCommittable committable = new ManifestCommittable();
+        ManifestCommittable committable =
+                new ManifestCommittable(String.valueOf(new Random().nextLong()));
         for (Map.Entry<BinaryRowData, Map<Integer, RecordWriter>> entryWithPartition :
                 writers.entrySet()) {
             for (Map.Entry<Integer, RecordWriter> entryWithBucket :
                     entryWithPartition.getValue().entrySet()) {
                 Increment increment = entryWithBucket.getValue().prepareCommit();
-                committable.add(entryWithPartition.getKey(), entryWithBucket.getKey(), increment);
+                committable.addFileCommittable(
+                        entryWithPartition.getKey(), entryWithBucket.getKey(), increment);
             }
         }
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
index 8033b40..43fb2e9 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
@@ -35,6 +35,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadLocalRandom;
@@ -99,9 +100,10 @@ public class TestCommitThread extends Thread {
         for (int i = 0; i < numWrites && !data.isEmpty(); i++) {
             writeData();
         }
-        ManifestCommittable committable = new ManifestCommittable();
+        ManifestCommittable committable =
+                new ManifestCommittable(String.valueOf(new Random().nextLong()));
         for (Map.Entry<BinaryRowData, MergeTreeWriter> entry : writers.entrySet()) {
-            committable.add(entry.getKey(), 0, entry.getValue().prepareCommit());
+            committable.addFileCommittable(entry.getKey(), 0, entry.getValue().prepareCommit());
         }
 
         runWithRetry(committable, () -> commit.commit(committable, Collections.emptyMap()));
@@ -109,8 +111,9 @@ public class TestCommitThread extends Thread {
 
     private void doOverwrite() throws Exception {
         BinaryRowData partition = overwriteData();
-        ManifestCommittable committable = new ManifestCommittable();
-        committable.add(partition, 0, writers.get(partition).prepareCommit());
+        ManifestCommittable committable =
+                new ManifestCommittable(String.valueOf(new Random().nextLong()));
+        committable.addFileCommittable(partition, 0, writers.get(partition).prepareCommit());
 
         runWithRetry(
                 committable,
