This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new fcefe0475 [flink] Introduce process functions and sinks for CDC 
database syncing (#896)
fcefe0475 is described below

commit fcefe04752da48c612f6288bda3ae006fb4a25e1
Author: tsreaper <[email protected]>
AuthorDate: Thu Apr 13 18:35:01 2023 +0800

    [flink] Introduce process functions and sinks for CDC database syncing 
(#896)
---
 .../cdc/mysql/MySqlDebeziumJsonEventParser.java    |   5 +
 .../action/cdc/mysql/MySqlSyncTableAction.java     |   6 +-
 ...va => CdcMultiTableParsingProcessFunction.java} |  46 ++++-
 .../flink/sink/cdc/CdcParsingProcessFunction.java  |   3 +
 .../apache/paimon/flink/sink/cdc/EventParser.java  |   2 +
 .../sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java  | 123 +++++++++++++
 ...lder.java => FlinkCdcSyncTableSinkBuilder.java} |  15 +-
 .../sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java   | 203 +++++++++++++++++++++
 ...TCase.java => FlinkCdcSyncTableSinkITCase.java} | 102 +----------
 .../apache/paimon/flink/sink/cdc/TestCdcEvent.java |  23 ++-
 .../paimon/flink/sink/cdc/TestCdcEventParser.java  |   5 +
 .../flink/sink/cdc/TestCdcSourceFunction.java      |  24 +--
 .../apache/paimon/flink/sink/cdc/TestTable.java    | 172 +++++++++++++++++
 13 files changed, 592 insertions(+), 137 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index 0704e5f87..ca839f142 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -95,6 +95,11 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
         }
     }
 
+    @Override
+    public String tableName() {
+        return payload.get("source").get("table").asText();
+    }
+
     private void updateFieldTypes(JsonNode schema) {
         mySqlFieldTypes = new HashMap<>();
         fieldClassNames = new HashMap<>();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index 885b9dd1b..79e10fda4 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -25,7 +25,7 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.sink.cdc.EventParser;
-import org.apache.paimon.flink.sink.cdc.FlinkCdcSinkBuilder;
+import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncTableSinkBuilder;
 import org.apache.paimon.flink.sink.cdc.SchemaChangeProcessFunction;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
@@ -167,8 +167,8 @@ public class MySqlSyncTableAction implements Action {
             parserFactory = MySqlDebeziumJsonEventParser::new;
         }
 
-        FlinkCdcSinkBuilder<String> sinkBuilder =
-                new FlinkCdcSinkBuilder<String>()
+        FlinkCdcSyncTableSinkBuilder<String> sinkBuilder =
+                new FlinkCdcSyncTableSinkBuilder<String>()
                         .withInput(
                                 env.fromSource(
                                         source, 
WatermarkStrategy.noWatermarks(), "MySQL Source"))
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
similarity index 52%
copy from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
copy to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
index 357c9996d..36e61b443 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
@@ -26,42 +26,68 @@ import 
org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * A {@link ProcessFunction} to parse CDC change event to either {@link 
SchemaChange} or {@link
- * CdcRecord} and send them to different downstreams.
+ * CdcRecord} and send them to different side outputs according to table name.
+ *
+ * <p>This {@link ProcessFunction} can handle records for different tables at 
the same time.
  *
  * @param <T> CDC change event type
  */
-public class CdcParsingProcessFunction<T> extends ProcessFunction<T, 
CdcRecord> {
-
-    public static final OutputTag<SchemaChange> SCHEMA_CHANGE_OUTPUT_TAG =
-            new OutputTag<>("schema-change", 
TypeInformation.of(SchemaChange.class));
+public class CdcMultiTableParsingProcessFunction<T> extends ProcessFunction<T, 
Void> {
 
     private final EventParser.Factory<T> parserFactory;
 
     private transient EventParser<T> parser;
+    private transient Map<String, OutputTag<SchemaChange>> 
schemaChangeOutputTags;
+    private transient Map<String, OutputTag<CdcRecord>> recordOutputTags;
 
-    public CdcParsingProcessFunction(EventParser.Factory<T> parserFactory) {
+    public CdcMultiTableParsingProcessFunction(EventParser.Factory<T> 
parserFactory) {
         this.parserFactory = parserFactory;
     }
 
     @Override
     public void open(Configuration parameters) throws Exception {
         parser = parserFactory.create();
+        schemaChangeOutputTags = new HashMap<>();
+        recordOutputTags = new HashMap<>();
     }
 
     @Override
-    public void processElement(T raw, Context context, Collector<CdcRecord> 
collector)
-            throws Exception {
+    public void processElement(T raw, Context context, Collector<Void> 
collector) throws Exception {
         parser.setRawEvent(raw);
+        String tableName = parser.tableName();
+
         if (parser.isSchemaChange()) {
             for (SchemaChange schemaChange : parser.getSchemaChanges()) {
-                context.output(SCHEMA_CHANGE_OUTPUT_TAG, schemaChange);
+                context.output(getSchemaChangeOutputTag(tableName), 
schemaChange);
             }
         } else {
             for (CdcRecord record : parser.getRecords()) {
-                collector.collect(record);
+                context.output(getRecordOutputTag(tableName), record);
             }
         }
     }
+
+    private OutputTag<SchemaChange> getSchemaChangeOutputTag(String tableName) 
{
+        return schemaChangeOutputTags.computeIfAbsent(
+                tableName, 
CdcMultiTableParsingProcessFunction::createSchemaChangeOutputTag);
+    }
+
+    public static OutputTag<SchemaChange> createSchemaChangeOutputTag(String 
tableName) {
+        return new OutputTag<>(
+                "schema-change-" + tableName, 
TypeInformation.of(SchemaChange.class));
+    }
+
+    private OutputTag<CdcRecord> getRecordOutputTag(String tableName) {
+        return recordOutputTags.computeIfAbsent(
+                tableName, 
CdcMultiTableParsingProcessFunction::createRecordOutputTag);
+    }
+
+    public static OutputTag<CdcRecord> createRecordOutputTag(String tableName) 
{
+        return new OutputTag<>("record-" + tableName, 
TypeInformation.of(CdcRecord.class));
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
index 357c9996d..ba3f97cad 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
@@ -30,6 +30,9 @@ import org.apache.flink.util.OutputTag;
  * A {@link ProcessFunction} to parse CDC change event to either {@link 
SchemaChange} or {@link
  * CdcRecord} and send them to different downstreams.
  *
+ * <p>This {@link ProcessFunction} can only handle records for a single 
constant table. To handle
+ * records for different tables, see {@link 
CdcMultiTableParsingProcessFunction}.
+ *
  * @param <T> CDC change event type
  */
 public class CdcParsingProcessFunction<T> extends ProcessFunction<T, 
CdcRecord> {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
index 9e575a338..e4f192d1b 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
@@ -32,6 +32,8 @@ public interface EventParser<T> {
 
     void setRawEvent(T rawEvent);
 
+    String tableName();
+
     boolean isSchemaChange();
 
     List<SchemaChange> getSchemaChanges();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
new file mode 100644
index 000000000..e48f929bc
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.flink.sink.BucketingStreamPartitioner;
+import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
+import org.apache.paimon.operation.Lock;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Builder for {@link FlinkCdcSink} when syncing the whole database into one 
Paimon database. Each
+ * database table will be written into a separate Paimon table.
+ *
+ * <p>This builder will create a separate sink for each Paimon sink table. 
Thus this implementation
+ * is not very efficient in resource saving.
+ *
+ * @param <T> CDC change event type
+ */
+public class FlinkCdcSyncDatabaseSinkBuilder<T> {
+
+    private DataStream<T> input = null;
+    private EventParser.Factory<T> parserFactory = null;
+    private List<FileStoreTable> tables = new ArrayList<>();
+    private Lock.Factory lockFactory = Lock.emptyFactory();
+
+    @Nullable private Integer parallelism;
+
+    public FlinkCdcSyncDatabaseSinkBuilder<T> withInput(DataStream<T> input) {
+        this.input = input;
+        return this;
+    }
+
+    public FlinkCdcSyncDatabaseSinkBuilder<T> withParserFactory(
+            EventParser.Factory<T> parserFactory) {
+        this.parserFactory = parserFactory;
+        return this;
+    }
+
+    public FlinkCdcSyncDatabaseSinkBuilder<T> withTables(List<FileStoreTable> 
tables) {
+        this.tables = tables;
+        return this;
+    }
+
+    public FlinkCdcSyncDatabaseSinkBuilder<T> withLockFactory(Lock.Factory 
lockFactory) {
+        this.lockFactory = lockFactory;
+        return this;
+    }
+
+    public FlinkCdcSyncDatabaseSinkBuilder<T> withParallelism(@Nullable 
Integer parallelism) {
+        this.parallelism = parallelism;
+        return this;
+    }
+
+    public void build() {
+        Preconditions.checkNotNull(input);
+        Preconditions.checkNotNull(parserFactory);
+
+        StreamExecutionEnvironment env = input.getExecutionEnvironment();
+
+        SingleOutputStreamOperator<Void> parsed =
+                input.forward()
+                        .process(new 
CdcMultiTableParsingProcessFunction<>(parserFactory))
+                        .setParallelism(input.getParallelism());
+
+        for (FileStoreTable table : tables) {
+            DataStream<Void> schemaChangeProcessFunction =
+                    SingleOutputStreamOperatorUtils.getSideOutput(
+                                    parsed,
+                                    
CdcMultiTableParsingProcessFunction.createSchemaChangeOutputTag(
+                                            table.name()))
+                            .process(
+                                    new SchemaChangeProcessFunction(
+                                            new SchemaManager(table.fileIO(), 
table.location())));
+            schemaChangeProcessFunction.getTransformation().setParallelism(1);
+            
schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);
+
+            BucketingStreamPartitioner<CdcRecord> partitioner =
+                    new BucketingStreamPartitioner<>(new 
CdcRecordChannelComputer(table.schema()));
+            PartitionTransformation<CdcRecord> partitioned =
+                    new PartitionTransformation<>(
+                            SingleOutputStreamOperatorUtils.getSideOutput(
+                                            parsed,
+                                            CdcMultiTableParsingProcessFunction
+                                                    
.createRecordOutputTag(table.name()))
+                                    .getTransformation(),
+                            partitioner);
+            if (parallelism != null) {
+                partitioned.setParallelism(parallelism);
+            }
+
+            FlinkCdcSink sink = new FlinkCdcSink(table, lockFactory);
+            sink.sinkFrom(new DataStream<>(env, partitioned));
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkBuilder.java
similarity index 84%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java
rename to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkBuilder.java
index 6663acc54..7c770b045 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkBuilder.java
@@ -34,11 +34,11 @@ import 
org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import javax.annotation.Nullable;
 
 /**
- * Builder for {@link FlinkCdcSink}.
+ * Builder for {@link FlinkCdcSink} when syncing multiple database tables into 
one Paimon table.
  *
  * @param <T> CDC change event type
  */
-public class FlinkCdcSinkBuilder<T> {
+public class FlinkCdcSyncTableSinkBuilder<T> {
 
     private DataStream<T> input = null;
     private EventParser.Factory<T> parserFactory = null;
@@ -47,27 +47,27 @@ public class FlinkCdcSinkBuilder<T> {
 
     @Nullable private Integer parallelism;
 
-    public FlinkCdcSinkBuilder<T> withInput(DataStream<T> input) {
+    public FlinkCdcSyncTableSinkBuilder<T> withInput(DataStream<T> input) {
         this.input = input;
         return this;
     }
 
-    public FlinkCdcSinkBuilder<T> withParserFactory(EventParser.Factory<T> 
parserFactory) {
+    public FlinkCdcSyncTableSinkBuilder<T> 
withParserFactory(EventParser.Factory<T> parserFactory) {
         this.parserFactory = parserFactory;
         return this;
     }
 
-    public FlinkCdcSinkBuilder<T> withTable(FileStoreTable table) {
+    public FlinkCdcSyncTableSinkBuilder<T> withTable(FileStoreTable table) {
         this.table = table;
         return this;
     }
 
-    public FlinkCdcSinkBuilder<T> withLockFactory(Lock.Factory lockFactory) {
+    public FlinkCdcSyncTableSinkBuilder<T> withLockFactory(Lock.Factory 
lockFactory) {
         this.lockFactory = lockFactory;
         return this;
     }
 
-    public FlinkCdcSinkBuilder<T> withParallelism(@Nullable Integer 
parallelism) {
+    public FlinkCdcSyncTableSinkBuilder<T> withParallelism(@Nullable Integer 
parallelism) {
         this.parallelism = parallelism;
         return this;
     }
@@ -89,6 +89,7 @@ public class FlinkCdcSinkBuilder<T> {
                                 new SchemaChangeProcessFunction(
                                         new SchemaManager(table.fileIO(), 
table.location())));
         schemaChangeProcessFunction.getTransformation().setParallelism(1);
+        schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);
 
         BucketingStreamPartitioner<CdcRecord> partitioner =
                 new BucketingStreamPartitioner<>(new 
CdcRecordChannelComputer(table.schema()));
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
new file mode 100644
index 000000000..5eab8d1e5
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.CatalogUtils;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.util.AbstractTestBase;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FailingFileIO;
+import org.apache.paimon.utils.TraceableFileIO;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+/** IT cases for {@link FlinkCdcSyncDatabaseSinkBuilder}. */
+public class FlinkCdcSyncDatabaseSinkITCase extends AbstractTestBase {
+
+    private static final String DATABASE_NAME = "test";
+    private static final String TABLE_NAME = "test_tbl";
+
+    @TempDir java.nio.file.Path tempDir;
+
+    @Test
+    @Timeout(120)
+    public void testRandomCdcEvents() throws Exception {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+
+        int numTables = random.nextInt(3) + 1;
+        boolean enableFailure = random.nextBoolean();
+
+        int maxEvents = 1500;
+        int maxSchemaChanges = 10;
+        int maxPartitions = 3;
+        int maxKeys = 150;
+        int maxBuckets = 5;
+
+        String failingName = UUID.randomUUID().toString();
+
+        List<TestTable> testTables = new ArrayList<>();
+        List<FileStoreTable> fileStoreTables = new ArrayList<>();
+        for (int i = 0; i < numTables; i++) {
+            String tableName = TABLE_NAME + i;
+            TestTable testTable =
+                    new TestTable(
+                            tableName,
+                            random.nextInt(maxEvents) + 1,
+                            random.nextInt(maxSchemaChanges) + 1,
+                            random.nextInt(maxPartitions) + 1,
+                            random.nextInt(maxKeys) + 1);
+            testTables.add(testTable);
+
+            Path tablePath;
+            FileIO fileIO;
+            if (enableFailure) {
+                tablePath =
+                        new Path(
+                                FailingFileIO.getFailingPath(
+                                        failingName,
+                                        CatalogUtils.stringifyPath(
+                                                tempDir.toString(), 
DATABASE_NAME, tableName)));
+                fileIO = new FailingFileIO();
+            } else {
+                tablePath =
+                        new Path(
+                                TraceableFileIO.SCHEME
+                                        + "://"
+                                        + CatalogUtils.stringifyPath(
+                                                tempDir.toString(), 
DATABASE_NAME, tableName));
+                fileIO = LocalFileIO.create();
+            }
+
+            // no failure when creating table
+            FailingFileIO.reset(failingName, 0, 1);
+
+            FileStoreTable fileStoreTable =
+                    createFileStoreTable(
+                            tablePath,
+                            fileIO,
+                            testTable.initialRowType(),
+                            Collections.singletonList("pt"),
+                            Arrays.asList("pt", "k"),
+                            random.nextInt(maxBuckets) + 1);
+            fileStoreTables.add(fileStoreTable);
+        }
+
+        List<TestCdcEvent> events = mergeTestTableEvents(testTables);
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getCheckpointConfig().setCheckpointInterval(100);
+        if (!enableFailure) {
+            env.setRestartStrategy(RestartStrategies.noRestart());
+        }
+
+        TestCdcSourceFunction sourceFunction = new 
TestCdcSourceFunction(events);
+        DataStreamSource<TestCdcEvent> source = env.addSource(sourceFunction);
+        source.setParallelism(2);
+        new FlinkCdcSyncDatabaseSinkBuilder<TestCdcEvent>()
+                .withInput(source)
+                .withParserFactory(TestCdcEventParser::new)
+                .withTables(fileStoreTables)
+                // because we have at most 3 tables and 8 slots in 
AbstractTestBase
+                // each table can only get 2 slots
+                .withParallelism(2)
+                .build();
+
+        // enable failure when running jobs if needed
+        FailingFileIO.reset(failingName, 10, 10000);
+
+        env.execute();
+
+        // no failure when checking results
+        FailingFileIO.reset(failingName, 0, 1);
+
+        for (int i = 0; i < numTables; i++) {
+            FileStoreTable table = 
fileStoreTables.get(i).copyWithLatestSchema();
+            SchemaManager schemaManager = new SchemaManager(table.fileIO(), 
table.location());
+            TableSchema schema = schemaManager.latest().get();
+
+            TableScan.Plan plan = table.newScan().plan();
+            try (RecordReaderIterator<InternalRow> it =
+                    new 
RecordReaderIterator<>(table.newRead().createReader(plan))) {
+                testTables.get(i).assertResult(schema, it);
+            }
+        }
+    }
+
+    private FileStoreTable createFileStoreTable(
+            Path tablePath,
+            FileIO fileIO,
+            RowType rowType,
+            List<String> partitions,
+            List<String> primaryKeys,
+            int numBucket)
+            throws Exception {
+        Options conf = new Options();
+        conf.set(CoreOptions.BUCKET, numBucket);
+        conf.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
+        conf.set(CoreOptions.PAGE_SIZE, new MemorySize(4096));
+
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(
+                        new SchemaManager(fileIO, tablePath),
+                        new Schema(rowType.getFields(), partitions, 
primaryKeys, conf.toMap(), ""));
+        return FileStoreTableFactory.create(fileIO, tablePath, tableSchema);
+    }
+
+    private List<TestCdcEvent> mergeTestTableEvents(List<TestTable> 
testTables) {
+        List<Integer> toShuffle = new ArrayList<>();
+        for (int i = 0; i < testTables.size(); i++) {
+            for (int j = 0; j < testTables.get(i).events().size(); j++) {
+                toShuffle.add(i);
+            }
+        }
+        Collections.shuffle(toShuffle);
+
+        List<TestCdcEvent> events = new ArrayList<>();
+        for (int idx : toShuffle) {
+            events.add(testTables.get(idx).events().poll());
+        }
+        return events;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
similarity index 56%
rename from 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
rename to 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
index 7be475e3c..45d6b34e5 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
@@ -28,16 +28,12 @@ import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.SchemaUtils;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.source.TableScan;
-import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FailingFileIO;
 import org.apache.paimon.utils.TraceableFileIO;
@@ -49,21 +45,14 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.io.TempDir;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** IT cases for {@link FlinkCdcSink}. */
-public class FlinkCdcSinkITCase extends AbstractTestBase {
+/** IT cases for {@link FlinkCdcSyncTableSinkBuilder}. */
+public class FlinkCdcSyncTableSinkITCase extends AbstractTestBase {
 
     @TempDir java.nio.file.Path tempDir;
 
@@ -79,63 +68,8 @@ public class FlinkCdcSinkITCase extends AbstractTestBase {
         int numBucket = random.nextInt(5) + 1;
         boolean enableFailure = random.nextBoolean();
 
-        TestCdcEvent[] events = new TestCdcEvent[numEvents];
-
-        Set<Integer> schemaChangePositions = new HashSet<>();
-        for (int i = 0; i < numSchemaChanges; i++) {
-            int pos;
-            do {
-                pos = random.nextInt(numEvents);
-            } while (schemaChangePositions.contains(pos));
-            schemaChangePositions.add(pos);
-        }
-
-        Map<Integer, Map<String, String>> expected = new HashMap<>();
-        List<String> fieldNames = new ArrayList<>();
-        List<Boolean> isBigInt = new ArrayList<>();
-        fieldNames.add("v0");
-        isBigInt.add(false);
-        int suffixId = 0;
-        for (int i = 0; i < numEvents; i++) {
-            if (schemaChangePositions.contains(i)) {
-                if (random.nextBoolean()) {
-                    int idx = random.nextInt(fieldNames.size());
-                    isBigInt.set(idx, true);
-                    events[i] =
-                            new TestCdcEvent(
-                                    SchemaChange.updateColumnType(
-                                            fieldNames.get(idx), 
DataTypes.BIGINT()));
-                } else {
-                    suffixId++;
-                    String newName = "v" + suffixId;
-                    fieldNames.add(newName);
-                    isBigInt.add(false);
-                    events[i] = new 
TestCdcEvent(SchemaChange.addColumn(newName, DataTypes.INT()));
-                }
-            } else {
-                Map<String, String> fields = new HashMap<>();
-                int key = random.nextInt(numKeys);
-                fields.put("k", String.valueOf(key));
-                int pt = key % numPartitions;
-                fields.put("pt", String.valueOf(pt));
-                for (int j = 0; j < fieldNames.size(); j++) {
-                    String fieldName = fieldNames.get(j);
-                    if (isBigInt.get(j)) {
-                        fields.put(fieldName, 
String.valueOf(random.nextLong()));
-                    } else {
-                        fields.put(fieldName, 
String.valueOf(random.nextInt()));
-                    }
-                }
-
-                List<CdcRecord> records = new ArrayList<>();
-                if (expected.containsKey(key)) {
-                    records.add(new CdcRecord(RowKind.DELETE, 
expected.get(key)));
-                }
-                records.add(new CdcRecord(RowKind.INSERT, fields));
-                events[i] = new TestCdcEvent(records);
-                expected.put(key, fields);
-            }
-        }
+        TestTable testTable =
+                new TestTable("test_tbl", numEvents, numSchemaChanges, 
numPartitions, numKeys);
 
         Path tablePath;
         FileIO fileIO;
@@ -155,9 +89,7 @@ public class FlinkCdcSinkITCase extends AbstractTestBase {
                 createFileStoreTable(
                         tablePath,
                         fileIO,
-                        RowType.of(
-                                new DataType[] {DataTypes.INT(), 
DataTypes.INT(), DataTypes.INT()},
-                                new String[] {"pt", "k", "v0"}),
+                        testTable.initialRowType(),
                         Collections.singletonList("pt"),
                         Arrays.asList("pt", "k"),
                         numBucket);
@@ -168,12 +100,10 @@ public class FlinkCdcSinkITCase extends AbstractTestBase {
             env.setRestartStrategy(RestartStrategies.noRestart());
         }
 
-        TestCdcSourceFunction sourceFunction =
-                new TestCdcSourceFunction(
-                        events, record -> 
Integer.valueOf(record.fields().get("k")));
+        TestCdcSourceFunction sourceFunction = new 
TestCdcSourceFunction(testTable.events());
         DataStreamSource<TestCdcEvent> source = env.addSource(sourceFunction);
         source.setParallelism(2);
-        new FlinkCdcSinkBuilder<TestCdcEvent>()
+        new FlinkCdcSyncTableSinkBuilder<TestCdcEvent>()
                 .withInput(source)
                 .withParserFactory(TestCdcEventParser::new)
                 .withTable(table)
@@ -192,27 +122,11 @@ public class FlinkCdcSinkITCase extends AbstractTestBase {
         SchemaManager schemaManager = new SchemaManager(table.fileIO(), 
table.location());
         TableSchema schema = schemaManager.latest().get();
 
-        Map<Integer, Map<String, String>> actual = new HashMap<>();
         TableScan.Plan plan = table.newScan().plan();
         try (RecordReaderIterator<InternalRow> it =
                 new 
RecordReaderIterator<>(table.newRead().createReader(plan))) {
-            while (it.hasNext()) {
-                InternalRow row = it.next();
-                Map<String, String> fields = new HashMap<>();
-                for (int i = 0; i < schema.fieldNames().size(); i++) {
-                    if (!row.isNullAt(i)) {
-                        fields.put(
-                                schema.fieldNames().get(i),
-                                String.valueOf(
-                                        
schema.fields().get(i).type().equals(DataTypes.BIGINT())
-                                                ? row.getLong(i)
-                                                : row.getInt(i)));
-                    }
-                }
-                actual.put(Integer.valueOf(fields.get("k")), fields);
-            }
+            testTable.assertResult(schema, it);
         }
-        assertThat(actual).isEqualTo(expected);
     }
 
     private FileStoreTable createFileStoreTable(
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
index aedcc0362..f7a933951 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
@@ -28,17 +28,27 @@ public class TestCdcEvent implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
+    private final String tableName;
     private final SchemaChange schemaChange;
     private final List<CdcRecord> records;
+    private final int keyHash;
 
-    public TestCdcEvent(SchemaChange schemaChange) {
+    public TestCdcEvent(String tableName, SchemaChange schemaChange) {
+        this.tableName = tableName;
         this.schemaChange = schemaChange;
         this.records = null;
+        this.keyHash = 0;
     }
 
-    public TestCdcEvent(List<CdcRecord> records) {
+    public TestCdcEvent(String tableName, List<CdcRecord> records, int 
keyHash) {
+        this.tableName = tableName;
         this.schemaChange = null;
         this.records = records;
+        this.keyHash = keyHash;
+    }
+
+    public String tableName() {
+        return tableName;
     }
 
     public SchemaChange schemaChange() {
@@ -49,8 +59,15 @@ public class TestCdcEvent implements Serializable {
         return records;
     }
 
+    @Override
+    public int hashCode() {
+        return keyHash;
+    }
+
     @Override
     public String toString() {
-        return String.format("{schemChange = %s, records = %s}", schemaChange, 
records);
+        return String.format(
+                "{tableName = %s, schemChange = %s, records = %s}",
+                tableName, schemaChange, records);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
index bb544d24c..9772d3292 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
@@ -33,6 +33,11 @@ public class TestCdcEventParser implements 
EventParser<TestCdcEvent> {
         this.raw = raw;
     }
 
+    @Override
+    public String tableName() {
+        return raw.tableName();
+    }
+
     @Override
     public boolean isSchemaChange() {
         return raw.schemaChange() != null;
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcSourceFunction.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcSourceFunction.java
index 889c8216d..507fa033d 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcSourceFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcSourceFunction.java
@@ -18,9 +18,6 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
-import org.apache.paimon.utils.Preconditions;
-import org.apache.paimon.utils.SerializableFunction;
-
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -28,11 +25,10 @@ import 
org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 
-import java.util.Arrays;
+import java.util.Collection;
 import java.util.LinkedList;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
 
 /**
  * Testing {@link RichParallelSourceFunction} to produce {@link TestCdcEvent}. 
{@link TestCdcEvent}s
@@ -44,17 +40,14 @@ public class TestCdcSourceFunction extends 
RichParallelSourceFunction<TestCdcEve
     private static final long serialVersionUID = 1L;
 
     private final LinkedList<TestCdcEvent> events;
-    private final SerializableFunction<CdcRecord, Integer> getKeyHash;
 
     private volatile boolean isRunning = true;
     private transient int numRecordsPerCheckpoint;
     private transient AtomicInteger recordsThisCheckpoint;
     private transient ListState<Integer> remainingEventsCount;
 
-    public TestCdcSourceFunction(
-            TestCdcEvent[] events, SerializableFunction<CdcRecord, Integer> 
getKeyHash) {
-        this.events = 
Arrays.stream(events).collect(Collectors.toCollection(LinkedList::new));
-        this.getKeyHash = getKeyHash;
+    public TestCdcSourceFunction(Collection<TestCdcEvent> events) {
+        this.events = new LinkedList<>(events);
     }
 
     @Override
@@ -95,18 +88,9 @@ public class TestCdcSourceFunction extends 
RichParallelSourceFunction<TestCdcEve
             synchronized (ctx.getCheckpointLock()) {
                 TestCdcEvent event = events.poll();
                 if (event.records() != null) {
-                    for (int i = 0; i + 1 < event.records().size(); i++) {
-                        Preconditions.checkArgument(
-                                getKeyHash
-                                        .apply(event.records().get(i))
-                                        
.equals(getKeyHash.apply(event.records().get(i + 1))),
-                                "Key hashes in the same List<Record> are not 
equal."
-                                        + "This is an invalid test data.");
-                    }
-                    int hash = getKeyHash.apply(event.records().get(0));
                     int subtaskId = 
getRuntimeContext().getIndexOfThisSubtask();
                     int totalSubtasks = 
getRuntimeContext().getNumberOfParallelSubtasks();
-                    if (Math.abs(hash) % totalSubtasks != subtaskId) {
+                    if (Math.abs(event.hashCode()) % totalSubtasks != 
subtaskId) {
                         continue;
                     }
                 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
new file mode 100644
index 000000000..f5be3dca3
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Generate test data for {@link FlinkCdcSyncTableSinkITCase} and {@link
+ * FlinkCdcSyncDatabaseSinkITCase}.
+ */
+public class TestTable {
+
+    private final RowType initialRowType;
+
+    private final Queue<TestCdcEvent> events;
+    private final Map<Integer, Map<String, String>> expected;
+
+    public TestTable(
+            String tableName, int numEvents, int numSchemaChanges, int 
numPartitions, int numKeys) {
+        List<String> fieldNames = new ArrayList<>();
+        List<Boolean> isBigInt = new ArrayList<>();
+
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        int numFields = random.nextInt(5) + 1;
+
+        DataType[] initialFieldTypes = new DataType[2 + numFields];
+        initialFieldTypes[0] = DataTypes.INT();
+        initialFieldTypes[1] = DataTypes.INT();
+
+        String[] initialFieldNames = new String[2 + numFields];
+        initialFieldNames[0] = "pt";
+        initialFieldNames[1] = "k";
+
+        for (int i = 0; i < numFields; i++) {
+            fieldNames.add("v" + i);
+            initialFieldNames[2 + i] = "v" + i;
+
+            if (random.nextBoolean()) {
+                isBigInt.add(true);
+                initialFieldTypes[2 + i] = DataTypes.BIGINT();
+            } else {
+                isBigInt.add(false);
+                initialFieldTypes[2 + i] = DataTypes.INT();
+            }
+        }
+
+        initialRowType = RowType.of(initialFieldTypes, initialFieldNames);
+
+        Set<Integer> schemaChangePositions = new HashSet<>();
+
+        // if numSchemaChanges is larger than numEvents, the following loop 
will never end
+        // we can, of course, use fisher's algorithm or such, but let's make 
things simple for tests
+        numSchemaChanges = Math.min(numSchemaChanges, numEvents / 2);
+        for (int i = 0; i < numSchemaChanges; i++) {
+            int pos;
+            do {
+                pos = random.nextInt(numEvents);
+            } while (schemaChangePositions.contains(pos));
+            schemaChangePositions.add(pos);
+        }
+
+        events = new LinkedList<>();
+        expected = new HashMap<>();
+        for (int i = 0; i < numEvents; i++) {
+            if (schemaChangePositions.contains(i)) {
+                if (random.nextBoolean()) {
+                    int idx = random.nextInt(fieldNames.size());
+                    isBigInt.set(idx, true);
+                    events.add(
+                            new TestCdcEvent(
+                                    tableName,
+                                    SchemaChange.updateColumnType(
+                                            fieldNames.get(idx), 
DataTypes.BIGINT())));
+                } else {
+                    String newName = "v" + fieldNames.size();
+                    fieldNames.add(newName);
+                    isBigInt.add(false);
+                    events.add(
+                            new TestCdcEvent(
+                                    tableName, SchemaChange.addColumn(newName, 
DataTypes.INT())));
+                }
+            } else {
+                Map<String, String> fields = new HashMap<>();
+                int key = random.nextInt(numKeys);
+                fields.put("k", String.valueOf(key));
+                int pt = key % numPartitions;
+                fields.put("pt", String.valueOf(pt));
+
+                for (int j = 0; j < fieldNames.size(); j++) {
+                    String fieldName = fieldNames.get(j);
+                    if (isBigInt.get(j)) {
+                        fields.put(fieldName, 
String.valueOf(random.nextLong()));
+                    } else {
+                        fields.put(fieldName, 
String.valueOf(random.nextInt()));
+                    }
+                }
+
+                List<CdcRecord> records = new ArrayList<>();
+                if (expected.containsKey(key)) {
+                    records.add(new CdcRecord(RowKind.DELETE, 
expected.get(key)));
+                }
+                records.add(new CdcRecord(RowKind.INSERT, fields));
+                events.add(new TestCdcEvent(tableName, records, 
Objects.hash(tableName, key)));
+                expected.put(key, fields);
+            }
+        }
+    }
+
+    public RowType initialRowType() {
+        return initialRowType;
+    }
+
+    public Queue<TestCdcEvent> events() {
+        return events;
+    }
+
+    public void assertResult(TableSchema schema, Iterator<InternalRow> it) {
+        Map<Integer, Map<String, String>> actual = new HashMap<>();
+        while (it.hasNext()) {
+            InternalRow row = it.next();
+            Map<String, String> fields = new HashMap<>();
+            for (int i = 0; i < schema.fieldNames().size(); i++) {
+                if (!row.isNullAt(i)) {
+                    fields.put(
+                            schema.fieldNames().get(i),
+                            String.valueOf(
+                                    
schema.fields().get(i).type().equals(DataTypes.BIGINT())
+                                            ? row.getLong(i)
+                                            : row.getInt(i)));
+                }
+            }
+            actual.put(Integer.valueOf(fields.get("k")), fields);
+        }
+        assertThat(actual).isEqualTo(expected);
+    }
+}


Reply via email to