This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ecce2cc87 [FLINK-31434] Introduce CDC sink for Table Store (#674)
ecce2cc87 is described below

commit ecce2cc876c1bd9095abef4fdf24400bf179fefa
Author: tsreaper <[email protected]>
AuthorDate: Wed Mar 22 14:52:51 2023 +0800

    [FLINK-31434] Introduce CDC sink for Table Store (#674)
---
 .../main/java/org/apache/paimon/cdc/CdcRecord.java |   5 +
 .../java/org/apache/paimon/cdc/EventParser.java    |  46 +++++
 .../apache/paimon/flink/sink/CompactorSink.java    |   2 +-
 .../apache/paimon/flink/sink/FileStoreSink.java    |   2 +-
 .../org/apache/paimon/flink/sink/FlinkSink.java    |   9 +-
 .../flink/sink/cdc/CdcBucketStreamPartitioner.java | 141 ++++++++++++++
 .../flink/sink/cdc/CdcParsingProcessFunction.java  |  69 +++++++
 .../{FileStoreSink.java => cdc/FlinkCdcSink.java}  |  32 ++--
 .../paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java | 116 ++++++++++++
 .../sink/cdc/SchemaChangeProcessFunction.java      |  64 +++++++
 .../paimon/flink/sink/cdc/FlinkCdcSinkITCase.java  | 204 +++++++++++++++++++++
 .../apache/paimon/flink/sink/cdc/TestCdcEvent.java |  45 +++--
 .../paimon/flink/sink/cdc/TestCdcEventParser.java  |  52 ++++++
 .../flink/sink/cdc/TestCdcSourceFunction.java      | 110 +++++++++++
 14 files changed, 853 insertions(+), 44 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/cdc/CdcRecord.java 
b/paimon-core/src/main/java/org/apache/paimon/cdc/CdcRecord.java
index 3c65dec4e..4e80002af 100644
--- a/paimon-core/src/main/java/org/apache/paimon/cdc/CdcRecord.java
+++ b/paimon-core/src/main/java/org/apache/paimon/cdc/CdcRecord.java
@@ -55,4 +55,9 @@ public class CdcRecord implements Serializable {
         CdcRecord that = (CdcRecord) o;
         return Objects.equals(kind, that.kind) && Objects.equals(fields, 
that.fields);
     }
+
+    @Override
+    public String toString() {
+        return kind.shortString() + " " + fields;
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/cdc/EventParser.java 
b/paimon-core/src/main/java/org/apache/paimon/cdc/EventParser.java
new file mode 100644
index 000000000..46c744699
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/cdc/EventParser.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.cdc;
+
+import org.apache.paimon.schema.SchemaChange;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Parse a CDC change event to a list of {@link SchemaChange} or {@link 
CdcRecord}.
+ *
+ * @param <T> CDC change event type
+ */
+public interface EventParser<T> {
+
+    void setRawEvent(T rawEvent);
+
+    boolean isSchemaChange();
+
+    List<SchemaChange> getSchemaChanges();
+
+    List<CdcRecord> getRecords();
+
+    /** Factory to create an {@link EventParser}. */
+    interface Factory<T> extends Serializable {
+
+        EventParser<T> create();
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
index 166de1eae..ee26918a9 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
@@ -26,7 +26,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.function.SerializableFunction;
 
 /** {@link FlinkSink} for dedicated compact jobs. */
-public class CompactorSink extends FlinkSink {
+public class CompactorSink extends FlinkSink<RowData> {
 
     private static final long serialVersionUID = 1L;
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
index 5e6cf4181..b8d5bf2af 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
@@ -32,7 +32,7 @@ import javax.annotation.Nullable;
 import java.util.Map;
 
 /** {@link FlinkSink} for writing records into paimon. */
-public class FileStoreSink extends FlinkSink {
+public class FileStoreSink extends FlinkSink<RowData> {
 
     private static final long serialVersionUID = 1L;
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 5db1ad11a..0e12cbef5 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -35,7 +35,6 @@ import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.function.SerializableFunction;
 
 import java.io.Serializable;
@@ -45,7 +44,7 @@ import static 
org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_F
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_LOOKUP_WAIT;
 
 /** Abstract sink of paimon. */
-public abstract class FlinkSink implements Serializable {
+public abstract class FlinkSink<T> implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
@@ -91,7 +90,7 @@ public abstract class FlinkSink implements Serializable {
                 new StoreSinkWriteImpl(table, context, initialCommitUser, 
ioManager, isOverwrite);
     }
 
-    public DataStreamSink<?> sinkFrom(DataStream<RowData> input) {
+    public DataStreamSink<?> sinkFrom(DataStream<T> input) {
         // This commitUser is valid only for new jobs.
         // After the job starts, this commitUser will be recorded into the 
states of write and
         // commit operators.
@@ -102,7 +101,7 @@ public abstract class FlinkSink implements Serializable {
     }
 
     public DataStreamSink<?> sinkFrom(
-            DataStream<RowData> input, String commitUser, 
StoreSinkWrite.Provider sinkProvider) {
+            DataStream<T> input, String commitUser, StoreSinkWrite.Provider 
sinkProvider) {
         StreamExecutionEnvironment env = input.getExecutionEnvironment();
         ReadableConfig conf = 
StreamExecutionEnvironmentUtils.getConfiguration(env);
         CheckpointConfig checkpointConfig = env.getCheckpointConfig();
@@ -150,7 +149,7 @@ public abstract class FlinkSink implements Serializable {
                         + " to exactly-once");
     }
 
-    protected abstract OneInputStreamOperator<RowData, Committable> 
createWriteOperator(
+    protected abstract OneInputStreamOperator<T, Committable> 
createWriteOperator(
             StoreSinkWrite.Provider writeProvider, boolean isStreaming);
 
     protected abstract SerializableFunction<String, Committer> 
createCommitterFactory(
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcBucketStreamPartitioner.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcBucketStreamPartitioner.java
new file mode 100644
index 000000000..ef852be7f
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcBucketStreamPartitioner.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.cdc.CdcRecord;
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.Projection;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.BucketComputer;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.TypeUtils;
+
+import org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+/**
+ * A {@link StreamPartitioner} which partitions {@link CdcRecord}s according 
to the hash value of
+ * bucket keys (or primary keys if bucket keys are not specified).
+ */
+public class CdcBucketStreamPartitioner extends StreamPartitioner<CdcRecord> {
+
+    private final List<String> bucketKeys;
+    private final DataType[] bucketTypes;
+    private final List<String> partitionKeys;
+    private final DataType[] partitionTypes;
+    private final boolean shuffleByPartitionEnable;
+
+    private transient int numberOfChannels;
+    private transient Projection bucketProjection;
+    private transient Projection partitionProjection;
+
+    public CdcBucketStreamPartitioner(TableSchema schema, boolean 
shuffleByPartitionEnable) {
+        List<String> bucketKeys = schema.originalBucketKeys();
+        if (bucketKeys.isEmpty()) {
+            bucketKeys = schema.primaryKeys();
+        }
+        Preconditions.checkArgument(
+                bucketKeys.size() > 0, "Either bucket keys or primary keys 
must be defined");
+
+        this.bucketKeys = bucketKeys;
+        this.bucketTypes = getTypes(this.bucketKeys, schema);
+        this.partitionKeys = schema.partitionKeys();
+        this.partitionTypes = getTypes(this.partitionKeys, schema);
+        this.shuffleByPartitionEnable = shuffleByPartitionEnable;
+    }
+
+    private DataType[] getTypes(List<String> keys, TableSchema schema) {
+        List<DataType> types = new ArrayList<>();
+        for (String key : keys) {
+            int idx = schema.fieldNames().indexOf(key);
+            types.add(schema.fields().get(idx).type());
+        }
+        return types.toArray(new DataType[0]);
+    }
+
+    @Override
+    public void setup(int numberOfChannels) {
+        super.setup(numberOfChannels);
+        this.numberOfChannels = numberOfChannels;
+        this.bucketProjection =
+                CodeGenUtils.newProjection(
+                        RowType.of(bucketTypes), IntStream.range(0, 
bucketTypes.length).toArray());
+        this.partitionProjection =
+                CodeGenUtils.newProjection(
+                        RowType.of(partitionTypes),
+                        IntStream.range(0, partitionTypes.length).toArray());
+    }
+
+    @Override
+    public int selectChannel(
+            SerializationDelegate<StreamRecord<CdcRecord>> 
streamRecordSerializationDelegate) {
+        CdcRecord record = 
streamRecordSerializationDelegate.getInstance().getValue();
+        BinaryRow bucketKeyRow =
+                toBinaryRow(record.fields(), bucketKeys, bucketTypes, 
bucketProjection);
+        if (shuffleByPartitionEnable) {
+            BinaryRow partitionKeyRow =
+                    toBinaryRow(
+                            record.fields(), partitionKeys, partitionTypes, 
partitionProjection);
+            return BucketComputer.bucket(
+                    Objects.hash(bucketKeyRow.hashCode(), 
partitionKeyRow.hashCode()),
+                    numberOfChannels);
+        } else {
+            return BucketComputer.bucket(bucketKeyRow.hashCode(), 
numberOfChannels);
+        }
+    }
+
+    private BinaryRow toBinaryRow(
+            Map<String, String> fields,
+            List<String> keys,
+            DataType[] types,
+            Projection projection) {
+        GenericRow genericRow = new GenericRow(keys.size());
+        for (int i = 0; i < keys.size(); i++) {
+            genericRow.setField(i, 
TypeUtils.castFromString(fields.get(keys.get(i)), types[i]));
+        }
+        return projection.apply(genericRow);
+    }
+
+    @Override
+    public StreamPartitioner<CdcRecord> copy() {
+        return null;
+    }
+
+    @Override
+    public SubtaskStateMapper getDownstreamSubtaskStateMapper() {
+        return SubtaskStateMapper.FULL;
+    }
+
+    @Override
+    public boolean isPointwise() {
+        return false;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
new file mode 100644
index 000000000..223b709ec
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.cdc.CdcRecord;
+import org.apache.paimon.cdc.EventParser;
+import org.apache.paimon.schema.SchemaChange;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+/**
+ * A {@link ProcessFunction} to parse CDC change event to either {@link 
SchemaChange} or {@link
+ * CdcRecord} and send them to different downstreams.
+ *
+ * @param <T> CDC change event type
+ */
+public class CdcParsingProcessFunction<T> extends ProcessFunction<T, 
CdcRecord> {
+
+    public static final OutputTag<SchemaChange> SCHEMA_CHANGE_OUTPUT_TAG =
+            new OutputTag<>("schema-change", 
TypeInformation.of(SchemaChange.class));
+
+    private final EventParser.Factory<T> parserFactory;
+
+    private transient EventParser<T> parser;
+
+    public CdcParsingProcessFunction(EventParser.Factory<T> parserFactory) {
+        this.parserFactory = parserFactory;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        parser = parserFactory.create();
+    }
+
+    @Override
+    public void processElement(T raw, Context context, Collector<CdcRecord> 
collector)
+            throws Exception {
+        parser.setRawEvent(raw);
+        if (parser.isSchemaChange()) {
+            for (SchemaChange schemaChange : parser.getSchemaChanges()) {
+                context.output(SCHEMA_CHANGE_OUTPUT_TAG, schemaChange);
+            }
+        } else {
+            for (CdcRecord record : parser.getRecords()) {
+                collector.collect(record);
+            }
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
similarity index 73%
copy from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
copy to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
index 5e6cf4181..7981fec6e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
@@ -16,45 +16,50 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.sink;
+package org.apache.paimon.flink.sink.cdc;
 
+import org.apache.paimon.cdc.CdcRecord;
 import org.apache.paimon.flink.VersionedSerializerWrapper;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.sink.CommittableStateManager;
+import org.apache.paimon.flink.sink.Committer;
+import org.apache.paimon.flink.sink.FlinkSink;
+import org.apache.paimon.flink.sink.LogSinkFunction;
+import org.apache.paimon.flink.sink.RestoreAndFailCommittableStateManager;
+import org.apache.paimon.flink.sink.StoreCommitter;
+import org.apache.paimon.flink.sink.StoreSinkWrite;
 import org.apache.paimon.manifest.ManifestCommittableSerializer;
 import org.apache.paimon.operation.Lock;
 import org.apache.paimon.table.FileStoreTable;
 
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.function.SerializableFunction;
 
 import javax.annotation.Nullable;
 
-import java.util.Map;
-
-/** {@link FlinkSink} for writing records into paimon. */
-public class FileStoreSink extends FlinkSink {
+/**
+ * A {@link FlinkSink} which accepts {@link CdcRecord} and waits for a schema 
change if necessary.
+ */
+public class FlinkCdcSink extends FlinkSink<CdcRecord> {
 
     private static final long serialVersionUID = 1L;
 
     private final Lock.Factory lockFactory;
-    @Nullable private final Map<String, String> overwritePartition;
     @Nullable private final LogSinkFunction logSinkFunction;
 
-    public FileStoreSink(
+    public FlinkCdcSink(
             FileStoreTable table,
             Lock.Factory lockFactory,
-            @Nullable Map<String, String> overwritePartition,
             @Nullable LogSinkFunction logSinkFunction) {
-        super(table, overwritePartition != null);
+        super(table, false);
         this.lockFactory = lockFactory;
-        this.overwritePartition = overwritePartition;
         this.logSinkFunction = logSinkFunction;
     }
 
     @Override
-    protected OneInputStreamOperator<RowData, Committable> createWriteOperator(
+    protected OneInputStreamOperator<CdcRecord, Committable> 
createWriteOperator(
             StoreSinkWrite.Provider writeProvider, boolean isStreaming) {
-        return new RowDataStoreWriteOperator(table, logSinkFunction, 
writeProvider);
+        return new SchemaAwareStoreWriteOperator(table, logSinkFunction, 
writeProvider);
     }
 
     @Override
@@ -67,7 +72,6 @@ public class FileStoreSink extends FlinkSink {
         return user ->
                 new StoreCommitter(
                         table.newCommit(user)
-                                .withOverwrite(overwritePartition)
                                 .withLock(lockFactory.create())
                                 
.ignoreEmptyCommit(!streamingCheckpointEnabled));
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java
new file mode 100644
index 000000000..190deb464
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.cdc.CdcRecord;
+import org.apache.paimon.cdc.EventParser;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.sink.LogSinkFunction;
+import org.apache.paimon.operation.Lock;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+
+import javax.annotation.Nullable;
+
+/**
+ * Builder for {@link FlinkCdcSink}.
+ *
+ * @param <T> CDC change event type
+ */
+public class FlinkCdcSinkBuilder<T> {
+
+    private DataStream<T> input = null;
+    private EventParser.Factory<T> parserFactory = null;
+    private FileStoreTable table = null;
+    private Lock.Factory lockFactory = Lock.emptyFactory();
+    @Nullable private LogSinkFunction logSinkFunction;
+
+    @Nullable private Integer parallelism;
+
+    public FlinkCdcSinkBuilder<T> withInput(DataStream<T> input) {
+        this.input = input;
+        return this;
+    }
+
+    public FlinkCdcSinkBuilder<T> withParserFactory(EventParser.Factory<T> 
parserFactory) {
+        this.parserFactory = parserFactory;
+        return this;
+    }
+
+    public FlinkCdcSinkBuilder<T> withTable(FileStoreTable table) {
+        this.table = table;
+        return this;
+    }
+
+    public FlinkCdcSinkBuilder<T> withLockFactory(Lock.Factory lockFactory) {
+        this.lockFactory = lockFactory;
+        return this;
+    }
+
+    public FlinkCdcSinkBuilder<T> withLogSinkFunction(@Nullable 
LogSinkFunction logSinkFunction) {
+        this.logSinkFunction = logSinkFunction;
+        return this;
+    }
+
+    public FlinkCdcSinkBuilder<T> withParallelism(@Nullable Integer 
parallelism) {
+        this.parallelism = parallelism;
+        return this;
+    }
+
+    public DataStreamSink<?> build() {
+        Preconditions.checkNotNull(input);
+        Preconditions.checkNotNull(parserFactory);
+        Preconditions.checkNotNull(table);
+
+        SingleOutputStreamOperator<CdcRecord> parsed =
+                input.forward()
+                        .process(new 
CdcParsingProcessFunction<>(parserFactory))
+                        .setParallelism(input.getParallelism());
+
+        DataStream<Void> schemaChangeProcessFunction =
+                
parsed.getSideOutput(CdcParsingProcessFunction.SCHEMA_CHANGE_OUTPUT_TAG)
+                        .process(
+                                new SchemaChangeProcessFunction(
+                                        new SchemaManager(table.fileIO(), 
table.location())));
+        schemaChangeProcessFunction.getTransformation().setParallelism(1);
+
+        CdcBucketStreamPartitioner partitioner =
+                new CdcBucketStreamPartitioner(
+                        table.schema(),
+                        table.options()
+                                .toConfiguration()
+                                
.get(FlinkConnectorOptions.SINK_SHUFFLE_BY_PARTITION));
+        PartitionTransformation<CdcRecord> partitioned =
+                new PartitionTransformation<>(parsed.getTransformation(), 
partitioner);
+        if (parallelism != null) {
+            partitioned.setParallelism(parallelism);
+        }
+
+        StreamExecutionEnvironment env = input.getExecutionEnvironment();
+        FlinkCdcSink sink = new FlinkCdcSink(table, lockFactory, 
logSinkFunction);
+        return sink.sinkFrom(new DataStream<>(env, partitioned));
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java
new file mode 100644
index 000000000..d3ff71e09
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link ProcessFunction} to handle {@link SchemaChange}.
+ *
+ * <p>NOTE: To avoid concurrent schema changes, the parallelism of this {@link 
ProcessFunction} must
+ * be 1.
+ */
+public class SchemaChangeProcessFunction extends ProcessFunction<SchemaChange, 
Void> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SchemaChangeProcessFunction.class);
+
+    private final SchemaManager schemaManager;
+
+    public SchemaChangeProcessFunction(SchemaManager schemaManager) {
+        this.schemaManager = schemaManager;
+    }
+
+    @Override
+    public void processElement(
+            SchemaChange schemaChange, Context context, Collector<Void> 
collector)
+            throws Exception {
+        Preconditions.checkArgument(
+                schemaChange instanceof SchemaChange.AddColumn,
+                "Currently, only SchemaChange.AddColumn is supported.");
+        try {
+            schemaManager.commitChanges(schemaChange);
+        } catch (Exception e) {
+            // This is normal. For example when a table is split into multiple 
database tables, all
+            // these tables will be added the same column. However 
schemaManager can't handle
+            // duplicated column adds, so we just catch the exception and log 
it.
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Failed to perform schema change {}", schemaChange, 
e);
+            }
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
new file mode 100644
index 000000000..eb78c1d07
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.cdc.CdcRecord;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.util.AbstractTestBase;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FailingFileIO;
+import org.apache.paimon.utils.TraceableFileIO;
+
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT cases for {@link FlinkCdcSink}. */
+public class FlinkCdcSinkITCase extends AbstractTestBase {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    @Test
+    @Timeout(60)
+    public void testRandomCdcEvents() throws Exception {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+
+        int numEvents = random.nextInt(20000) + 1;
+        int numSchemaChanges = random.nextInt(20) + 1;
+        int numKeys = random.nextInt(2000) + 1;
+        boolean enableFailure = random.nextBoolean();
+
+        TestCdcEvent[] events = new TestCdcEvent[numEvents];
+
+        Set<Integer> schemaChangePositions = new HashSet<>();
+        for (int i = 0; i < numSchemaChanges; i++) {
+            int pos;
+            do {
+                pos = random.nextInt(numEvents);
+            } while (schemaChangePositions.contains(pos));
+            schemaChangePositions.add(pos);
+        }
+
+        Map<Integer, Map<String, String>> expected = new HashMap<>();
+        List<String> fieldNames = new ArrayList<>();
+        fieldNames.add("v0");
+        int suffixId = 0;
+        for (int i = 0; i < numEvents; i++) {
+            if (schemaChangePositions.contains(i)) {
+                suffixId++;
+                String newName = "v" + suffixId;
+                fieldNames.add(newName);
+                events[i] = new TestCdcEvent(SchemaChange.addColumn(newName, 
DataTypes.INT()));
+            } else {
+                Map<String, String> fields = new HashMap<>();
+                int key = random.nextInt(numKeys);
+                fields.put("k", String.valueOf(key));
+                for (String fieldName : fieldNames) {
+                    fields.put(fieldName, String.valueOf(random.nextInt()));
+                }
+
+                List<CdcRecord> records = new ArrayList<>();
+                if (expected.containsKey(key)) {
+                    records.add(new CdcRecord(RowKind.DELETE, 
expected.get(key)));
+                }
+                records.add(new CdcRecord(RowKind.INSERT, fields));
+                events[i] = new TestCdcEvent(records);
+                expected.put(key, fields);
+            }
+        }
+
+        Path tablePath;
+        FileIO fileIO;
+        String failingName = UUID.randomUUID().toString();
+        if (enableFailure) {
+            tablePath = new Path(FailingFileIO.getFailingPath(failingName, 
tempDir.toString()));
+            fileIO = new FailingFileIO();
+        } else {
+            tablePath = new Path(TraceableFileIO.SCHEME + "://" + 
tempDir.toString());
+            fileIO = LocalFileIO.create();
+        }
+
+        // no failure when creating table
+        FailingFileIO.reset(failingName, 0, 1);
+
+        FileStoreTable table =
+                createFileStoreTable(
+                        tablePath,
+                        fileIO,
+                        RowType.of(
+                                new DataType[] {DataTypes.INT(), 
DataTypes.INT()},
+                                new String[] {"k", "v0"}),
+                        Collections.emptyList(),
+                        Collections.singletonList("k"));
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getCheckpointConfig().setCheckpointInterval(1000);
+        TestCdcSourceFunction sourceFunction =
+                new TestCdcSourceFunction(
+                        events, record -> 
Integer.valueOf(record.fields().get("k")));
+        DataStreamSource<TestCdcEvent> source = env.addSource(sourceFunction);
+        source.setParallelism(2);
+        new FlinkCdcSinkBuilder<TestCdcEvent>()
+                .withInput(source)
+                .withParserFactory(TestCdcEventParser::new)
+                .withTable(table)
+                .withParallelism(3)
+                .build();
+
+        // enable failure when running jobs if needed
+        FailingFileIO.reset(failingName, 100, 10000);
+
+        env.execute();
+
+        // no failure when checking results
+        FailingFileIO.reset(failingName, 0, 1);
+
+        table = table.copyWithLatestSchema();
+        SchemaManager schemaManager = new SchemaManager(table.fileIO(), 
table.location());
+        TableSchema schema = schemaManager.latest().get();
+
+        Map<Integer, Map<String, String>> actual = new HashMap<>();
+        DataTableScan.DataFilePlan plan = table.newScan().plan();
+        try (RecordReaderIterator<InternalRow> it =
+                new 
RecordReaderIterator<>(table.newRead().createReader(plan))) {
+            while (it.hasNext()) {
+                InternalRow row = it.next();
+                Map<String, String> fields = new HashMap<>();
+                for (int i = 0; i < schema.fieldNames().size(); i++) {
+                    if (!row.isNullAt(i)) {
+                        fields.put(schema.fieldNames().get(i), 
String.valueOf(row.getInt(i)));
+                    }
+                }
+                actual.put(Integer.valueOf(fields.get("k")), fields);
+            }
+        }
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    private FileStoreTable createFileStoreTable(
+            Path tablePath,
+            FileIO fileIO,
+            RowType rowType,
+            List<String> partitions,
+            List<String> primaryKeys)
+            throws Exception {
+        Options conf = new Options();
+        conf.set(CoreOptions.BUCKET, 3);
+        conf.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
+        conf.set(CoreOptions.PAGE_SIZE, new MemorySize(4096));
+
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(
+                        new SchemaManager(fileIO, tablePath),
+                        new Schema(rowType.getFields(), partitions, 
primaryKeys, conf.toMap(), ""));
+        return FileStoreTableFactory.create(fileIO, tablePath, tableSchema);
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/cdc/CdcRecord.java 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
similarity index 52%
copy from paimon-core/src/main/java/org/apache/paimon/cdc/CdcRecord.java
copy to 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
index 3c65dec4e..f65e3002a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/cdc/CdcRecord.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
@@ -16,43 +16,42 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.cdc;
+package org.apache.paimon.flink.sink.cdc;
 
-import org.apache.paimon.types.RowKind;
+import org.apache.paimon.cdc.CdcRecord;
+import org.apache.paimon.schema.SchemaChange;
 
 import java.io.Serializable;
-import java.util.Map;
-import java.util.Objects;
+import java.util.List;
 
-/** A data change message from the CDC source. */
-public class CdcRecord implements Serializable {
+/** Testing CDC change event. */
+public class TestCdcEvent implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
-    private final RowKind kind;
-    private final Map<String, String> fields;
+    private final SchemaChange schemaChange;
+    private final List<CdcRecord> records;
 
-    public CdcRecord(RowKind kind, Map<String, String> fields) {
-        this.kind = kind;
-        this.fields = fields;
+    public TestCdcEvent(SchemaChange schemaChange) {
+        this.schemaChange = schemaChange;
+        this.records = null;
     }
 
-    public RowKind kind() {
-        return kind;
+    public TestCdcEvent(List<CdcRecord> records) {
+        this.schemaChange = null;
+        this.records = records;
     }
 
-    /** Map key is the field's name, and map value is the field's value. */
-    public Map<String, String> fields() {
-        return fields;
+    public SchemaChange schemaChange() {
+        return schemaChange;
     }
 
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof CdcRecord)) {
-            return false;
-        }
+    public List<CdcRecord> records() {
+        return records;
+    }
 
-        CdcRecord that = (CdcRecord) o;
-        return Objects.equals(kind, that.kind) && Objects.equals(fields, 
that.fields);
+    @Override
+    public String toString() {
+        return String.format("{schemChange = %s, records = %s}", schemaChange, 
records);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
new file mode 100644
index 000000000..75d11afa1
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.cdc.CdcRecord;
+import org.apache.paimon.cdc.EventParser;
+import org.apache.paimon.schema.SchemaChange;
+
+import java.util.Collections;
+import java.util.List;
+
+/** Testing {@link EventParser} for {@link TestCdcEvent}. */
+public class TestCdcEventParser implements EventParser<TestCdcEvent> {
+
+    private TestCdcEvent raw;
+
+    @Override
+    public void setRawEvent(TestCdcEvent raw) {
+        this.raw = raw;
+    }
+
+    @Override
+    public boolean isSchemaChange() {
+        return raw.schemaChange() != null;
+    }
+
+    @Override
+    public List<SchemaChange> getSchemaChanges() {
+        return Collections.singletonList(raw.schemaChange());
+    }
+
+    @Override
+    public List<CdcRecord> getRecords() {
+        return raw.records();
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcSourceFunction.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcSourceFunction.java
new file mode 100644
index 000000000..caa7e46f8
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcSourceFunction.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.cdc.CdcRecord;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.SerializableFunction;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.stream.Collectors;
+
+/**
+ * Testing {@link RichParallelSourceFunction} to produce {@link TestCdcEvent}. 
{@link TestCdcEvent}s
+ * with the same key will be produced by the same parallelism.
+ */
+public class TestCdcSourceFunction extends 
RichParallelSourceFunction<TestCdcEvent>
+        implements CheckpointedFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private final LinkedList<TestCdcEvent> events;
+    private final SerializableFunction<CdcRecord, Integer> getKeyHash;
+
+    private volatile boolean isRunning = true;
+    private transient ListState<Integer> remainingEventsCount;
+
+    public TestCdcSourceFunction(
+            TestCdcEvent[] events, SerializableFunction<CdcRecord, Integer> 
getKeyHash) {
+        this.events = 
Arrays.stream(events).collect(Collectors.toCollection(LinkedList::new));
+        this.getKeyHash = getKeyHash;
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws 
Exception {
+        remainingEventsCount =
+                context.getOperatorStateStore()
+                        .getListState(new ListStateDescriptor<>("count", 
Integer.class));
+
+        if (context.isRestored()) {
+            int count = 0;
+            for (int c : remainingEventsCount.get()) {
+                count += c;
+            }
+            while (events.size() > count) {
+                events.poll();
+            }
+        }
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+        remainingEventsCount.clear();
+        remainingEventsCount.add(events.size());
+    }
+
+    @Override
+    public void run(SourceContext<TestCdcEvent> ctx) throws Exception {
+        while (isRunning && !events.isEmpty()) {
+            synchronized (ctx.getCheckpointLock()) {
+                TestCdcEvent event = events.poll();
+                if (event.records() != null) {
+                    for (int i = 0; i + 1 < event.records().size(); i++) {
+                        Preconditions.checkArgument(
+                                getKeyHash
+                                        .apply(event.records().get(i))
+                                        
.equals(getKeyHash.apply(event.records().get(i + 1))),
+                                "Key hashes in the same List<Record> are not 
equal."
+                                        + "This is an invalid test data.");
+                    }
+                    int hash = getKeyHash.apply(event.records().get(0));
+                    int subtaskId = 
getRuntimeContext().getIndexOfThisSubtask();
+                    int totalSubtasks = 
getRuntimeContext().getNumberOfParallelSubtasks();
+                    if (Math.abs(hash) % totalSubtasks != subtaskId) {
+                        continue;
+                    }
+                }
+                ctx.collect(event);
+            }
+        }
+    }
+
+    @Override
+    public void cancel() {
+        isRunning = false;
+    }
+}


Reply via email to