This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f91a85  [FLINK-26423] Integrate log store to StoreSink
7f91a85 is described below

commit 7f91a853e70bb558fd2e2aa619c83558b9bccaa7
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Mar 8 14:47:41 2022 +0800

    [FLINK-26423] Integrate log store to StoreSink
    
    This closes #28
---
 flink-table-store-connector/pom.xml                |  14 ++
 .../table/store/connector/sink/Committable.java    |  13 +-
 .../connector/sink/CommittableSerializer.java      |  57 ++++-
 .../store/connector/sink/GlobalCommittable.java    |  44 ----
 .../sink/GlobalCommittableSerializer.java          |  94 --------
 .../store/connector/sink/LogOffsetCommittable.java |  18 ++
 .../store/connector/sink/StoreGlobalCommitter.java |  49 ++--
 .../store/connector/sink/StoreLocalCommitter.java  |  69 ++++++
 .../table/store/connector/sink/StoreSink.java      |  93 ++++++--
 .../store/connector/sink/StoreSinkWriter.java      |  64 +++++-
 ...perator.java => AbstractCommitterOperator.java} |  89 ++++---
 .../sink/global/GlobalCommitterOperator.java       | 112 ++-------
 .../sink/global/GlobalCommittingSink.java          |   6 -
 .../global/GlobalCommittingSinkTranslator.java     |  46 ++--
 .../sink/global/LocalCommitterOperator.java        | 205 +++++++++++++++++
 .../table/store/connector/FileStoreITCase.java     |  31 ++-
 .../table/store/connector/FiniteTestSource.java    | 156 +++++++++++++
 .../connector/sink/CommittableSerializerTest.java  |  71 +++++-
 .../sink/GlobalCommittableSerializerTest.java      |  69 ------
 .../store/connector/sink/LogStoreSinkITCase.java   | 168 ++++++++++++++
 .../table/store/connector/sink/StoreSinkTest.java  |  15 +-
 .../sink/global/GlobalCommitterOperatorTest.java   |  18 +-
 .../sink/global/LocalCommitterOperatorTest.java    | 255 +++++++++++++++++++++
 .../flink/table/store/log/LogWriteCallback.java    |  48 ++++
 .../flink/table/store/kafka/KafkaLogTestUtils.java |  28 ++-
 25 files changed, 1346 insertions(+), 486 deletions(-)

diff --git a/flink-table-store-connector/pom.xml b/flink-table-store-connector/pom.xml
index 0b3e178..e4e8f38 100644
--- a/flink-table-store-connector/pom.xml
+++ b/flink-table-store-connector/pom.xml
@@ -211,6 +211,20 @@ under the License.
                 </exclusion>
             </exclusions>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-kafka</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-json</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/Committable.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/Committable.java
index b4ba992..cea1b35 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/Committable.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/Committable.java
@@ -23,28 +23,21 @@ public class Committable {
 
     private final Kind kind;
 
-    private final byte[] wrappedCommittable;
+    private final Object wrappedCommittable;
 
-    private final int serializerVersion;
-
-    public Committable(Kind kind, byte[] wrappedCommittable, int serializerVersion) {
+    public Committable(Kind kind, Object wrappedCommittable) {
         this.kind = kind;
         this.wrappedCommittable = wrappedCommittable;
-        this.serializerVersion = serializerVersion;
     }
 
     public Kind kind() {
         return kind;
     }
 
-    public byte[] wrappedCommittable() {
+    public Object wrappedCommittable() {
         return wrappedCommittable;
     }
 
-    public int serializerVersion() {
-        return serializerVersion;
-    }
-
     enum Kind {
         FILE((byte) 0),
 
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableSerializer.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableSerializer.java
index 82627ea..9593862 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableSerializer.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableSerializer.java
@@ -20,12 +20,22 @@ package org.apache.flink.table.store.connector.sink;
 
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 /** {@link SimpleVersionedSerializer} for {@link Committable}. */
 public class CommittableSerializer implements SimpleVersionedSerializer<Committable> {
 
-    public static final CommittableSerializer INSTANCE = new CommittableSerializer();
+    private final FileCommittableSerializer fileCommittableSerializer;
+
+    private final SimpleVersionedSerializer<Object> logCommittableSerializer;
+
+    public CommittableSerializer(
+            FileCommittableSerializer fileCommittableSerializer,
+            SimpleVersionedSerializer<Object> logCommittableSerializer) {
+        this.fileCommittableSerializer = fileCommittableSerializer;
+        this.logCommittableSerializer = logCommittableSerializer;
+    }
 
     @Override
     public int getVersion() {
@@ -33,22 +43,57 @@ public class CommittableSerializer implements SimpleVersionedSerializer<Committa
     }
 
     @Override
-    public byte[] serialize(Committable committable) {
-        byte[] wrapped = committable.wrappedCommittable();
+    public byte[] serialize(Committable committable) throws IOException {
+        byte[] wrapped;
+        int version;
+        switch (committable.kind()) {
+            case FILE:
+                version = fileCommittableSerializer.getVersion();
+                wrapped =
+                        fileCommittableSerializer.serialize(
+                                (FileCommittable) committable.wrappedCommittable());
+                break;
+            case LOG:
+                version = logCommittableSerializer.getVersion();
+                wrapped = logCommittableSerializer.serialize(committable.wrappedCommittable());
+                break;
+            case LOG_OFFSET:
+                version = 1;
+                wrapped = ((LogOffsetCommittable) committable.wrappedCommittable()).toBytes();
+                break;
+            default:
+                throw new UnsupportedOperationException("Unsupported kind: " + committable.kind());
+        }
+
         return ByteBuffer.allocate(1 + wrapped.length + 4)
                 .put(committable.kind().toByteValue())
                 .put(wrapped)
-                .putInt(committable.serializerVersion())
+                .putInt(version)
                 .array();
     }
 
     @Override
-    public Committable deserialize(int i, byte[] bytes) {
+    public Committable deserialize(int i, byte[] bytes) throws IOException {
         ByteBuffer buffer = ByteBuffer.wrap(bytes);
         Committable.Kind kind = Committable.Kind.fromByteValue(buffer.get());
         byte[] wrapped = new byte[bytes.length - 5];
         buffer.get(wrapped);
         int version = buffer.getInt();
-        return new Committable(kind, wrapped, version);
+
+        Object wrappedCommittable;
+        switch (kind) {
+            case FILE:
+                wrappedCommittable = fileCommittableSerializer.deserialize(version, wrapped);
+                break;
+            case LOG:
+                wrappedCommittable = logCommittableSerializer.deserialize(version, wrapped);
+                break;
+            case LOG_OFFSET:
+                wrappedCommittable = LogOffsetCommittable.fromBytes(wrapped);
+                break;
+            default:
+                throw new UnsupportedOperationException("Unsupported kind: " + kind);
+        }
+        return new Committable(kind, wrappedCommittable);
     }
 }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/GlobalCommittable.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/GlobalCommittable.java
deleted file mode 100644
index ac49d15..0000000
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/GlobalCommittable.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector.sink;
-
-import org.apache.flink.table.store.file.manifest.ManifestCommittable;
-
-import java.util.List;
-
-/** Global aggregated committable for {@link StoreGlobalCommitter}. */
-public class GlobalCommittable<LogCommT> {
-
-    private final List<LogCommT> logCommittables;
-
-    private final ManifestCommittable fileCommittable;
-
-    public GlobalCommittable(List<LogCommT> logCommittables, ManifestCommittable fileCommittable) {
-        this.logCommittables = logCommittables;
-        this.fileCommittable = fileCommittable;
-    }
-
-    public List<LogCommT> logCommittables() {
-        return logCommittables;
-    }
-
-    public ManifestCommittable fileCommittable() {
-        return fileCommittable;
-    }
-}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/GlobalCommittableSerializer.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/GlobalCommittableSerializer.java
deleted file mode 100644
index ea180d2..0000000
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/GlobalCommittableSerializer.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector.sink;
-
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.core.memory.DataInputDeserializer;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.table.store.file.manifest.ManifestCommittable;
-import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/** {@link SimpleVersionedSerializer} for {@link GlobalCommittable}. */
-public class GlobalCommittableSerializer<LogCommT>
-        implements SimpleVersionedSerializer<GlobalCommittable<LogCommT>> {
-
-    private final SimpleVersionedSerializer<LogCommT> logSerializer;
-
-    private final ManifestCommittableSerializer fileSerializer;
-
-    public GlobalCommittableSerializer(
-            SimpleVersionedSerializer<LogCommT> logSerializer,
-            ManifestCommittableSerializer fileSerializer) {
-        this.logSerializer = logSerializer;
-        this.fileSerializer = fileSerializer;
-    }
-
-    @Override
-    public int getVersion() {
-        return 1;
-    }
-
-    @Override
-    public byte[] serialize(GlobalCommittable<LogCommT> committable) throws IOException {
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-        DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
-
-        view.writeInt(logSerializer.getVersion());
-        view.writeInt(committable.logCommittables().size());
-        for (LogCommT commT : committable.logCommittables()) {
-            byte[] bytes = logSerializer.serialize(commT);
-            view.writeInt(bytes.length);
-            view.write(bytes);
-        }
-
-        view.writeInt(fileSerializer.getVersion());
-        byte[] bytes = fileSerializer.serialize(committable.fileCommittable());
-        view.writeInt(bytes.length);
-        view.write(bytes);
-
-        return out.toByteArray();
-    }
-
-    @Override
-    public GlobalCommittable<LogCommT> deserialize(int version, byte[] serialized)
-            throws IOException {
-        DataInputDeserializer view = new DataInputDeserializer(serialized);
-
-        int logVersion = view.readInt();
-        int logSize = view.readInt();
-        List<LogCommT> logCommTList = new ArrayList<>();
-        for (int i = 0; i < logSize; i++) {
-            byte[] bytes = new byte[view.readInt()];
-            view.read(bytes);
-            logCommTList.add(logSerializer.deserialize(logVersion, bytes));
-        }
-
-        int fileVersion = view.readInt();
-        byte[] bytes = new byte[view.readInt()];
-        view.read(bytes);
-        ManifestCommittable file = fileSerializer.deserialize(fileVersion, bytes);
-
-        return new GlobalCommittable<>(logCommTList, file);
-    }
-}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/LogOffsetCommittable.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/LogOffsetCommittable.java
index 68b62d8..8e47480 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/LogOffsetCommittable.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/LogOffsetCommittable.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.store.connector.sink;
 
 import java.nio.ByteBuffer;
+import java.util.Objects;
 
 /** Log offset committable for a bucket. */
 public class LogOffsetCommittable {
@@ -48,4 +49,21 @@ public class LogOffsetCommittable {
         ByteBuffer buffer = ByteBuffer.wrap(bytes);
         return new LogOffsetCommittable(buffer.getInt(), buffer.getLong());
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        LogOffsetCommittable that = (LogOffsetCommittable) o;
+        return bucket == that.bucket && offset == that.offset;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(bucket, offset);
+    }
 }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java
index a70b3b1..bf9fec2 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java
@@ -27,22 +27,17 @@ import org.apache.flink.table.store.file.operation.FileStoreExpire;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 /** {@link GlobalCommitter} for dynamic store. */
-public class StoreGlobalCommitter<LogCommT>
-        implements GlobalCommitter<Committable, GlobalCommittable<LogCommT>> {
+public class StoreGlobalCommitter implements GlobalCommitter<Committable, ManifestCommittable> {
 
     private final FileStoreCommit fileStoreCommit;
 
     private final FileStoreExpire fileStoreExpire;
 
-    private final FileCommittableSerializer fileCommitSerializer;
-
     @Nullable private final CatalogLock lock;
 
     @Nullable private final Map<String, String> overwritePartition;
@@ -50,12 +45,10 @@ public class StoreGlobalCommitter<LogCommT>
     public StoreGlobalCommitter(
             FileStoreCommit fileStoreCommit,
             FileStoreExpire fileStoreExpire,
-            FileCommittableSerializer fileCommitSerializer,
             @Nullable CatalogLock lock,
             @Nullable Map<String, String> overwritePartition) {
         this.fileStoreCommit = fileStoreCommit;
         this.fileStoreExpire = fileStoreExpire;
-        this.fileCommitSerializer = fileCommitSerializer;
         this.lock = lock;
         this.overwritePartition = overwritePartition;
     }
@@ -68,55 +61,45 @@ public class StoreGlobalCommitter<LogCommT>
     }
 
     @Override
-    public List<GlobalCommittable<LogCommT>> filterRecoveredCommittables(
-            List<GlobalCommittable<LogCommT>> globalCommittables) {
-        List<ManifestCommittable> filtered =
-                fileStoreCommit.filterCommitted(
-                        globalCommittables.stream()
-                                .map(GlobalCommittable::fileCommittable)
-                                .collect(Collectors.toList()));
-        return globalCommittables.stream()
-                .filter(c -> filtered.contains(c.fileCommittable()))
-                .collect(Collectors.toList());
+    public List<ManifestCommittable> filterRecoveredCommittables(
+            List<ManifestCommittable> globalCommittables) {
+        return fileStoreCommit.filterCommitted(globalCommittables);
     }
 
     @Override
-    public GlobalCommittable<LogCommT> combine(long checkpointId, List<Committable> committables)
+    public ManifestCommittable combine(long checkpointId, List<Committable> committables)
             throws IOException {
-        List<LogCommT> logCommittables = new ArrayList<>();
         ManifestCommittable fileCommittable = new ManifestCommittable(String.valueOf(checkpointId));
         for (Committable committable : committables) {
             switch (committable.kind()) {
                 case FILE:
-                    FileCommittable file =
-                            fileCommitSerializer.deserialize(
-                                    committable.serializerVersion(),
-                                    committable.wrappedCommittable());
+                    FileCommittable file = (FileCommittable) committable.wrappedCommittable();
                     fileCommittable.addFileCommittable(
                             file.partition(), file.bucket(), file.increment());
                     break;
                 case LOG_OFFSET:
                     LogOffsetCommittable offset =
-                            LogOffsetCommittable.fromBytes(committable.wrappedCommittable());
+                            (LogOffsetCommittable) committable.wrappedCommittable();
                     fileCommittable.addLogOffset(offset.bucket(), offset.offset());
                     break;
                 case LOG:
-                    throw new UnsupportedOperationException();
+                    // log should be committed in local committer
+                    break;
             }
         }
-        return new GlobalCommittable<>(logCommittables, fileCommittable);
+        return fileCommittable;
     }
 
     @Override
-    public void commit(List<GlobalCommittable<LogCommT>> committables) {
+    public void commit(List<ManifestCommittable> committables)
+            throws IOException, InterruptedException {
         if (overwritePartition == null) {
-            for (GlobalCommittable<LogCommT> committable : committables) {
-                fileStoreCommit.commit(committable.fileCommittable(), new HashMap<>());
+            for (ManifestCommittable committable : committables) {
+                fileStoreCommit.commit(committable, new HashMap<>());
             }
         } else {
-            for (GlobalCommittable<LogCommT> committable : committables) {
-                fileStoreCommit.overwrite(
-                        overwritePartition, committable.fileCommittable(), new HashMap<>());
+            for (ManifestCommittable committable : committables) {
+                fileStoreCommit.overwrite(overwritePartition, committable, new HashMap<>());
             }
         }
 
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreLocalCommitter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreLocalCommitter.java
new file mode 100644
index 0000000..d3bda6a
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreLocalCommitter.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.api.connector.sink2.Committer;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.flink.table.store.connector.sink.global.LocalCommitterOperator.convertCommitRequest;
+
+/** Store local {@link Committer} to commit log sink. */
+public class StoreLocalCommitter<LogCommT> implements Committer<Committable> {
+
+    @Nullable private final Committer<LogCommT> logCommitter;
+
+    public StoreLocalCommitter(@Nullable Committer<LogCommT> logCommitter) {
+        this.logCommitter = logCommitter;
+    }
+
+    @Override
+    public void commit(Collection<CommitRequest<Committable>> requests)
+            throws IOException, InterruptedException {
+        List<CommitRequest<LogCommT>> logRequests = new ArrayList<>();
+        for (CommitRequest<Committable> request : requests) {
+            if (request.getCommittable().kind() == Committable.Kind.LOG) {
+                //noinspection unchecked
+                logRequests.add(
+                        convertCommitRequest(
+                                request,
+                                committable -> (LogCommT) committable.wrappedCommittable(),
+                                committable -> new Committable(Committable.Kind.LOG, committable)));
+            }
+        }
+
+        if (logRequests.size() > 0) {
+            Objects.requireNonNull(logCommitter, "logCommitter should not be null.");
+            logCommitter.commit(logRequests);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (logCommitter != null) {
+            logCommitter.close();
+        }
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index 9ae59fd..a848ca6 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -18,30 +18,40 @@
 
 package org.apache.flink.table.store.connector.sink;
 
+import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.table.catalog.CatalogLock;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.connector.sink.global.GlobalCommittingSink;
 import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
 import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
 import org.apache.flink.table.store.file.operation.FileStoreCommit;
 import org.apache.flink.table.store.file.operation.Lock;
+import org.apache.flink.table.store.log.LogInitContext;
+import org.apache.flink.table.store.log.LogSinkProvider;
+import org.apache.flink.table.store.log.LogWriteCallback;
+import org.apache.flink.table.store.sink.SinkRecord;
 import org.apache.flink.table.store.sink.SinkRecordConverter;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.function.Consumer;
 
 /** {@link Sink} of dynamic store. */
 public class StoreSink<WriterStateT, LogCommT>
         implements StatefulSink<RowData, WriterStateT>,
-                GlobalCommittingSink<RowData, Committable, GlobalCommittable<LogCommT>> {
+                GlobalCommittingSink<RowData, Committable, ManifestCommittable> {
 
     private static final long serialVersionUID = 1L;
 
@@ -59,6 +69,8 @@ public class StoreSink<WriterStateT, LogCommT>
 
     @Nullable private final Map<String, String> overwritePartition;
 
+    @Nullable private final LogSinkProvider logSinkProvider;
+
     public StoreSink(
             ObjectIdentifier tableIdentifier,
             FileStore fileStore,
@@ -66,7 +78,8 @@ public class StoreSink<WriterStateT, LogCommT>
             int[] primaryKeys,
             int numBucket,
             @Nullable CatalogLock.Factory lockFactory,
-            @Nullable Map<String, String> overwritePartition) {
+            @Nullable Map<String, String> overwritePartition,
+            @Nullable LogSinkProvider logSinkProvider) {
         this.tableIdentifier = tableIdentifier;
         this.fileStore = fileStore;
         this.partitions = partitions;
@@ -74,6 +87,7 @@ public class StoreSink<WriterStateT, LogCommT>
         this.numBucket = numBucket;
         this.lockFactory = lockFactory;
         this.overwritePartition = overwritePartition;
+        this.logSinkProvider = logSinkProvider;
     }
 
     @Override
@@ -84,6 +98,19 @@ public class StoreSink<WriterStateT, LogCommT>
     @Override
     public StoreSinkWriter<WriterStateT> restoreWriter(
             InitContext initContext, Collection<WriterStateT> states) throws IOException {
+        SinkWriter<SinkRecord> logWriter = null;
+        LogWriteCallback logCallback = null;
+        if (logSinkProvider != null) {
+            logCallback = new LogWriteCallback();
+            Consumer<?> metadataConsumer = logSinkProvider.createMetadataConsumer(logCallback);
+            LogInitContext logInitContext = new LogInitContext(initContext, metadataConsumer);
+            Sink<SinkRecord> logSink = logSinkProvider.createSink();
+            logWriter =
+                    states == null
+                            ? logSink.createWriter(logInitContext)
+                            : ((StatefulSink<SinkRecord, WriterStateT>) logSink)
+                                    .restoreWriter(logInitContext, states);
+        }
         return new StoreSinkWriter<>(
                 fileStore.newWrite(),
                 new SinkRecordConverter(
@@ -91,17 +118,55 @@ public class StoreSink<WriterStateT, LogCommT>
                         primaryKeys.length > 0 ? fileStore.valueType() : fileStore.keyType(),
                         partitions,
                         primaryKeys),
-                fileCommitSerializer(),
-                overwritePartition != null);
+                overwritePartition != null,
+                logWriter,
+                logCallback);
     }
 
     @Override
     public SimpleVersionedSerializer<WriterStateT> getWriterStateSerializer() {
-        return new NoOutputSerializer<>();
+        return logSinkProvider == null
+                ? new NoOutputSerializer<>()
+                : ((StatefulSink<SinkRecord, WriterStateT>) logSinkProvider.createSink())
+                        .getWriterStateSerializer();
+    }
+
+    @Nullable
+    private Committer<LogCommT> logCommitter() {
+        if (logSinkProvider != null) {
+            Sink<SinkRecord> sink = logSinkProvider.createSink();
+            if (sink instanceof TwoPhaseCommittingSink) {
+                try {
+                    return ((TwoPhaseCommittingSink<SinkRecord, LogCommT>) sink).createCommitter();
+                } catch (IOException e) {
+                    throw new UncheckedIOException(e);
+                }
+            }
+        }
+
+        return null;
+    }
+
+    @Nullable
+    private SimpleVersionedSerializer<LogCommT> logCommitSerializer() {
+        if (logSinkProvider != null) {
+            Sink<SinkRecord> sink = logSinkProvider.createSink();
+            if (sink instanceof TwoPhaseCommittingSink) {
+                return ((TwoPhaseCommittingSink<SinkRecord, LogCommT>) sink)
+                        .getCommittableSerializer();
+            }
+        }
+
+        return null;
     }
 
     @Override
-    public StoreGlobalCommitter<LogCommT> createGlobalCommitter() {
+    public Committer<Committable> createCommitter() {
+        return new StoreLocalCommitter<>(logCommitter());
+    }
+
+    @Override
+    public StoreGlobalCommitter createGlobalCommitter() {
         FileStoreCommit commit = fileStore.newCommit();
         CatalogLock lock;
         if (lockFactory == null) {
@@ -120,22 +185,20 @@ public class StoreSink<WriterStateT, LogCommT>
                     });
         }
 
-        return new StoreGlobalCommitter<>(
-                commit, fileStore.newExpire(), fileCommitSerializer(), lock, 
overwritePartition);
+        return new StoreGlobalCommitter(commit, fileStore.newExpire(), lock, 
overwritePartition);
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public SimpleVersionedSerializer<Committable> getCommittableSerializer() {
-        return CommittableSerializer.INSTANCE;
+        return new CommittableSerializer(
+                fileCommitSerializer(), (SimpleVersionedSerializer<Object>) 
logCommitSerializer());
     }
 
     @Override
-    public GlobalCommittableSerializer<LogCommT> 
getGlobalCommittableSerializer() {
-        ManifestCommittableSerializer fileCommSerializer =
-                new ManifestCommittableSerializer(
-                        fileStore.partitionType(), fileStore.keyType(), 
fileStore.valueType());
-        SimpleVersionedSerializer<LogCommT> logCommitSerializer = new 
NoOutputSerializer<>();
-        return new GlobalCommittableSerializer<>(logCommitSerializer, 
fileCommSerializer);
+    public ManifestCommittableSerializer getGlobalCommittableSerializer() {
+        return new ManifestCommittableSerializer(
+                fileStore.partitionType(), fileStore.keyType(), 
fileStore.valueType());
     }
 
     private FileCommittableSerializer fileCommitSerializer() {
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
index ccc2482..6fea71d 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
@@ -28,16 +28,21 @@ import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.operation.FileStoreWrite;
 import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.store.log.LogWriteCallback;
 import org.apache.flink.table.store.sink.SinkRecord;
 import org.apache.flink.table.store.sink.SinkRecordConverter;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -50,10 +55,12 @@ public class StoreSinkWriter<WriterStateT>
 
     private final SinkRecordConverter recordConverter;
 
-    private final FileCommittableSerializer fileCommitSerializer;
-
     private final boolean overwrite;
 
+    @Nullable private final SinkWriter<SinkRecord> logWriter;
+
+    @Nullable private final LogWriteCallback logCallback;
+
     private final ExecutorService compactExecutor;
 
     private final Map<BinaryRowData, Map<Integer, RecordWriter>> writers;
@@ -61,12 +68,14 @@ public class StoreSinkWriter<WriterStateT>
     public StoreSinkWriter(
             FileStoreWrite fileStoreWrite,
             SinkRecordConverter recordConverter,
-            FileCommittableSerializer fileCommitSerializer,
-            boolean overwrite) {
+            boolean overwrite,
+            @Nullable SinkWriter<SinkRecord> logWriter,
+            @Nullable LogWriteCallback logCallback) {
         this.fileStoreWrite = fileStoreWrite;
         this.recordConverter = recordConverter;
-        this.fileCommitSerializer = fileCommitSerializer;
         this.overwrite = overwrite;
+        this.logWriter = logWriter;
+        this.logCallback = logCallback;
         this.compactExecutor = Executors.newSingleThreadScheduledExecutor();
         this.writers = new HashMap<>();
     }
@@ -95,6 +104,11 @@ public class StoreSinkWriter<WriterStateT>
         } catch (Exception e) {
             throw new IOException(e);
         }
+
+        // write to log store
+        if (logWriter != null) {
+            logWriter.write(record, context);
+        }
     }
 
     private void writeToFileStore(RecordWriter writer, SinkRecord record) 
throws Exception {
@@ -119,15 +133,22 @@ public class StoreSinkWriter<WriterStateT>
     }
 
     @Override
-    public void flush(boolean endOfInput) {}
+    public void flush(boolean endOfInput) throws IOException, 
InterruptedException {
+        if (logWriter != null) {
+            logWriter.flush(endOfInput);
+        }
+    }
 
     @Override
     public List<WriterStateT> snapshotState(long checkpointId) throws 
IOException {
+        if (logWriter != null && logWriter instanceof StatefulSinkWriter) {
+            return ((StatefulSinkWriter<?, WriterStateT>) 
logWriter).snapshotState(checkpointId);
+        }
         return Collections.emptyList();
     }
 
     @Override
-    public List<Committable> prepareCommit() throws IOException {
+    public List<Committable> prepareCommit() throws IOException, 
InterruptedException {
         List<Committable> committables = new ArrayList<>();
         Iterator<Map.Entry<BinaryRowData, Map<Integer, RecordWriter>>> 
partIter =
                 writers.entrySet().iterator();
@@ -146,11 +167,7 @@ public class StoreSinkWriter<WriterStateT>
                 } catch (Exception e) {
                     throw new IOException(e);
                 }
-                committables.add(
-                        new Committable(
-                                Committable.Kind.FILE,
-                                fileCommitSerializer.serialize(committable),
-                                fileCommitSerializer.getVersion()));
+                committables.add(new Committable(Committable.Kind.FILE, 
committable));
 
                 // clear if no update
                 // we need a mechanism to clear writers, otherwise there will be more and more
@@ -166,6 +183,25 @@ public class StoreSinkWriter<WriterStateT>
             }
         }
 
+        if (logWriter != null) {
+            if (logWriter instanceof PrecommittingSinkWriter) {
+                Collection<?> logCommittables =
+                        ((PrecommittingSinkWriter<?, ?>) 
logWriter).prepareCommit();
+                for (Object logCommittable : logCommittables) {
+                    committables.add(new Committable(Committable.Kind.LOG, 
logCommittable));
+                }
+            }
+
+            Objects.requireNonNull(logCallback, "logCallback should not be 
null.");
+            logCallback
+                    .offsets()
+                    .forEach(
+                            (k, v) ->
+                                    committables.add(
+                                            new Committable(
+                                                    
Committable.Kind.LOG_OFFSET,
+                                                    new 
LogOffsetCommittable(k, v))));
+        }
         return committables;
     }
 
@@ -187,6 +223,10 @@ public class StoreSinkWriter<WriterStateT>
             }
         }
         writers.clear();
+
+        if (logWriter != null) {
+            logWriter.close();
+        }
     }
 
     @VisibleForTesting
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperator.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/AbstractCommitterOperator.java
similarity index 62%
copy from 
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperator.java
copy to 
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/AbstractCommitterOperator.java
index 36b1680..7b807bb 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperator.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/AbstractCommitterOperator.java
@@ -39,28 +39,23 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Deque;
 import java.util.List;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /** An operator that processes committables of a {@link Sink}. */
-public class GlobalCommitterOperator<CommT, GlobalCommT> extends 
AbstractStreamOperator<Void>
-        implements OneInputStreamOperator<CommittableMessage<CommT>, Void>, 
BoundedOneInput {
+public abstract class AbstractCommitterOperator<IN, CommT>
+        extends AbstractStreamOperator<CommittableMessage<IN>>
+        implements OneInputStreamOperator<CommittableMessage<IN>, 
CommittableMessage<IN>>,
+                BoundedOneInput {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(GlobalCommitterOperator.class);
+    private static final long serialVersionUID = 1L;
 
-    /** Record all the committables until commit. */
-    private final Deque<CommT> committables = new ArrayDeque<>();
+    private static final Logger LOG = 
LoggerFactory.getLogger(AbstractCommitterOperator.class);
 
-    /**
-     * Aggregate committables to global committables and commit the global committables to the
-     * external system.
-     */
-    private final SerializableSupplier<GlobalCommitter<CommT, GlobalCommT>> 
committerFactory;
+    /** Record all the inputs until commit. */
+    private final Deque<IN> inputs = new ArrayDeque<>();
 
     /** The operator's state descriptor. */
     private static final ListStateDescriptor<byte[]> 
STREAMING_COMMITTER_RAW_STATES_DESC =
@@ -68,21 +63,16 @@ public class GlobalCommitterOperator<CommT, GlobalCommT> 
extends AbstractStreamO
                     "streaming_committer_raw_states", 
BytePrimitiveArraySerializer.INSTANCE);
 
     /** Group the committable by the checkpoint id. */
-    private final NavigableMap<Long, GlobalCommT> committablesPerCheckpoint;
+    private final NavigableMap<Long, List<CommT>> committablesPerCheckpoint;
 
     /** The committable's serializer. */
-    private final SerializableSupplier<SimpleVersionedSerializer<GlobalCommT>>
-            committableSerializer;
+    private final SerializableSupplier<SimpleVersionedSerializer<CommT>> 
committableSerializer;
 
     /** The operator's state. */
-    private ListState<GlobalCommT> streamingCommitterState;
-
-    private GlobalCommitter<CommT, GlobalCommT> committer;
+    private ListState<CommT> streamingCommitterState;
 
-    public GlobalCommitterOperator(
-            SerializableSupplier<GlobalCommitter<CommT, GlobalCommT>> 
committerFactory,
-            SerializableSupplier<SimpleVersionedSerializer<GlobalCommT>> 
committableSerializer) {
-        this.committerFactory = checkNotNull(committerFactory);
+    public AbstractCommitterOperator(
+            SerializableSupplier<SimpleVersionedSerializer<CommT>> 
committableSerializer) {
         this.committableSerializer = committableSerializer;
         this.committablesPerCheckpoint = new TreeMap<>();
         setChainingStrategy(ChainingStrategy.ALWAYS);
@@ -91,36 +81,43 @@ public class GlobalCommitterOperator<CommT, GlobalCommT> 
extends AbstractStreamO
     @Override
     public void initializeState(StateInitializationContext context) throws 
Exception {
         super.initializeState(context);
-        committer = committerFactory.get();
         streamingCommitterState =
                 new SimpleVersionedListState<>(
                         context.getOperatorStateStore()
                                 
.getListState(STREAMING_COMMITTER_RAW_STATES_DESC),
                         committableSerializer.get());
-        List<GlobalCommT> restored = new ArrayList<>();
+        List<CommT> restored = new ArrayList<>();
         streamingCommitterState.get().forEach(restored::add);
         streamingCommitterState.clear();
-        committer.commit(committer.filterRecoveredCommittables(restored));
+        commit(true, restored);
     }
 
+    public abstract void commit(boolean isRecover, List<CommT> committables) 
throws Exception;
+
+    public abstract List<CommT> toCommittables(long checkpoint, List<IN> 
inputs) throws Exception;
+
     @Override
     public void snapshotState(StateSnapshotContext context) throws Exception {
         super.snapshotState(context);
-        List<CommT> committables = pollCommittables();
-        if (committables.size() > 0) {
+        List<IN> poll = pollInputs();
+        if (poll.size() > 0) {
             committablesPerCheckpoint.put(
-                    context.getCheckpointId(),
-                    committer.combine(context.getCheckpointId(), 
committables));
+                    context.getCheckpointId(), 
toCommittables(context.getCheckpointId(), poll));
         }
-        streamingCommitterState.update(new 
ArrayList<>(committablesPerCheckpoint.values()));
+        
streamingCommitterState.update(committables(committablesPerCheckpoint));
+    }
+
+    private List<CommT> committables(NavigableMap<Long, List<CommT>> map) {
+        List<CommT> committables = new ArrayList<>();
+        map.values().forEach(committables::addAll);
+        return committables;
     }
 
     @Override
     public void endInput() throws Exception {
-        List<CommT> allCommittables = pollCommittables();
-        if (!allCommittables.isEmpty()) {
-            committer.commit(
-                    
Collections.singletonList(committer.combine(Long.MAX_VALUE, allCommittables)));
+        List<IN> poll = pollInputs();
+        if (!poll.isEmpty()) {
+            commit(false, toCommittables(Long.MAX_VALUE, poll));
         }
     }
 
@@ -128,31 +125,31 @@ public class GlobalCommitterOperator<CommT, GlobalCommT> 
extends AbstractStreamO
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         super.notifyCheckpointComplete(checkpointId);
         LOG.info("Committing the state for checkpoint {}", checkpointId);
-        NavigableMap<Long, GlobalCommT> headMap =
+        NavigableMap<Long, List<CommT>> headMap =
                 committablesPerCheckpoint.headMap(checkpointId, true);
-        committer.commit(new ArrayList<>(headMap.values()));
+        commit(false, committables(headMap));
         headMap.clear();
     }
 
     @Override
-    public void processElement(StreamRecord<CommittableMessage<CommT>> 
element) {
-        CommittableMessage<CommT> message = element.getValue();
+    public void processElement(StreamRecord<CommittableMessage<IN>> element) {
+        output.collect(element);
+        CommittableMessage<IN> message = element.getValue();
         if (message instanceof CommittableWithLineage) {
-            this.committables.add(((CommittableWithLineage<CommT>) 
message).getCommittable());
+            this.inputs.add(((CommittableWithLineage<IN>) 
message).getCommittable());
         }
     }
 
     @Override
     public void close() throws Exception {
-        committer.close();
         committablesPerCheckpoint.clear();
-        committables.clear();
+        inputs.clear();
         super.close();
     }
 
-    private List<CommT> pollCommittables() {
-        List<CommT> committables = new ArrayList<>(this.committables);
-        this.committables.clear();
-        return committables;
+    private List<IN> pollInputs() {
+        List<IN> poll = new ArrayList<>(this.inputs);
+        this.inputs.clear();
+        return poll;
     }
 }
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperator.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperator.java
index 36b1680..a606bf9 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperator.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperator.java
@@ -17,142 +17,60 @@
 
 package org.apache.flink.table.store.connector.sink.global;
 
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
-import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.BoundedOneInput;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.function.SerializableSupplier;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayDeque;
-import java.util.ArrayList;
+import java.io.IOException;
 import java.util.Collections;
-import java.util.Deque;
 import java.util.List;
-import java.util.NavigableMap;
-import java.util.TreeMap;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
-/** An operator that processes committables of a {@link Sink}. */
-public class GlobalCommitterOperator<CommT, GlobalCommT> extends 
AbstractStreamOperator<Void>
-        implements OneInputStreamOperator<CommittableMessage<CommT>, Void>, 
BoundedOneInput {
+/** An {@link AbstractCommitterOperator} to process global committer. */
+public class GlobalCommitterOperator<CommT, GlobalCommT>
+        extends AbstractCommitterOperator<CommT, GlobalCommT> {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(GlobalCommitterOperator.class);
+    private static final long serialVersionUID = 1L;
 
-    /** Record all the committables until commit. */
-    private final Deque<CommT> committables = new ArrayDeque<>();
+    private final SerializableSupplier<GlobalCommitter<CommT, GlobalCommT>> 
committerFactory;
 
     /**
      * Aggregate committables to global committables and commit the global committables to the
      * external system.
      */
-    private final SerializableSupplier<GlobalCommitter<CommT, GlobalCommT>> 
committerFactory;
-
-    /** The operator's state descriptor. */
-    private static final ListStateDescriptor<byte[]> 
STREAMING_COMMITTER_RAW_STATES_DESC =
-            new ListStateDescriptor<>(
-                    "streaming_committer_raw_states", 
BytePrimitiveArraySerializer.INSTANCE);
-
-    /** Group the committable by the checkpoint id. */
-    private final NavigableMap<Long, GlobalCommT> committablesPerCheckpoint;
-
-    /** The committable's serializer. */
-    private final SerializableSupplier<SimpleVersionedSerializer<GlobalCommT>>
-            committableSerializer;
-
-    /** The operator's state. */
-    private ListState<GlobalCommT> streamingCommitterState;
-
     private GlobalCommitter<CommT, GlobalCommT> committer;
 
     public GlobalCommitterOperator(
             SerializableSupplier<GlobalCommitter<CommT, GlobalCommT>> 
committerFactory,
             SerializableSupplier<SimpleVersionedSerializer<GlobalCommT>> 
committableSerializer) {
+        super(committableSerializer);
         this.committerFactory = checkNotNull(committerFactory);
-        this.committableSerializer = committableSerializer;
-        this.committablesPerCheckpoint = new TreeMap<>();
-        setChainingStrategy(ChainingStrategy.ALWAYS);
     }
 
     @Override
     public void initializeState(StateInitializationContext context) throws 
Exception {
-        super.initializeState(context);
         committer = committerFactory.get();
-        streamingCommitterState =
-                new SimpleVersionedListState<>(
-                        context.getOperatorStateStore()
-                                
.getListState(STREAMING_COMMITTER_RAW_STATES_DESC),
-                        committableSerializer.get());
-        List<GlobalCommT> restored = new ArrayList<>();
-        streamingCommitterState.get().forEach(restored::add);
-        streamingCommitterState.clear();
-        committer.commit(committer.filterRecoveredCommittables(restored));
-    }
-
-    @Override
-    public void snapshotState(StateSnapshotContext context) throws Exception {
-        super.snapshotState(context);
-        List<CommT> committables = pollCommittables();
-        if (committables.size() > 0) {
-            committablesPerCheckpoint.put(
-                    context.getCheckpointId(),
-                    committer.combine(context.getCheckpointId(), 
committables));
-        }
-        streamingCommitterState.update(new 
ArrayList<>(committablesPerCheckpoint.values()));
+        super.initializeState(context);
     }
 
     @Override
-    public void endInput() throws Exception {
-        List<CommT> allCommittables = pollCommittables();
-        if (!allCommittables.isEmpty()) {
-            committer.commit(
-                    
Collections.singletonList(committer.combine(Long.MAX_VALUE, allCommittables)));
+    public void commit(boolean isRecover, List<GlobalCommT> committables)
+            throws IOException, InterruptedException {
+        if (isRecover) {
+            committables = committer.filterRecoveredCommittables(committables);
         }
+        committer.commit(committables);
     }
 
     @Override
-    public void notifyCheckpointComplete(long checkpointId) throws Exception {
-        super.notifyCheckpointComplete(checkpointId);
-        LOG.info("Committing the state for checkpoint {}", checkpointId);
-        NavigableMap<Long, GlobalCommT> headMap =
-                committablesPerCheckpoint.headMap(checkpointId, true);
-        committer.commit(new ArrayList<>(headMap.values()));
-        headMap.clear();
-    }
-
-    @Override
-    public void processElement(StreamRecord<CommittableMessage<CommT>> 
element) {
-        CommittableMessage<CommT> message = element.getValue();
-        if (message instanceof CommittableWithLineage) {
-            this.committables.add(((CommittableWithLineage<CommT>) 
message).getCommittable());
-        }
+    public List<GlobalCommT> toCommittables(long checkpoint, List<CommT> 
inputs) throws Exception {
+        return Collections.singletonList(committer.combine(checkpoint, 
inputs));
     }
 
     @Override
     public void close() throws Exception {
         committer.close();
-        committablesPerCheckpoint.clear();
-        committables.clear();
         super.close();
     }
-
-    private List<CommT> pollCommittables() {
-        List<CommT> committables = new ArrayList<>(this.committables);
-        this.committables.clear();
-        return committables;
-    }
 }
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSink.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSink.java
index 4b2b512..90d97b9 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSink.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSink.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.store.connector.sink.global;
 
-import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
@@ -36,11 +35,6 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
 public interface GlobalCommittingSink<InputT, CommT, GlobalCommT>
         extends TwoPhaseCommittingSink<InputT, CommT> {
 
-    @Override
-    default Committer<CommT> createCommitter() {
-        throw new UnsupportedOperationException("Please create global 
committer.");
-    }
-
     /**
      * Creates a {@link GlobalCommitter} that permanently makes the previously written data visible
      * through {@link GlobalCommitter#commit}.
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSinkTranslator.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSinkTranslator.java
index e547642..c034276 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSinkTranslator.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSinkTranslator.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.store.connector.sink.global;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
 import 
org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -28,38 +27,47 @@ import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
 
+import java.io.IOException;
+import java.io.UncheckedIOException;
+
 /** A translator for the {@link GlobalCommittingSink}. */
 public class GlobalCommittingSinkTranslator {
 
-    private static final String GLOBAL_COMMITTER_NAME = "Global Committer";
-
     private static final String WRITER_NAME = "Writer";
 
+    private static final String LOCAL_COMMITTER_NAME = "Local Committer";
+
+    private static final String GLOBAL_COMMITTER_NAME = "Global Committer";
+
     public static <T, CommT, GlobalCommT> DataStreamSink<?> translate(
             DataStream<T> input, GlobalCommittingSink<T, CommT, GlobalCommT> 
sink) {
         TypeInformation<CommittableMessage<CommT>> commitType =
                 CommittableMessageTypeInfo.of(sink::getCommittableSerializer);
 
-        boolean checkpointingEnabled =
-                
input.getExecutionEnvironment().getCheckpointConfig().isCheckpointingEnabled();
-
-        // We cannot determine the mode, when the execution mode is auto.
-        // We set isBatch to false and only use checkpointingEnabled to determine if we want to do
-        // the final commit.
-        // When isBatch is true, only the checkpointID is different, which has no effect on the
-        // commit operator.
-
         SingleOutputStreamOperator<CommittableMessage<CommT>> written =
-                input.transform(
-                        WRITER_NAME,
-                        commitType,
-                        new SinkWriterOperatorFactory<>(sink, false, 
checkpointingEnabled));
+                input.transform(WRITER_NAME, commitType, new 
SinkWriterOperatorFactory<>(sink))
+                        .setParallelism(input.getParallelism());
+
+        SingleOutputStreamOperator<CommittableMessage<CommT>> local =
+                written.transform(
+                                LOCAL_COMMITTER_NAME,
+                                commitType,
+                                new LocalCommitterOperator<>(
+                                        () -> {
+                                            try {
+                                                return sink.createCommitter();
+                                            } catch (IOException e) {
+                                                throw new 
UncheckedIOException(e);
+                                            }
+                                        },
+                                        sink::getCommittableSerializer))
+                        .setParallelism(written.getParallelism());
 
-        SingleOutputStreamOperator<Void> committed =
-                written.global()
+        SingleOutputStreamOperator<?> committed =
+                local.global()
                         .transform(
                                 GLOBAL_COMMITTER_NAME,
-                                Types.VOID,
+                                commitType,
                                 new GlobalCommitterOperator<>(
                                         sink::createGlobalCommitter,
                                         sink::getGlobalCommittableSerializer))
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/LocalCommitterOperator.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/LocalCommitterOperator.java
new file mode 100644
index 0000000..9e7974c
--- /dev/null
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/LocalCommitterOperator.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink.global;
+
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.Committer.CommitRequest;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import 
org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl;
+import 
org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestState;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An {@link AbstractCommitterOperator} that drives a sink's local {@link Committer}.
+ *
+ * <p>Input and output committables have the same type: {@link #toCommittables} hands the received
+ * committables back unchanged, and {@link #commit} submits them to the committer, re-submitting
+ * requests that are not finished until none are left.
+ *
+ * @param <CommT> type of the committables handled by the wrapped committer
+ */
+public class LocalCommitterOperator<CommT> extends AbstractCommitterOperator<CommT, CommT> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Serializable factory used to create the committer in {@link #initializeState}. */
+    private final SerializableSupplier<Committer<CommT>> committerFactory;
+
+    /** Created in {@link #initializeState}; {@code null} until then. */
+    private Committer<CommT> committer;
+
+    /**
+     * @param committerFactory factory for the committer; must not be {@code null}
+     * @param committableSerializer supplier of the committable serializer, handed to the parent
+     */
+    public LocalCommitterOperator(
+            SerializableSupplier<Committer<CommT>> committerFactory,
+            SerializableSupplier<SimpleVersionedSerializer<CommT>> committableSerializer) {
+        super(committableSerializer);
+        this.committerFactory = checkNotNull(committerFactory);
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws Exception {
+        // The committer is created before delegating to the parent.
+        // NOTE(review): presumably the parent's state restore may already call commit()
+        // (see the isRecover flag) - confirm in AbstractCommitterOperator.
+        committer = committerFactory.get();
+        super.initializeState(context);
+    }
+
+    /**
+     * Submits the committables to the committer and keeps re-submitting the requests that are not
+     * finished, pausing between rounds. The pause starts at 1000 ms and doubles after every round;
+     * there is no upper bound on the number of rounds.
+     *
+     * @param isRecover not used by this implementation
+     * @param committables committables to commit; an empty list returns immediately
+     * @throws IOException if the committer fails
+     * @throws InterruptedException if interrupted in the committer or while pausing
+     */
+    @Override
+    public void commit(boolean isRecover, List<CommT> committables)
+            throws IOException, InterruptedException {
+        if (committables.isEmpty()) {
+            return;
+        }
+
+        List<CommitRequestImpl> requests = new ArrayList<>(committables.size());
+        for (CommT comm : committables) {
+            requests.add(new CommitRequestImpl(comm));
+        }
+
+        long sleep = 1000;
+        while (true) {
+            // commit: reset every pending request, let the committer signal outcomes, then treat
+            // requests the committer did not touch as committed
+            requests.forEach(CommitRequestImpl::setSelected);
+            // the committer gets a copy so it cannot modify the pending list
+            committer.commit(new ArrayList<>(requests));
+            requests.forEach(CommitRequestImpl::setCommittedIfNoError);
+
+            // drain finished
+            requests.removeIf(CommitRequestImpl::isFinished);
+            if (requests.isEmpty()) {
+                return;
+            }
+
+            //noinspection BusyWait
+            Thread.sleep(sleep);
+            sleep *= 2;
+        }
+    }
+
+    /** Returns {@code inputs} as-is; the checkpoint id is not used. */
+    @Override
+    public List<CommT> toCommittables(long checkpoint, List<CommT> inputs) {
+        return inputs;
+    }
+
+    /**
+     * Closes the committer (if one was created) and then the parent operator. The parent is closed
+     * even when closing the committer throws.
+     */
+    @Override
+    public void close() throws Exception {
+        try {
+            // committer is still null when initializeState never ran or failed early
+            if (committer != null) {
+                committer.close();
+            }
+        } finally {
+            super.close();
+        }
+    }
+
+    /**
+     * {@link CommitRequest} implementation that records the outcome the committer signals for one
+     * committable.
+     */
+    public class CommitRequestImpl implements CommitRequest<CommT> {
+
+        private CommT committable;
+        private int numRetries;
+        private CommitRequestState state;
+
+        private CommitRequestImpl(CommT committable) {
+            this.committable = committable;
+            this.state = CommitRequestState.RECEIVED;
+        }
+
+        /** Whether the current state is final according to {@link CommitRequestState}. */
+        private boolean isFinished() {
+            return state.isFinalState();
+        }
+
+        @Override
+        public CommT getCommittable() {
+            return this.committable;
+        }
+
+        @Override
+        public int getNumberOfRetries() {
+            return this.numRetries;
+        }
+
+        /** Marks the request as failed; the throwable itself is not kept. */
+        @Override
+        public void signalFailedWithKnownReason(Throwable t) {
+            this.state = CommitRequestState.FAILED;
+        }
+
+        /** Marks the request as failed and rethrows the cause as {@link IllegalStateException}. */
+        @Override
+        public void signalFailedWithUnknownReason(Throwable t) {
+            this.state = CommitRequestState.FAILED;
+            throw new IllegalStateException("Failed to commit " + this.committable, t);
+        }
+
+        /** Marks the request for another attempt and counts the retry. */
+        @Override
+        public void retryLater() {
+            this.state = CommitRequestState.RETRY;
+            ++this.numRetries;
+        }
+
+        /** Replaces the committable, then behaves like {@link #retryLater()}. */
+        @Override
+        public void updateAndRetryLater(CommT committable) {
+            this.committable = committable;
+            this.retryLater();
+        }
+
+        @Override
+        public void signalAlreadyCommitted() {
+            this.state = CommitRequestState.COMMITTED;
+        }
+
+        /** Resets the state to {@code RECEIVED} before the request is handed to the committer. */
+        void setSelected() {
+            state = CommitRequestState.RECEIVED;
+        }
+
+        /** Promotes a request that is still {@code RECEIVED} to {@code COMMITTED}. */
+        void setCommittedIfNoError() {
+            if (state == CommitRequestState.RECEIVED) {
+                state = CommitRequestState.COMMITTED;
+            }
+        }
+    }
+
+    /**
+     * Convert a {@link CommitRequest} to another type.
+     *
+     * <p>The returned view delegates every signal to {@code request}. {@code to} is applied on each
+     * {@code getCommittable()} call; {@code from} is applied to the argument of {@code
+     * updateAndRetryLater} before it is forwarded.
+     *
+     * @param request request to wrap
+     * @param to maps the wrapped committable to the new type
+     * @param from maps a new-type committable back to the wrapped type
+     * @return a view of {@code request} typed with {@code NewT}
+     */
+    public static <CommT, NewT> CommitRequest<NewT> convertCommitRequest(
+            CommitRequest<CommT> request, Function<CommT, NewT> to, Function<NewT, CommT> from) {
+        return new CommitRequest<NewT>() {
+
+            @Override
+            public NewT getCommittable() {
+                return to.apply(request.getCommittable());
+            }
+
+            @Override
+            public int getNumberOfRetries() {
+                return request.getNumberOfRetries();
+            }
+
+            @Override
+            public void signalFailedWithKnownReason(Throwable throwable) {
+                request.signalFailedWithKnownReason(throwable);
+            }
+
+            @Override
+            public void signalFailedWithUnknownReason(Throwable throwable) {
+                request.signalFailedWithUnknownReason(throwable);
+            }
+
+            @Override
+            public void retryLater() {
+                request.retryLater();
+            }
+
+            @Override
+            public void updateAndRetryLater(NewT committable) {
+                request.updateAndRetryLater(from.apply(committable));
+            }
+
+            @Override
+            public void signalAlreadyCommitted() {
+                request.signalAlreadyCommitted();
+            }
+        };
+    }
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
index 28f2f8f..f671476 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.FiniteTestSource;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
@@ -39,6 +38,7 @@ import org.apache.flink.table.store.file.FileStore;
 import org.apache.flink.table.store.file.FileStoreImpl;
 import 
org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
 import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
+import org.apache.flink.table.store.log.LogSinkProvider;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.VarCharType;
@@ -77,7 +77,7 @@ public class FileStoreITCase extends AbstractTestBase {
     private static final RowType KEY_TYPE =
             new RowType(Collections.singletonList(new RowType.RowField("k", 
new IntType())));
 
-    private static final RowType VALUE_TYPE =
+    public static final RowType VALUE_TYPE =
             new RowType(
                     Arrays.asList(
                             new RowType.RowField("v", new IntType()),
@@ -86,7 +86,7 @@ public class FileStoreITCase extends AbstractTestBase {
                             new RowType.RowField("_k", new IntType())));
 
     @SuppressWarnings({"unchecked", "rawtypes"})
-    private static final DataStructureConverter<RowData, Row> CONVERTER =
+    public static final DataStructureConverter<RowData, Row> CONVERTER =
             (DataStructureConverter)
                     DataStructureConverters.getConverter(
                             TypeConversions.fromLogicalToDataType(VALUE_TYPE));
@@ -209,10 +209,10 @@ public class FileStoreITCase extends AbstractTestBase {
         return env;
     }
 
-    public static Configuration buildConfiguration(boolean isBatch, File 
folder) {
+    public static Configuration buildConfiguration(boolean noFail, File 
folder) {
         Configuration options = new Configuration();
         options.set(BUCKET, NUM_BUCKET);
-        if (isBatch) {
+        if (noFail) {
             options.set(FILE_PATH, folder.toURI().toString());
         } else {
             FailingAtomicRenameFileSystem.get().reset(3, 100);
@@ -237,7 +237,7 @@ public class FileStoreITCase extends AbstractTestBase {
         return isBatch
                 ? env.fromCollection(SOURCE_DATA, 
InternalTypeInfo.of(VALUE_TYPE))
                 : env.addSource(
-                        new FiniteTestSource<>(null, SOURCE_DATA), 
InternalTypeInfo.of(VALUE_TYPE));
+                        new FiniteTestSource<>(SOURCE_DATA), 
InternalTypeInfo.of(VALUE_TYPE));
     }
 
     public static void write(DataStream<RowData> input, FileStore fileStore, 
boolean partitioned)
@@ -251,11 +251,28 @@ public class FileStoreITCase extends AbstractTestBase {
             boolean partitioned,
             @Nullable Map<String, String> overwritePartition)
             throws Exception {
+        write(input, fileStore, partitioned, overwritePartition, null);
+    }
+
+    public static void write(
+            DataStream<RowData> input,
+            FileStore fileStore,
+            boolean partitioned,
+            @Nullable Map<String, String> overwritePartition,
+            @Nullable LogSinkProvider logSinkProvider)
+            throws Exception {
         int[] partitions = partitioned ? new int[] {1} : new int[0];
         int[] keys = new int[] {2};
         StoreSink<?, ?> sink =
                 new StoreSink<>(
-                        null, fileStore, partitions, keys, NUM_BUCKET, null, 
overwritePartition);
+                        null,
+                        fileStore,
+                        partitions,
+                        keys,
+                        NUM_BUCKET,
+                        null,
+                        overwritePartition,
+                        logSinkProvider);
         input = input.keyBy(row -> row.getInt(2)); // key by
         GlobalCommittingSinkTranslator.translate(input, sink);
         input.getExecutionEnvironment().execute();
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FiniteTestSource.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FiniteTestSource.java
new file mode 100644
index 0000000..0e60cea
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FiniteTestSource.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A stream source that: 1) emits a list of elements while holding the checkpoint lock, 2) then
+ * waits until two more checkpoint completions have been counted, 3) then re-emits the same
+ * elements before 4) waiting for another two checkpoint completions and 5) exiting.
+ *
+ * <p>This class is a rewrite that additionally implements {@link CheckpointedFunction}: the number
+ * of finished emit rounds is stored in operator state, so a restored source continues with the
+ * remaining rounds instead of starting over.
+ */
+@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+public class FiniteTestSource<T>
+        implements SourceFunction<T>, CheckpointedFunction, CheckpointListener 
{
+
+    private static final long serialVersionUID = 1L;
+
+    // Elements emitted in every round.
+    @SuppressWarnings("NonSerializableFieldInSerializableClass")
+    private final Iterable<T> elements;
+
+    // Cleared by cancel(); ends the waiting loops.
+    private volatile boolean running = true;
+
+    // Number of notifyCheckpointComplete calls seen by this instance; not part of the state.
+    private transient int numCheckpointsComplete;
+
+    // Operator list state holding numTimesEmitted.
+    private transient ListState<Integer> checkpointedState;
+
+    // Number of emit rounds finished so far; snapshotted and restored.
+    private volatile int numTimesEmitted;
+
+    public FiniteTestSource(Iterable<T> elements) {
+        this.elements = elements;
+    }
+
+    /**
+     * Registers the "emit-times" list state and, on restore, reads back the number of finished
+     * emit rounds. A fresh start begins at zero.
+     */
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws 
Exception {
+        this.checkpointedState =
+                context.getOperatorStateStore()
+                        .getListState(
+                                new ListStateDescriptor<>("emit-times", 
IntSerializer.INSTANCE));
+
+        if (context.isRestored()) {
+            List<Integer> retrievedStates = new ArrayList<>();
+            for (Integer entry : this.checkpointedState.get()) {
+                retrievedStates.add(entry);
+            }
+
+            // given that the parallelism of the function is 1, we can only have 1 state
+            Preconditions.checkArgument(
+                    retrievedStates.size() == 1,
+                    getClass().getSimpleName() + " retrieved invalid state.");
+
+            this.numTimesEmitted = retrievedStates.get(0);
+            Preconditions.checkArgument(
+                    numTimesEmitted <= 2,
+                    getClass().getSimpleName()
+                            + " retrieved invalid numTimesEmitted: "
+                            + numTimesEmitted);
+        } else {
+            this.numTimesEmitted = 0;
+        }
+    }
+
+    /**
+     * Runs the emit rounds that are still outstanding: two when nothing was emitted yet, one when
+     * one round is already done, and none when both rounds are done.
+     */
+    @Override
+    public void run(SourceContext<T> ctx) throws Exception {
+        switch (numTimesEmitted) {
+            case 0:
+                emitElementsAndWaitForCheckpoints(ctx);
+                emitElementsAndWaitForCheckpoints(ctx);
+                break;
+            case 1:
+                emitElementsAndWaitForCheckpoints(ctx);
+                break;
+            case 2:
+                // Both rounds were emitted before the restore, but the matching
+                // notifyCheckpointComplete calls may have been missed: emit nothing and only wait
+                // for two more checkpoint completions (or cancellation).
+                final Object lock = ctx.getCheckpointLock();
+                synchronized (lock) {
+                    int checkpointToAwait = numCheckpointsComplete + 2;
+                    while (running && numCheckpointsComplete < 
checkpointToAwait) {
+                        lock.wait(1);
+                    }
+                }
+                break;
+        }
+    }
+
+    /**
+     * Emits every element and bumps {@code numTimesEmitted} inside one synchronized section on the
+     * checkpoint lock, then waits until two more checkpoint completions have been counted or the
+     * source is cancelled.
+     */
+    private void emitElementsAndWaitForCheckpoints(SourceContext<T> ctx)
+            throws InterruptedException {
+        final Object lock = ctx.getCheckpointLock();
+
+        final int checkpointToAwait;
+        synchronized (lock) {
+            // the target is computed before emitting
+            checkpointToAwait = numCheckpointsComplete + 2;
+            for (T t : elements) {
+                ctx.collect(t);
+            }
+            numTimesEmitted++;
+        }
+
+        synchronized (lock) {
+            // wait(1) releases the lock for at most 1 ms per iteration, so this is a polling loop
+            while (running && numCheckpointsComplete < checkpointToAwait) {
+                lock.wait(1);
+            }
+        }
+    }
+
+    @Override
+    public void cancel() {
+        running = false;
+    }
+
+    // Counts completed checkpoints; the waiting loops above poll this counter.
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+        numCheckpointsComplete++;
+    }
+
+    // Aborted checkpoints are ignored.
+    @Override
+    public void notifyCheckpointAborted(long checkpointId) {}
+
+    /** Replaces the stored state with the current number of finished emit rounds. */
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+        Preconditions.checkState(
+                this.checkpointedState != null,
+                "The " + getClass().getSimpleName() + " has not been properly 
initialized.");
+
+        this.checkpointedState.clear();
+        this.checkpointedState.add(this.numTimesEmitted);
+    }
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java
index 2d59440..17733f7 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java
@@ -18,21 +18,76 @@
 
 package org.apache.flink.table.store.connector.sink;
 
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.runtime.operators.sink.TestSink;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+
 import org.junit.jupiter.api.Test;
 
+import java.io.IOException;
+
+import static 
org.apache.flink.table.store.file.manifest.ManifestCommittableSerializerTest.randomIncrement;
+import static 
org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link CommittableSerializer}. */
 public class CommittableSerializerTest {
 
+    private final FileCommittableSerializer fileSerializer =
+            new FileCommittableSerializer(
+                    RowType.of(new IntType()),
+                    RowType.of(new IntType()),
+                    RowType.of(new IntType()));
+
+    private final CommittableSerializer serializer =
+            new CommittableSerializer(
+                    fileSerializer,
+                    (SimpleVersionedSerializer) 
TestSink.StringCommittableSerializer.INSTANCE);
+
+    @Test
+    public void testFile() throws IOException {
+        Increment increment = randomIncrement();
+        FileCommittable committable = new FileCommittable(row(0), 1, 
increment);
+        FileCommittable newCommittable =
+                (FileCommittable)
+                        serializer
+                                .deserialize(
+                                        1,
+                                        serializer.serialize(
+                                                new Committable(
+                                                        Committable.Kind.FILE, 
committable)))
+                                .wrappedCommittable();
+        assertThat(newCommittable).isEqualTo(committable);
+    }
+
+    @Test
+    public void testLogOffset() throws IOException {
+        LogOffsetCommittable committable = new LogOffsetCommittable(2, 3);
+        LogOffsetCommittable newCommittable =
+                (LogOffsetCommittable)
+                        serializer
+                                .deserialize(
+                                        1,
+                                        serializer.serialize(
+                                                new Committable(
+                                                        
Committable.Kind.LOG_OFFSET, committable)))
+                                .wrappedCommittable();
+        assertThat(newCommittable).isEqualTo(committable);
+    }
+
     @Test
-    public void test() {
-        byte[] bytes = new byte[] {4, 5, 1};
-        Committable committable = new Committable(Committable.Kind.LOG, bytes, 
9);
-        byte[] serialize = 
CommittableSerializer.INSTANCE.serialize(committable);
-        Committable deser = CommittableSerializer.INSTANCE.deserialize(1, 
serialize);
-        assertThat(deser.kind()).isEqualTo(Committable.Kind.LOG);
-        assertThat(deser.serializerVersion()).isEqualTo(9);
-        assertThat(deser.wrappedCommittable()).isEqualTo(bytes);
+    public void testLog() throws IOException {
+        String log = "random_string";
+        String newCommittable =
+                (String)
+                        serializer
+                                .deserialize(
+                                        1,
+                                        serializer.serialize(
+                                                new 
Committable(Committable.Kind.LOG, log)))
+                                .wrappedCommittable();
+        assertThat(newCommittable).isEqualTo(log);
     }
 }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/GlobalCommittableSerializerTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/GlobalCommittableSerializerTest.java
deleted file mode 100644
index 31753c3..0000000
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/GlobalCommittableSerializerTest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector.sink;
-
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-import 
org.apache.flink.table.store.file.manifest.ManifestCommittableSerializerTest;
-
-import org.junit.jupiter.api.Test;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Test for {@link GlobalCommittableSerializer}. */
-public class GlobalCommittableSerializerTest {
-
-    @Test
-    public void test() throws IOException {
-        List<String> logs = Arrays.asList("1", "2");
-        GlobalCommittable<String> committable =
-                new GlobalCommittable<>(logs, 
ManifestCommittableSerializerTest.create());
-        GlobalCommittableSerializer<String> serializer =
-                new GlobalCommittableSerializer<>(
-                        new TestStringSerializer(), 
ManifestCommittableSerializerTest.serializer());
-        byte[] serialized = serializer.serialize(committable);
-        GlobalCommittable<String> deser = serializer.deserialize(1, 
serialized);
-        
assertThat(deser.logCommittables()).isEqualTo(committable.logCommittables());
-        
assertThat(deser.fileCommittable()).isEqualTo(committable.fileCommittable());
-    }
-
-    private static final class TestStringSerializer implements 
SimpleVersionedSerializer<String> {
-
-        private static final int VERSION = 1073741823;
-
-        private TestStringSerializer() {}
-
-        public int getVersion() {
-            return VERSION;
-        }
-
-        public byte[] serialize(String str) {
-            return str.getBytes(StandardCharsets.UTF_8);
-        }
-
-        public String deserialize(int version, byte[] serialized) {
-            assertThat(version).isEqualTo(VERSION);
-            return new String(serialized, StandardCharsets.UTF_8);
-        }
-    }
-}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
new file mode 100644
index 0000000..023b4b6
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.store.connector.FileStoreITCase;
+import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.kafka.KafkaLogSinkProvider;
+import org.apache.flink.table.store.kafka.KafkaLogSourceProvider;
+import org.apache.flink.table.store.kafka.KafkaLogStoreFactory;
+import org.apache.flink.table.store.kafka.KafkaLogTestUtils;
+import org.apache.flink.table.store.kafka.KafkaTableTestBase;
+import org.apache.flink.table.store.log.LogOptions;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.store.connector.FileStoreITCase.buildBatchEnv;
+import static 
org.apache.flink.table.store.connector.FileStoreITCase.buildConfiguration;
+import static 
org.apache.flink.table.store.connector.FileStoreITCase.buildFileStore;
+import static 
org.apache.flink.table.store.connector.FileStoreITCase.buildStreamEnv;
+import static 
org.apache.flink.table.store.connector.FileStoreITCase.buildTestSource;
+import static 
org.apache.flink.table.store.kafka.KafkaLogTestUtils.SINK_CONTEXT;
+import static 
org.apache.flink.table.store.kafka.KafkaLogTestUtils.SOURCE_CONTEXT;
+import static 
org.apache.flink.table.store.kafka.KafkaLogTestUtils.discoverKafkaLogFactory;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT case that writes through the store sink into a file store and a Kafka log store. */
+public class LogStoreSinkITCase extends KafkaTableTestBase {
+
+    @Test
+    public void testStreamingPartitioned() throws Exception {
+        innerTest("testStreamingPartitioned", false, true, true);
+    }
+
+    @Test
+    public void testStreamingNonPartitioned() throws Exception {
+        innerTest("testStreamingNonPartitioned", false, false, true);
+    }
+
+    @Test
+    public void testBatchPartitioned() throws Exception {
+        innerTest("testBatchPartitioned", true, true, true);
+    }
+
+    @Test
+    public void testStreamingEventual() throws Exception {
+        innerTest("testStreamingEventual", false, true, false);
+    }
+
+    /**
+     * Writes the test source through the sink, then verifies the file store content and the
+     * records read back from the log.
+     *
+     * @param name name handed to the Kafka log test context
+     * @param isBatch run the write job in batch mode instead of streaming mode
+     * @param partitioned whether the file store is partitioned
+     * @param transaction use transactional log consistency instead of eventual
+     */
+    private void innerTest(String name, boolean isBatch, boolean partitioned, boolean transaction)
+            throws Exception {
+        StreamExecutionEnvironment env;
+        if (isBatch) {
+            env = buildBatchEnv();
+        } else {
+            env = buildStreamEnv();
+        }
+
+        // in eventual mode, failure will result in duplicate data
+        boolean noFail = isBatch || !transaction;
+        FileStore fileStore =
+                buildFileStore(
+                        buildConfiguration(noFail, TEMPORARY_FOLDER.newFolder()), partitioned);
+
+        // prepare log
+        LogOptions.LogConsistency consistency = LogOptions.LogConsistency.EVENTUAL;
+        if (transaction) {
+            consistency = LogOptions.LogConsistency.TRANSACTIONAL;
+        }
+        DynamicTableFactory.Context context =
+                KafkaLogTestUtils.testContext(
+                        name,
+                        getBootstrapServers(),
+                        LogOptions.LogChangelogMode.AUTO,
+                        consistency,
+                        FileStoreITCase.VALUE_TYPE,
+                        new int[] {2});
+
+        KafkaLogStoreFactory factory = discoverKafkaLogFactory();
+        KafkaLogSinkProvider sinkProvider = factory.createSinkProvider(context, SINK_CONTEXT);
+        KafkaLogSourceProvider sourceProvider =
+                factory.createSourceProvider(context, SOURCE_CONTEXT);
+
+        factory.onCreateTable(context, 3, true);
+
+        try {
+            // write
+            DataStreamSource<RowData> finiteSource = buildTestSource(env, isBatch);
+            FileStoreITCase.write(finiteSource, fileStore, partitioned, null, sinkProvider);
+
+            // read
+            List<Row> fileRows = FileStoreITCase.read(env, fileStore);
+            assertThat(fileRows).containsExactlyInAnyOrder(expectedFileRows(partitioned));
+
+            // read log
+            int logRowCount = isBatch ? 6 : 12;
+            List<Row> logRows =
+                    buildStreamEnv()
+                            .fromSource(
+                                    sourceProvider.createSource(null),
+                                    WatermarkStrategy.noWatermarks(),
+                                    "source")
+                            .executeAndCollect(logRowCount).stream()
+                            .map(FileStoreITCase.CONVERTER::toExternal)
+                            .collect(Collectors.toList());
+            assertThat(logRows).containsExactlyInAnyOrder(expectedLogRows(isBatch));
+        } finally {
+            factory.onDropTable(context, true);
+        }
+    }
+
+    /** Rows expected when reading the file store back. */
+    private static Row[] expectedFileRows(boolean partitioned) {
+        if (partitioned) {
+            return new Row[] {
+                Row.of(5, "p2", 1), Row.of(3, "p2", 5), Row.of(5, "p1", 1), Row.of(0, "p1", 2)
+            };
+        }
+        return new Row[] {Row.of(5, "p2", 1), Row.of(0, "p1", 2), Row.of(3, "p2", 5)};
+    }
+
+    /** Rows expected from the log: the source data once in batch mode, twice in streaming mode. */
+    private static Row[] expectedLogRows(boolean isBatch) {
+        Row[] oneRound = {
+            Row.of(0, "p1", 1),
+            Row.of(0, "p1", 2),
+            Row.of(5, "p1", 1),
+            Row.of(6, "p2", 1),
+            Row.of(3, "p2", 5),
+            Row.of(5, "p2", 1)
+        };
+        if (isBatch) {
+            return oneRound;
+        }
+
+        // expect origin data X 2 (FiniteTestSource)
+        Row[] twoRounds = new Row[oneRound.length * 2];
+        System.arraycopy(oneRound, 0, twoRounds, 0, oneRound.length);
+        System.arraycopy(oneRound, 0, twoRounds, oneRound.length, oneRound.length);
+        return twoRounds;
+    }
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
index fd3d97f..e2ef3e7 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import 
org.apache.flink.table.store.connector.sink.TestFileStore.TestRecordWriter;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
 import org.apache.flink.table.store.file.utils.RecordWriter;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.IntType;
@@ -123,7 +124,8 @@ public class StoreSinkTest {
                         primaryKeys,
                         2,
                         () -> lock,
-                        new HashMap<>());
+                        new HashMap<>(),
+                        null);
         writeAndCommit(
                 sink,
                 GenericRowData.ofKind(RowKind.INSERT, 0, 0, 1),
@@ -225,7 +227,7 @@ public class StoreSinkTest {
 
     private void commit(StoreSink<?, ?> sink, List<Committable> 
fileCommittables) throws Exception {
         StoreGlobalCommitter committer = sink.createGlobalCommitter();
-        GlobalCommittable<?> committable = committer.combine(0, 
fileCommittables);
+        ManifestCommittable committable = committer.combine(0, 
fileCommittables);
 
         fileStore.expired = false;
         lock.locked = false;
@@ -246,7 +248,14 @@ public class StoreSinkTest {
 
     private StoreSink<?, ?> newSink(Map<String, String> overwritePartition) {
         return new StoreSink<>(
-                identifier, fileStore, partitions, primaryKeys, 2, () -> lock, 
overwritePartition);
+                identifier,
+                fileStore,
+                partitions,
+                primaryKeys,
+                2,
+                () -> lock,
+                overwritePartition,
+                null);
     }
 
     private class TestLock implements CatalogLock {
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperatorTest.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperatorTest.java
index 3bbead5..17cbba1 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperatorTest.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperatorTest.java
@@ -49,7 +49,7 @@ public class GlobalCommitterOperatorTest {
     @Test
     public void closeCommitter() throws Exception {
         final DefaultGlobalCommitter globalCommitter = new 
DefaultGlobalCommitter();
-        final OneInputStreamOperatorTestHarness<CommittableMessage<String>, 
Void> testHarness =
+        final OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> 
testHarness =
                 createTestHarness(globalCommitter);
         testHarness.initializeEmptyState();
         testHarness.open();
@@ -69,7 +69,7 @@ public class GlobalCommitterOperatorTest {
                 buildSubtaskState(createTestHarness(), input2);
 
         final DefaultGlobalCommitter globalCommitter = new 
DefaultGlobalCommitter();
-        final OneInputStreamOperatorTestHarness<CommittableMessage<String>, 
Void> testHarness =
+        final OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> 
testHarness =
                 createTestHarness(globalCommitter);
 
         final OperatorSubtaskState mergedOperatorSubtaskState =
@@ -108,7 +108,7 @@ public class GlobalCommitterOperatorTest {
         expectedOutput.add(DefaultGlobalCommitter.COMBINER.apply(input2));
         expectedOutput.add(DefaultGlobalCommitter.COMBINER.apply(input3));
 
-        final OneInputStreamOperatorTestHarness<CommittableMessage<String>, 
Void> testHarness =
+        final OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> 
testHarness =
                 createTestHarness(globalCommitter);
         testHarness.initializeEmptyState();
         testHarness.open();
@@ -138,7 +138,7 @@ public class GlobalCommitterOperatorTest {
         final DefaultGlobalCommitter globalCommitter =
                 new DefaultGlobalCommitter(successCommittedCommittable);
 
-        final OneInputStreamOperatorTestHarness<CommittableMessage<String>, 
Void> testHarness =
+        final OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> 
testHarness =
                 createTestHarness(globalCommitter);
 
         // all data from previous checkpoint are expected to be committed,
@@ -155,7 +155,7 @@ public class GlobalCommitterOperatorTest {
     public void endOfInput() throws Exception {
         final DefaultGlobalCommitter globalCommitter = new 
DefaultGlobalCommitter();
 
-        final OneInputStreamOperatorTestHarness<CommittableMessage<String>, 
Void> testHarness =
+        final OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> 
testHarness =
                 createTestHarness(globalCommitter);
         testHarness.initializeEmptyState();
         testHarness.open();
@@ -166,12 +166,12 @@ public class GlobalCommitterOperatorTest {
         
assertThat(globalCommitter.getCommittedData()).contains("elder+patience+silent");
     }
 
-    private OneInputStreamOperatorTestHarness<CommittableMessage<String>, 
Void> createTestHarness()
+    private OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> 
createTestHarness()
             throws Exception {
         return createTestHarness(new DefaultGlobalCommitter());
     }
 
-    private OneInputStreamOperatorTestHarness<CommittableMessage<String>, 
Void> createTestHarness(
+    private OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> 
createTestHarness(
             GlobalCommitter<String, String> globalCommitter) throws Exception {
         return new OneInputStreamOperatorTestHarness<>(
                 new GlobalCommitterOperator<>(
@@ -183,7 +183,7 @@ public class GlobalCommitterOperatorTest {
     }
 
     public static OperatorSubtaskState buildSubtaskState(
-            OneInputStreamOperatorTestHarness<CommittableMessage<String>, 
Void> testHarness,
+            OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> 
testHarness,
             List<String> input)
             throws Exception {
         testHarness.initializeEmptyState();
@@ -199,7 +199,7 @@ public class GlobalCommitterOperatorTest {
         return operatorSubtaskState;
     }
 
-    private static List<StreamRecord<CommittableMessage<String>>> 
committableRecords(
+    static List<StreamRecord<CommittableMessage<String>>> committableRecords(
             Collection<String> elements) {
         return elements.stream()
                 .map(GlobalCommitterOperatorTest::toCommittableMessage)
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/LocalCommitterOperatorTest.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/LocalCommitterOperatorTest.java
new file mode 100644
index 0000000..528f5dc
--- /dev/null
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/LocalCommitterOperatorTest.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink.global;
+
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import 
org.apache.flink.streaming.runtime.operators.sink.TestSink.StringCommittableSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.store.connector.sink.global.GlobalCommitterOperatorTest.buildSubtaskState;
+import static 
org.apache.flink.table.store.connector.sink.global.GlobalCommitterOperatorTest.committableRecords;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertNotNull;
+
+/** Test the {@link LocalCommitterOperator}. */
+public class LocalCommitterOperatorTest {
+
+    @Test // committables answered with retryLater() must still end up committed
+    public void supportRetry() throws Exception {
+        final List<String> input = Arrays.asList("lazy", "leaf");
+        final RetryOnceCommitter committer = new RetryOnceCommitter(); // asks for one retry per value
+        final OneInputStreamOperatorTestHarness<
+                        CommittableMessage<String>, CommittableMessage<String>>
+                testHarness = createTestHarness(committer);
+
+        testHarness.initializeEmptyState();
+        testHarness.open();
+        testHarness.processElements(committableRecords(input));
+        testHarness.prepareSnapshotPreBarrier(1);
+        testHarness.snapshot(1L, 1L);
+        testHarness.notifyOfCompletedCheckpoint(1L);
+        testHarness.snapshot(2L, 2L); // second cycle without new input; presumably where the retry lands (verify in operator)
+        testHarness.notifyOfCompletedCheckpoint(2L);
+
+        testHarness.close();
+
+        assertThat(committer.getCommittedData()).contains("lazy", "leaf");
+    }
+
+    @Test // closing the harness must close the wrapped committer
+    public void closeCommitter() throws Exception {
+        final DefaultCommitter committer = new DefaultCommitter();
+        final OneInputStreamOperatorTestHarness<
+                        CommittableMessage<String>, CommittableMessage<String>>
+                testHarness = createTestHarness(committer);
+        testHarness.initializeEmptyState();
+        testHarness.open();
+        testHarness.close();
+        assertThat(committer.isClosed()).isTrue(); // flag is set by DefaultCommitter#close
+    }
+
+    @Test // state built by two separate harnesses is merged, restored, and must be fully committed
+    public void restoredFromMergedState() throws Exception {
+        final List<String> input1 = Arrays.asList("today", "whom");
+        final OperatorSubtaskState operatorSubtaskState1 =
+                buildSubtaskState(createTestHarness(), input1); // helper from GlobalCommitterOperatorTest
+
+        final List<String> input2 = Arrays.asList("future", "evil", "how");
+        final OperatorSubtaskState operatorSubtaskState2 =
+                buildSubtaskState(createTestHarness(), input2);
+
+        final DefaultCommitter committer = new DefaultCommitter();
+        final OneInputStreamOperatorTestHarness<
+                        CommittableMessage<String>, CommittableMessage<String>>
+                testHarness = createTestHarness(committer);
+
+        final OperatorSubtaskState mergedOperatorSubtaskState =
+                OneInputStreamOperatorTestHarness.repackageState(
+                        operatorSubtaskState1, operatorSubtaskState2);
+
+        testHarness.initializeState(
+                OneInputStreamOperatorTestHarness.repartitionOperatorState(
+                        mergedOperatorSubtaskState, 2, 2, 1, 0)); // NOTE(review): args look like (maxParallelism, oldParallelism, newParallelism, subtaskIndex) — confirm
+        testHarness.open();
+
+        final List<String> expectedOutput = new ArrayList<>();
+        expectedOutput.addAll(input1);
+        expectedOutput.addAll(input2);
+
+        testHarness.prepareSnapshotPreBarrier(1L);
+        testHarness.snapshot(1L, 1L);
+        testHarness.notifyOfCompletedCheckpoint(1); // no elements were processed on this harness; only restored state can be committed
+
+        testHarness.close();
+
+        assertThat(committer.getCommittedData())
+                .containsExactlyInAnyOrder(expectedOutput.toArray(new 
String[0]));
+    }
+
+    @Test // three checkpoints' worth of input; only checkpoints 1 and 3 are acknowledged
+    public void commitMultipleStagesTogether() throws Exception {
+
+        final DefaultCommitter committer = new DefaultCommitter();
+
+        final List<String> input1 = Arrays.asList("cautious", "nature");
+        final List<String> input2 = Arrays.asList("count", "over");
+        final List<String> input3 = Arrays.asList("lawyer", "grammar");
+
+        final List<String> expectedOutput = new ArrayList<>(); // all three inputs, in processing order
+
+        expectedOutput.addAll(input1);
+        expectedOutput.addAll(input2);
+        expectedOutput.addAll(input3);
+
+        final OneInputStreamOperatorTestHarness<
+                        CommittableMessage<String>, CommittableMessage<String>>
+                testHarness = createTestHarness(committer);
+        testHarness.initializeEmptyState();
+        testHarness.open();
+
+        testHarness.processElements(committableRecords(input1));
+        testHarness.prepareSnapshotPreBarrier(1L);
+        testHarness.snapshot(1L, 1L);
+        testHarness.processElements(committableRecords(input2));
+        testHarness.prepareSnapshotPreBarrier(2L);
+        testHarness.snapshot(2L, 2L);
+        testHarness.processElements(committableRecords(input3));
+        testHarness.prepareSnapshotPreBarrier(3L);
+        testHarness.snapshot(3L, 3L);
+
+        testHarness.notifyOfCompletedCheckpoint(1);
+        testHarness.notifyOfCompletedCheckpoint(3); // checkpoint 2 is never acknowledged, yet input2 is expected below
+
+        testHarness.close();
+
+        
assertThat(fromRecords(testHarness.getRecordOutput())).isEqualTo(expectedOutput); // records emitted by the operator
+
+        assertThat(committer.getCommittedData()).isEqualTo(expectedOutput); // values handed to the committer
+    }
+
+    private static List<String> fromRecords( // unwraps the committable payload carried by each record
+            Collection<StreamRecord<CommittableMessage<String>>> elements) {
+        return elements.stream()
+                .map(StreamRecord::getValue)
+                .filter(message -> message instanceof CommittableWithLineage) // skip every other message kind
+                .map(message -> ((CommittableWithLineage<String>) 
message).getCommittable())
+                .collect(Collectors.toList());
+    }
+
+    private OneInputStreamOperatorTestHarness<
+                    CommittableMessage<String>, CommittableMessage<String>>
+            createTestHarness() throws Exception {
+        return createTestHarness(new DefaultCommitter()); // convenience overload using a fresh DefaultCommitter
+    }
+
+    private OneInputStreamOperatorTestHarness<
+                    CommittableMessage<String>, CommittableMessage<String>>
+            createTestHarness(Committer<String> committer) throws Exception {
+        return new OneInputStreamOperatorTestHarness<>(
+                new LocalCommitterOperator<>( // operator under test
+                        () -> committer, () -> 
StringCommittableSerializer.INSTANCE)); // suppliers hand back the given committer and the shared serializer
+    }
+
+    /** Test {@link Committer} that records every committed value and whether it was closed. */
+    private static class DefaultCommitter implements Committer<String>, 
Serializable {
+
+        @Nullable protected Queue<String> committedData; // values recorded by commit()
+
+        private boolean isClosed;
+
+        @Nullable private final Supplier<Queue<String>> queueSupplier; // always null: the only constructor sets it to null
+
+        public DefaultCommitter() {
+            this.committedData = new ConcurrentLinkedQueue<>();
+            this.isClosed = false;
+            this.queueSupplier = null;
+        }
+
+        public List<String> getCommittedData() { // returns a copy, or an empty list when committedData is null
+            if (committedData != null) {
+                return new ArrayList<>(committedData);
+            } else {
+                return Collections.emptyList();
+            }
+        }
+
+        @Override
+        public void close() {
+            isClosed = true;
+        }
+
+        public boolean isClosed() {
+            return isClosed;
+        }
+
+        @Override
+        public void commit(Collection<CommitRequest<String>> requests) { // records the committable of every request
+            if (committedData == null) { // lazy fallback; the constructor above already initializes committedData
+                assertNotNull(queueSupplier);
+                committedData = queueSupplier.get();
+            }
+            committedData.addAll(
+                    requests.stream()
+                            .map(CommitRequest::getCommittable)
+                            .collect(Collectors.toList()));
+        }
+    }
+
+    /** A {@link Committer} that calls retryLater() the first time it sees a value and
+     * records the value as committed the second time it sees it. */
+    private static class RetryOnceCommitter extends DefaultCommitter 
implements Committer<String> {
+
+        private final Set<String> seen = new LinkedHashSet<>(); // values still waiting for their second attempt
+
+        @Override
+        public void commit(Collection<CommitRequest<String>> requests) {
+            requests.forEach(
+                    c -> {
+                        if (seen.remove(c.getCommittable())) { // true only if the value was seen before
+                            checkNotNull(committedData);
+                            committedData.add(c.getCommittable());
+                        } else { // first sight: remember the value and ask for a retry
+                            seen.add(c.getCommittable());
+                            c.retryLater();
+                        }
+                    });
+        }
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogWriteCallback.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogWriteCallback.java
new file mode 100644
index 0000000..03c62d6
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogWriteCallback.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.log;
+
+import org.apache.flink.table.store.log.LogSinkProvider.WriteCallback;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.LongAccumulator;
+
+/** A {@link WriteCallback} that keeps, per bucket, the maximum offset passed to onCompletion. */
+public class LogWriteCallback implements WriteCallback {
+
+    private final ConcurrentHashMap<Integer, LongAccumulator> offsetMap = new 
ConcurrentHashMap<>(); // bucket -> running maximum offset
+
+    @Override
+    public void onCompletion(int bucket, long offset) { // folds offset into the bucket's running maximum
+        LongAccumulator acc = offsetMap.get(bucket);
+        if (acc == null) {
+            // first completion for this bucket: create the accumulator atomically.
+            // computeIfAbsent may lock on the key, hence the plain get() above.
+            acc = offsetMap.computeIfAbsent(bucket, k -> new 
LongAccumulator(Long::max, 0));
+        } // else: accumulator already exists, computeIfAbsent is not called
+        acc.accumulate(offset); // Long::max keeps the largest value seen; the identity is 0
+    }
+
+    public Map<Integer, Long> offsets() { // returns a new HashMap of bucket -> current maximum offset
+        Map<Integer, Long> offsets = new HashMap<>();
+        offsetMap.forEach((k, v) -> offsets.put(k, v.longValue()));
+        return offsets;
+    }
+}
diff --git 
a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
 
b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
index 0fad212..b4b2efc 100644
--- 
a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
+++ 
b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
@@ -63,7 +63,7 @@ import static 
org.apache.flink.table.store.log.LogOptions.LogConsistency;
 /** Utils for the test of {@link KafkaLogStoreFactory}. */
 public class KafkaLogTestUtils {
 
-    static final LogStoreTableFactory.SourceContext SOURCE_CONTEXT =
+    public static final LogStoreTableFactory.SourceContext SOURCE_CONTEXT =
             new LogStoreTableFactory.SourceContext() {
                 @Override
                 public <T> TypeInformation<T> createTypeInformation(DataType 
producedDataType) {
@@ -85,7 +85,7 @@ public class KafkaLogTestUtils {
                 }
             };
 
-    static final LogStoreTableFactory.SinkContext SINK_CONTEXT =
+    public static final LogStoreTableFactory.SinkContext SINK_CONTEXT =
             new LogStoreTableFactory.SinkContext() {
 
                 @Override
@@ -113,7 +113,7 @@ public class KafkaLogTestUtils {
                 }
             };
 
-    static KafkaLogStoreFactory discoverKafkaLogFactory() {
+    public static KafkaLogStoreFactory discoverKafkaLogFactory() {
         return (KafkaLogStoreFactory)
                 LogStoreTableFactory.discoverLogStoreFactory(
                         Thread.currentThread().getContextClassLoader(),
@@ -169,15 +169,27 @@ public class KafkaLogTestUtils {
             LogChangelogMode changelogMode,
             LogConsistency consistency,
             boolean keyed) {
+        return testContext(
+                name,
+                servers,
+                changelogMode,
+                consistency,
+                RowType.of(new IntType(), new IntType()),
+                keyed ? new int[] {0} : new int[0]);
+    }
+
+    public static DynamicTableFactory.Context testContext(
+            String name,
+            String servers,
+            LogChangelogMode changelogMode,
+            LogConsistency consistency,
+            RowType type,
+            int[] keys) {
         Map<String, String> options = new HashMap<>();
         options.put(CHANGELOG_MODE.key(), changelogMode.toString());
         options.put(CONSISTENCY.key(), consistency.toString());
         options.put(BOOTSTRAP_SERVERS.key(), servers);
-        return createContext(
-                name,
-                RowType.of(new IntType(), new IntType()),
-                keyed ? new int[] {0} : new int[0],
-                options);
+        return createContext(name, type, keys, options);
     }
 
     static SinkRecord testRecord(boolean keyed, int bucket, int key, int 
value, RowKind rowKind) {

Reply via email to