This is an automated email from the ASF dual-hosted git repository.

ruanhang1993 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 09f36a4a0 [FLINK-35143][pipeline-connector][mysql] Expose newly added 
tables capture in mysql pipeline connector. (#3411)
09f36a4a0 is described below

commit 09f36a4a05797b7e319653db3b90658cd7c31659
Author: North Lin <[email protected]>
AuthorDate: Fri Aug 9 13:30:46 2024 +0800

    [FLINK-35143][pipeline-connector][mysql] Expose newly added tables capture 
in mysql pipeline connector. (#3411)
    
    
    Co-authored-by: Muhammet Orazov <[email protected]>
    Co-authored-by: north.lin <[email protected]>
---
 .../docs/connectors/pipeline-connectors/mysql.md   |   7 +
 .../docs/connectors/pipeline-connectors/mysql.md   |   7 +
 .../mysql/factory/MySqlDataSourceFactory.java      |   7 +-
 .../mysql/source/MySqlDataSourceOptions.java       |   8 +
 .../source/MysqlPipelineNewlyAddedTableITCase.java | 582 +++++++++++++++++++++
 .../src/test/resources/ddl/customer.sql            | 328 ++++++++++++
 6 files changed, 937 insertions(+), 2 deletions(-)

diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md 
b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
index 40c774ffc..636ed975f 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
@@ -268,6 +268,13 @@ pipeline:
       <td>是否在快照结束后关闭空闲的 Reader。 此特性需要 flink 版本大于等于 1.14 并且 
'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 需要设置为 true。<br>
           若 flink 版本大于等于 
1.15,'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 默认值变更为 
true,可以不用显式配置 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 
= true。</td>
     </tr>
+    <tr>
+      <td>scan.newly-added-table.enabled</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>是否启用动态加表特性,默认关闭。 此配置项只有作业从savepoint/checkpoint启动时才生效。</td>
+    </tr>
     </tbody>
 </table>
 </div>
diff --git a/docs/content/docs/connectors/pipeline-connectors/mysql.md 
b/docs/content/docs/connectors/pipeline-connectors/mysql.md
index 41eccf939..879701614 100644
--- a/docs/content/docs/connectors/pipeline-connectors/mysql.md
+++ b/docs/content/docs/connectors/pipeline-connectors/mysql.md
@@ -275,6 +275,13 @@ pipeline:
           so it does not need to be explicitly configured 
'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true'
       </td>
     </tr>
+    <tr>
+      <td>scan.newly-added-table.enabled</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>Whether to enable scan the newly added tables feature or not, by 
default is false. This option is only useful when we start the job from a 
savepoint/checkpoint.</td>
+    </tr>
     </tbody>
 </table>
 </div>
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
index b1d3e5966..e8c39ce3d 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
@@ -61,6 +61,7 @@ import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOption
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
+import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_MODE;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE;
@@ -123,6 +124,7 @@ public class MySqlDataSourceFactory implements 
DataSourceFactory {
         Duration connectTimeout = config.get(CONNECT_TIMEOUT);
         int connectMaxRetries = config.get(CONNECT_MAX_RETRIES);
         int connectionPoolSize = config.get(CONNECTION_POOL_SIZE);
+        boolean scanNewlyAddedTableEnabled = 
config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
 
         validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 
1);
         validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@@ -158,7 +160,8 @@ public class MySqlDataSourceFactory implements 
DataSourceFactory {
                         .closeIdleReaders(closeIdleReaders)
                         .includeSchemaChanges(includeSchemaChanges)
                         .debeziumProperties(getDebeziumProperties(configMap))
-                        .jdbcProperties(getJdbcProperties(configMap));
+                        .jdbcProperties(getJdbcProperties(configMap))
+                        
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled);
 
         Selectors selectors = new 
Selectors.SelectorsBuilder().includeTables(tables).build();
         List<String> capturedTables = 
getTableList(configFactory.createConfig(0), selectors);
@@ -216,7 +219,7 @@ public class MySqlDataSourceFactory implements 
DataSourceFactory {
         options.add(CONNECTION_POOL_SIZE);
         options.add(HEARTBEAT_INTERVAL);
         options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
-
+        options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
         options.add(CHUNK_META_GROUP_SIZE);
         options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
         options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
index e852eb3d7..f6f1a671a 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
@@ -224,6 +224,14 @@ public class MySqlDataSourceOptions {
                                     + "FLIP-147: Support Checkpoints After 
Tasks Finished. The flink version is required to be "
                                     + "greater than or equal to 1.14 when 
enabling this feature.");
 
+    @Experimental
+    public static final ConfigOption<Boolean> SCAN_NEWLY_ADDED_TABLE_ENABLED =
+            ConfigOptions.key("scan.newly-added-table.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to scan the newly added tables or not, by 
default is false. This option is only useful when we start the job from a 
savepoint/checkpoint.");
+
     @Experimental
     public static final ConfigOption<Boolean> SCHEMA_CHANGE_ENABLED =
             ConfigOptions.key("schema-change.enabled")
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java
new file mode 100644
index 000000000..7187e6447
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java
@@ -0,0 +1,582 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.factories.Factory;
+import org.apache.flink.cdc.common.factories.FactoryHelper;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.source.FlinkSourceProvider;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
+import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.operators.collect.AbstractCollectResultBuffer;
+import 
org.apache.flink.streaming.api.operators.collect.CheckpointedCollectResultBuffer;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import 
org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.util.ExceptionUtils;
+
+import io.debezium.connector.mysql.MySqlConnection;
+import io.debezium.jdbc.JdbcConnection;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.sql.SQLException;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
+import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
+import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
+import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
+import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_ID;
+import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_TIME_ZONE;
+import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
+import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME;
+import static 
org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD;
+import static 
org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER;
+import static 
org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.fetchResults;
+import static 
org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.getServerId;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT tests to cover various newly added tables during capture process in 
pipeline mode. */
+public class MysqlPipelineNewlyAddedTableITCase extends MySqlSourceTestBase {
+    private final UniqueDatabase customDatabase =
+            new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", 
"mysqlpw");
+
+    private final ScheduledExecutorService mockBinlogExecutor = 
Executors.newScheduledThreadPool(1);
+
+    @Before
+    public void before() throws SQLException {
+        TestValuesTableFactory.clearAllData();
+        customDatabase.createAndInitialize();
+
+        try (MySqlConnection connection = getConnection()) {
+            connection.setAutoCommit(false);
+            // prepare initial data for given table
+            String tableId = customDatabase.getDatabaseName() + 
".produce_binlog_table";
+            connection.execute(
+                    format("CREATE TABLE %s ( id BIGINT PRIMARY KEY, cnt 
BIGINT);", tableId));
+            connection.execute(
+                    format("INSERT INTO  %s VALUES (0, 100), (1, 101), (2, 
102);", tableId));
+            connection.commit();
+
+            // mock continuous binlog during the newly added table capturing 
process
+            mockBinlogExecutor.schedule(
+                    () -> {
+                        try {
+                            connection.execute(
+                                    format("UPDATE  %s SET  cnt = cnt +1 WHERE 
id < 2;", tableId));
+                            connection.commit();
+                        } catch (SQLException e) {
+                            e.printStackTrace();
+                        }
+                    },
+                    500,
+                    TimeUnit.MICROSECONDS);
+        }
+    }
+
+    @After
+    public void after() {
+        mockBinlogExecutor.shutdown();
+    }
+
+    private MySqlConnection getConnection() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("database.hostname", MYSQL_CONTAINER.getHost());
+        properties.put("database.port", 
String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
+        properties.put("database.user", customDatabase.getUsername());
+        properties.put("database.password", customDatabase.getPassword());
+        properties.put("database.serverTimezone", ZoneId.of("UTC").toString());
+        io.debezium.config.Configuration configuration =
+                io.debezium.config.Configuration.from(properties);
+        return DebeziumUtils.createMySqlConnection(configuration, new 
Properties());
+    }
+
+    @Test
+    public void testAddNewTableOneByOneSingleParallelism() throws Exception {
+        TestParam testParam =
+                TestParam.newBuilder(
+                                Collections.singletonList("address_hangzhou"),
+                                4,
+                                Arrays.asList("address_hangzhou", 
"address_beijing"),
+                                4)
+                        .setFirstRoundInitTables(
+                                Arrays.asList("address_hangzhou", 
"address_beijing"))
+                        .build();
+
+        testAddNewTable(testParam, 1);
+    }
+
+    @Test
+    public void testAddNewTableOneByOne() throws Exception {
+        TestParam testParam =
+                TestParam.newBuilder(
+                                Collections.singletonList("address_hangzhou"),
+                                4,
+                                Arrays.asList("address_hangzhou", 
"address_beijing"),
+                                4)
+                        .setFirstRoundInitTables(
+                                Arrays.asList("address_hangzhou", 
"address_beijing"))
+                        .build();
+
+        testAddNewTable(testParam, DEFAULT_PARALLELISM);
+    }
+
+    @Test
+    public void testAddNewTableByPatternSingleParallelism() throws Exception {
+        TestParam testParam =
+                TestParam.newBuilder(
+                                Collections.singletonList("address_\\.*"),
+                                8,
+                                Collections.singletonList("address_\\.*"),
+                                8)
+                        .setFirstRoundInitTables(
+                                Arrays.asList("address_hangzhou", 
"address_beijing"))
+                        .setSecondRoundInitTables(
+                                Arrays.asList("address_shanghai", 
"address_suzhou"))
+                        .build();
+
+        testAddNewTable(testParam, 1);
+    }
+
+    @Test
+    public void testAddNewTableByPattern() throws Exception {
+        TestParam testParam =
+                TestParam.newBuilder(
+                                Collections.singletonList("address_\\.*"),
+                                8,
+                                Collections.singletonList("address_\\.*"),
+                                12)
+                        .setFirstRoundInitTables(
+                                Arrays.asList("address_hangzhou", 
"address_beijing"))
+                        .setSecondRoundInitTables(
+                                Arrays.asList(
+                                        "address_shanghai", "address_suzhou", 
"address_shenzhen"))
+                        .build();
+
+        testAddNewTable(testParam, DEFAULT_PARALLELISM);
+    }
+
+    private void testAddNewTable(TestParam testParam, int parallelism) throws 
Exception {
+        // step 1: create mysql tables
+        if (CollectionUtils.isNotEmpty(testParam.getFirstRoundInitTables())) {
+            initialAddressTables(getConnection(), 
testParam.getFirstRoundInitTables());
+        }
+        Path savepointDir = Files.createTempDirectory("add-new-table-test");
+        final String savepointDirectory = 
savepointDir.toAbsolutePath().toString();
+        String finishedSavePointPath = null;
+        StreamExecutionEnvironment env =
+                getStreamExecutionEnvironment(finishedSavePointPath, 
parallelism);
+        // step 2: listen tables first time
+        List<String> listenTablesFirstRound = 
testParam.getFirstRoundListenTables();
+
+        FlinkSourceProvider sourceProvider =
+                getFlinkSourceProvider(listenTablesFirstRound, parallelism);
+        DataStreamSource<Event> source =
+                env.fromSource(
+                        sourceProvider.getSource(),
+                        WatermarkStrategy.noWatermarks(),
+                        MySqlDataSourceFactory.IDENTIFIER,
+                        new EventTypeInfo());
+
+        TypeSerializer<Event> serializer =
+                
source.getTransformation().getOutputType().createSerializer(env.getConfig());
+        CheckpointedCollectResultBuffer<Event> resultBuffer =
+                new CheckpointedCollectResultBuffer<>(serializer);
+        String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+        CollectResultIterator<Event> iterator =
+                addCollector(env, source, resultBuffer, serializer, 
accumulatorName);
+        JobClient jobClient = env.executeAsync("beforeAddNewTable");
+        iterator.setJobClient(jobClient);
+
+        List<Event> actual = fetchResults(iterator, 
testParam.getFirstRoundFetchSize());
+        Optional<String> listenByPattern =
+                listenTablesFirstRound.stream()
+                        .filter(table -> StringUtils.contains(table, "\\.*"))
+                        .findAny();
+        multiAssert(
+                actual,
+                listenByPattern.isPresent()
+                        ? testParam.getFirstRoundInitTables()
+                        : listenTablesFirstRound);
+
+        // step 3: create new tables if needed
+        if (CollectionUtils.isNotEmpty(testParam.getSecondRoundInitTables())) {
+            initialAddressTables(getConnection(), 
testParam.getSecondRoundInitTables());
+        }
+
+        // step 4: trigger a savepoint and cancel the job
+        finishedSavePointPath = triggerSavepointWithRetry(jobClient, 
savepointDirectory);
+        jobClient.cancel().get();
+        iterator.close();
+
+        // step 5: restore from savepoint
+        StreamExecutionEnvironment restoredEnv =
+                getStreamExecutionEnvironment(finishedSavePointPath, 
parallelism);
+        List<String> listenTablesSecondRound = 
testParam.getSecondRoundListenTables();
+        FlinkSourceProvider restoredSourceProvider =
+                getFlinkSourceProvider(listenTablesSecondRound, parallelism);
+        DataStreamSource<Event> restoreSource =
+                restoredEnv.fromSource(
+                        restoredSourceProvider.getSource(),
+                        WatermarkStrategy.noWatermarks(),
+                        MySqlDataSourceFactory.IDENTIFIER,
+                        new EventTypeInfo());
+        CollectResultIterator<Event> restoredIterator =
+                addCollector(restoredEnv, restoreSource, resultBuffer, 
serializer, accumulatorName);
+        JobClient restoreClient = restoredEnv.executeAsync("AfterAddNewTable");
+
+        List<String> newlyAddTables =
+                listenTablesSecondRound.stream()
+                        .filter(table -> 
!listenTablesFirstRound.contains(table))
+                        .collect(Collectors.toList());
+        // it means listen by pattern when newlyAddTables is empty
+        if (CollectionUtils.isEmpty(newlyAddTables)) {
+            newlyAddTables = testParam.getSecondRoundInitTables();
+        }
+        List<Event> newlyTableEvent =
+                fetchResults(restoredIterator, 
testParam.getSecondRoundFetchSize());
+        multiAssert(newlyTableEvent, newlyAddTables);
+        restoreClient.cancel().get();
+        restoredIterator.close();
+    }
+
+    private void multiAssert(List<Event> actualEvents, List<String> 
listenTables) {
+        List<Event> expectedCreateTableEvents = new ArrayList<>();
+        List<Event> expectedDataChangeEvents = new ArrayList<>();
+        for (String table : listenTables) {
+            expectedCreateTableEvents.add(
+                    
getCreateTableEvent(TableId.tableId(customDatabase.getDatabaseName(), table)));
+            expectedDataChangeEvents.addAll(
+                    
getSnapshotExpected(TableId.tableId(customDatabase.getDatabaseName(), table)));
+        }
+        // compare create table events
+        List<Event> actualCreateTableEvents =
+                actualEvents.stream()
+                        .filter(event -> event instanceof CreateTableEvent)
+                        .collect(Collectors.toList());
+        assertThat(actualCreateTableEvents)
+                
.containsExactlyInAnyOrder(expectedCreateTableEvents.toArray(new Event[0]));
+
+        // compare data change events
+        List<Event> actualDataChangeEvents =
+                actualEvents.stream()
+                        .filter(event -> event instanceof DataChangeEvent)
+                        .collect(Collectors.toList());
+        assertThat(actualDataChangeEvents)
+                
.containsExactlyInAnyOrder(expectedDataChangeEvents.toArray(new Event[0]));
+    }
+
+    private CreateTableEvent getCreateTableEvent(TableId tableId) {
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.BIGINT().notNull())
+                        .physicalColumn("country", 
DataTypes.VARCHAR(255).notNull())
+                        .physicalColumn("city", 
DataTypes.VARCHAR(255).notNull())
+                        .physicalColumn("detail_address", 
DataTypes.VARCHAR(1024))
+                        .primaryKey(Collections.singletonList("id"))
+                        .build();
+        return new CreateTableEvent(tableId, schema);
+    }
+
+    private List<Event> getSnapshotExpected(TableId tableId) {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.BIGINT().notNull(),
+                            DataTypes.VARCHAR(255).notNull(),
+                            DataTypes.VARCHAR(255).notNull(),
+                            DataTypes.VARCHAR(1024)
+                        },
+                        new String[] {"id", "country", "city", 
"detail_address"});
+        BinaryRecordDataGenerator generator = new 
BinaryRecordDataGenerator(rowType);
+        String cityName = tableId.getTableName().split("_")[1];
+        return Arrays.asList(
+                DataChangeEvent.insertEvent(
+                        tableId,
+                        generator.generate(
+                                new Object[] {
+                                    416874195632735147L,
+                                    BinaryStringData.fromString("China"),
+                                    BinaryStringData.fromString(cityName),
+                                    BinaryStringData.fromString(cityName + " 
West Town address 1")
+                                })),
+                DataChangeEvent.insertEvent(
+                        tableId,
+                        generator.generate(
+                                new Object[] {
+                                    416927583791428523L,
+                                    BinaryStringData.fromString("China"),
+                                    BinaryStringData.fromString(cityName),
+                                    BinaryStringData.fromString(cityName + " 
West Town address 2")
+                                })),
+                DataChangeEvent.insertEvent(
+                        tableId,
+                        generator.generate(
+                                new Object[] {
+                                    417022095255614379L,
+                                    BinaryStringData.fromString("China"),
+                                    BinaryStringData.fromString(cityName),
+                                    BinaryStringData.fromString(cityName + " 
West Town address 3")
+                                })));
+    }
+
+    private String triggerSavepointWithRetry(JobClient jobClient, String 
savepointDirectory)
+            throws ExecutionException, InterruptedException {
+        int retryTimes = 0;
+        // retry 600 times, it takes 100 milliseconds per time, at most retry 
1 minute
+        while (retryTimes < 600) {
+            try {
+                return jobClient.triggerSavepoint(savepointDirectory).get();
+            } catch (Exception e) {
+                Optional<CheckpointException> exception =
+                        ExceptionUtils.findThrowable(e, 
CheckpointException.class);
+                if (exception.isPresent()
+                        && exception.get().getMessage().contains("Checkpoint 
triggering task")) {
+                    Thread.sleep(100);
+                    retryTimes++;
+                } else {
+                    throw e;
+                }
+            }
+        }
+        return null;
+    }
+
+    private void initialAddressTables(JdbcConnection connection, List<String> 
addressTables)
+            throws SQLException {
+        try {
+            connection.setAutoCommit(false);
+            for (String tableName : addressTables) {
+                // make initial data for given table
+                String tableId = customDatabase.getDatabaseName() + "." + 
tableName;
+                String cityName = tableName.split("_")[1];
+                connection.execute(
+                        "CREATE TABLE IF NOT EXISTS "
+                                + tableId
+                                + "("
+                                + "  id BIGINT NOT NULL PRIMARY KEY,"
+                                + "  country VARCHAR(255) NOT NULL,"
+                                + "  city VARCHAR(255) NOT NULL,"
+                                + "  detail_address VARCHAR(1024)"
+                                + ");");
+                connection.execute(
+                        format(
+                                "INSERT INTO  %s "
+                                        + "VALUES (416874195632735147, 
'China', '%s', '%s West Town address 1'),"
+                                        + "       (416927583791428523, 
'China', '%s', '%s West Town address 2'),"
+                                        + "       (417022095255614379, 
'China', '%s', '%s West Town address 3');",
+                                tableId, cityName, cityName, cityName, 
cityName, cityName,
+                                cityName));
+            }
+            connection.commit();
+        } finally {
+            connection.close();
+        }
+    }
+
+    private FlinkSourceProvider getFlinkSourceProvider(List<String> tables, 
int parallelism) {
+        List<String> fullTableNames =
+                tables.stream()
+                        .map(table -> customDatabase.getDatabaseName() + "." + 
table)
+                        .collect(Collectors.toList());
+        Map<String, String> options = new HashMap<>();
+        options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost());
+        options.put(PORT.key(), 
String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
+        options.put(USERNAME.key(), TEST_USER);
+        options.put(PASSWORD.key(), TEST_PASSWORD);
+        options.put(SERVER_TIME_ZONE.key(), "UTC");
+        options.put(TABLES.key(), StringUtils.join(fullTableNames, ","));
+        options.put(SERVER_ID.key(), getServerId(parallelism));
+        options.put(SCAN_NEWLY_ADDED_TABLE_ENABLED.key(), "true");
+        Factory.Context context =
+                new FactoryHelper.DefaultContext(
+                        
org.apache.flink.cdc.common.configuration.Configuration.fromMap(options),
+                        null,
+                        this.getClass().getClassLoader());
+
+        MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
+        MySqlDataSource dataSource = (MySqlDataSource) 
factory.createDataSource(context);
+
+        return (FlinkSourceProvider) dataSource.getEventSourceProvider();
+    }
+
+    private <T> CollectResultIterator<T> addCollector(
+            StreamExecutionEnvironment env,
+            DataStreamSource<T> source,
+            AbstractCollectResultBuffer<T> buffer,
+            TypeSerializer<T> serializer,
+            String accumulatorName) {
+        CollectSinkOperatorFactory<T> sinkFactory =
+                new CollectSinkOperatorFactory<>(serializer, accumulatorName);
+        CollectSinkOperator<T> operator = (CollectSinkOperator<T>) 
sinkFactory.getOperator();
+        CollectResultIterator<T> iterator =
+                new CollectResultIterator<>(
+                        buffer, operator.getOperatorIdFuture(), 
accumulatorName, 0);
+        CollectStreamSink<T> sink = new CollectStreamSink<>(source, 
sinkFactory);
+        sink.name("Data stream collect sink");
+        env.addOperator(sink.getTransformation());
+        env.registerCollectIterator(iterator);
+        return iterator;
+    }
+
+    private StreamExecutionEnvironment getStreamExecutionEnvironment(
+            String finishedSavePointPath, int parallelism) {
+        Configuration configuration = new Configuration();
+        if (finishedSavePointPath != null) {
+            configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, 
finishedSavePointPath);
+        }
+        StreamExecutionEnvironment env =
+                
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+        env.setParallelism(parallelism);
+        env.enableCheckpointing(500L);
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1000L));
+        return env;
+    }
+
+    private static class TestParam {
+        private final List<String> firstRoundInitTables;
+        private final List<String> firstRoundListenTables;
+        private final Integer firstRoundFetchSize;
+        private final List<String> secondRoundInitTables;
+        private final List<String> secondRoundListenTables;
+        private final Integer secondRoundFetchSize;
+
+        private TestParam(Builder builder) {
+            this.firstRoundInitTables = builder.firstRoundInitTables;
+            this.firstRoundListenTables = builder.firstRoundListenTables;
+            this.firstRoundFetchSize = builder.firstRoundFetchSize;
+            this.secondRoundInitTables = builder.secondRoundInitTables;
+            this.secondRoundListenTables = builder.secondRoundListenTables;
+            this.secondRoundFetchSize = builder.secondRoundFetchSize;
+        }
+
+        public static Builder newBuilder(
+                List<String> firstRoundListenTables,
+                Integer firstRoundFetchSize,
+                List<String> secondRoundListenTables,
+                Integer secondRoundFetchSize) {
+            return new Builder(
+                    firstRoundListenTables,
+                    firstRoundFetchSize,
+                    secondRoundListenTables,
+                    secondRoundFetchSize);
+        }
+
+        public static class Builder {
+            private List<String> firstRoundInitTables;
+            private final List<String> firstRoundListenTables;
+            private final Integer firstRoundFetchSize;
+
+            private List<String> secondRoundInitTables;
+            private final List<String> secondRoundListenTables;
+            private final Integer secondRoundFetchSize;
+
+            public Builder(
+                    List<String> firstRoundListenTables,
+                    Integer firstRoundFetchSize,
+                    List<String> secondRoundListenTables,
+                    Integer secondRoundFetchSize) {
+                this.firstRoundListenTables = firstRoundListenTables;
+                this.firstRoundFetchSize = firstRoundFetchSize;
+                this.secondRoundListenTables = secondRoundListenTables;
+                this.secondRoundFetchSize = secondRoundFetchSize;
+            }
+
+            public TestParam build() {
+                return new TestParam(this);
+            }
+
+            public Builder setFirstRoundInitTables(List<String> 
firstRoundInitTables) {
+                this.firstRoundInitTables = firstRoundInitTables;
+                return this;
+            }
+
+            public Builder setSecondRoundInitTables(List<String> 
secondRoundInitTables) {
+                this.secondRoundInitTables = secondRoundInitTables;
+                return this;
+            }
+        }
+
+        public List<String> getFirstRoundInitTables() {
+            return firstRoundInitTables;
+        }
+
+        public List<String> getFirstRoundListenTables() {
+            return firstRoundListenTables;
+        }
+
+        public Integer getFirstRoundFetchSize() {
+            return firstRoundFetchSize;
+        }
+
+        public List<String> getSecondRoundInitTables() {
+            return secondRoundInitTables;
+        }
+
+        public List<String> getSecondRoundListenTables() {
+            return secondRoundListenTables;
+        }
+
+        public Integer getSecondRoundFetchSize() {
+            return secondRoundFetchSize;
+        }
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/customer.sql
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/customer.sql
new file mode 100644
index 000000000..e4df63f1a
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/customer.sql
@@ -0,0 +1,328 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+-- 
+--      http://www.apache.org/licenses/LICENSE-2.0
+-- 
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- 
----------------------------------------------------------------------------------------------------------------
+-- DATABASE:  customer
+-- 
----------------------------------------------------------------------------------------------------------------
+
+-- Create and populate our users using a single insert with many rows
+CREATE TABLE customers (
+  id INTEGER NOT NULL PRIMARY KEY,
+  name VARCHAR(255) NOT NULL DEFAULT 'flink',
+  address VARCHAR(1024),
+  phone_number VARCHAR(512)
+);
+
+INSERT INTO customers
+VALUES (101,"user_1","Shanghai","123567891234"),
+       (102,"user_2","Shanghai","123567891234"),
+       (103,"user_3","Shanghai","123567891234"),
+       (109,"user_4","Shanghai","123567891234"),
+       (110,"user_5","Shanghai","123567891234"),
+       (111,"user_6","Shanghai","123567891234"),
+       (118,"user_7","Shanghai","123567891234"),
+       (121,"user_8","Shanghai","123567891234"),
+       (123,"user_9","Shanghai","123567891234"),
+       (1009,"user_10","Shanghai","123567891234"),
+       (1010,"user_11","Shanghai","123567891234"),
+       (1011,"user_12","Shanghai","123567891234"),
+       (1012,"user_13","Shanghai","123567891234"),
+       (1013,"user_14","Shanghai","123567891234"),
+       (1014,"user_15","Shanghai","123567891234"),
+       (1015,"user_16","Shanghai","123567891234"),
+       (1016,"user_17","Shanghai","123567891234"),
+       (1017,"user_18","Shanghai","123567891234"),
+       (1018,"user_19","Shanghai","123567891234"),
+       (1019,"user_20","Shanghai","123567891234"),
+       (2000,"user_21","Shanghai","123567891234");
+
+-- Create a table will not be read
+CREATE TABLE prefix_customers (
+  id INTEGER NOT NULL PRIMARY KEY,
+  name VARCHAR(255) NOT NULL DEFAULT 'flink',
+  address VARCHAR(1024),
+  phone_number VARCHAR(512)
+);
+
+INSERT INTO prefix_customers
+VALUES (101,"user_1","Shanghai","123567891234"),
+       (102,"user_2","Shanghai","123567891234"),
+       (103,"user_3","Shanghai","123567891234"),
+       (109,"user_4","Shanghai","123567891234");
+
+-- table has same name prefix with 'customers.*'
+CREATE TABLE customers_1 (
+  id INTEGER NOT NULL PRIMARY KEY,
+  name VARCHAR(255) NOT NULL DEFAULT 'flink',
+  address VARCHAR(1024),
+  phone_number VARCHAR(512)
+);
+
+INSERT INTO customers_1
+VALUES (101,"user_1","Shanghai","123567891234"),
+       (102,"user_2","Shanghai","123567891234"),
+       (103,"user_3","Shanghai","123567891234"),
+       (109,"user_4","Shanghai","123567891234"),
+       (110,"user_5","Shanghai","123567891234"),
+       (111,"user_6","Shanghai","123567891234"),
+       (118,"user_7","Shanghai","123567891234"),
+       (121,"user_8","Shanghai","123567891234"),
+       (123,"user_9","Shanghai","123567891234"),
+       (1009,"user_10","Shanghai","123567891234"),
+       (1010,"user_11","Shanghai","123567891234"),
+       (1011,"user_12","Shanghai","123567891234"),
+       (1012,"user_13","Shanghai","123567891234"),
+       (1013,"user_14","Shanghai","123567891234"),
+       (1014,"user_15","Shanghai","123567891234"),
+       (1015,"user_16","Shanghai","123567891234"),
+       (1016,"user_17","Shanghai","123567891234"),
+       (1017,"user_18","Shanghai","123567891234"),
+       (1018,"user_19","Shanghai","123567891234"),
+       (1019,"user_20","Shanghai","123567891234"),
+       (2000,"user_21","Shanghai","123567891234");
+
+-- create table whose split key is evenly distributed
+CREATE TABLE customers_even_dist (
+  id INTEGER NOT NULL PRIMARY KEY,
+  name VARCHAR(255) NOT NULL ,
+  address VARCHAR(1024),
+  phone_number VARCHAR(512)
+);
+INSERT INTO customers_even_dist
+VALUES (101,'user_1','Shanghai','123567891234'),
+       (102,'user_2','Shanghai','123567891234'),
+       (103,'user_3','Shanghai','123567891234'),
+       (104,'user_4','Shanghai','123567891234'),
+       (105,'user_5','Shanghai','123567891234'),
+       (106,'user_6','Shanghai','123567891234'),
+       (107,'user_7','Shanghai','123567891234'),
+       (108,'user_8','Shanghai','123567891234'),
+       (109,'user_9','Shanghai','123567891234'),
+       (110,'user_10','Shanghai','123567891234');
+
+-- create table whose split key is evenly distributed and sparse
+CREATE TABLE customers_sparse_dist (
+  id INTEGER NOT NULL PRIMARY KEY,
+  name VARCHAR(255) NOT NULL ,
+  address VARCHAR(1024),
+  phone_number VARCHAR(512)
+);
+INSERT INTO customers_sparse_dist
+VALUES (2,'user_1','Shanghai','123567891234'),
+       (4,'user_2','Shanghai','123567891234'),
+       (6,'user_3','Shanghai','123567891234'),
+       (8,'user_4','Shanghai','123567891234'),
+       (10,'user_5','Shanghai','123567891234'),
+       (16,'user_6','Shanghai','123567891234'),
+       (17,'user_7','Shanghai','123567891234'),
+       (18,'user_8','Shanghai','123567891234'),
+       (20,'user_9','Shanghai','123567891234'),
+       (22,'user_10','Shanghai','123567891234');
+
+-- create table whose split key is evenly distributed and dense
+CREATE TABLE customers_dense_dist (
+ id1 INTEGER NOT NULL,
+ id2 VARCHAR(255) NOT NULL ,
+ address VARCHAR(1024),
+ phone_number VARCHAR(512),
+ PRIMARY KEY(id1, id2)
+);
+INSERT INTO customers_dense_dist
+VALUES (1,'user_1','Shanghai','123567891234'),
+       (1,'user_2','Shanghai','123567891234'),
+       (1,'user_3','Shanghai','123567891234'),
+       (1,'user_4','Shanghai','123567891234'),
+       (2,'user_5','Shanghai','123567891234'),
+       (2,'user_6','Shanghai','123567891234'),
+       (2,'user_7','Shanghai','123567891234'),
+       (3,'user_8','Shanghai','123567891234'),
+       (3,'user_9','Shanghai','123567891234'),
+       (3,'user_10','Shanghai','123567891234');
+
+CREATE TABLE customers_no_pk (
+   id INTEGER,
+   name VARCHAR(255) DEFAULT 'flink',
+   address VARCHAR(1024),
+   phone_number VARCHAR(512)
+);
+
+INSERT INTO customers_no_pk
+VALUES (101,"user_1","Shanghai","123567891234"),
+       (102,"user_2","Shanghai","123567891234"),
+       (103,"user_3","Shanghai","123567891234"),
+       (109,"user_4","Shanghai","123567891234"),
+       (110,"user_5","Shanghai","123567891234"),
+       (111,"user_6","Shanghai","123567891234"),
+       (118,"user_7","Shanghai","123567891234"),
+       (121,"user_8","Shanghai","123567891234"),
+       (123,"user_9","Shanghai","123567891234"),
+       (1009,"user_10","Shanghai","123567891234"),
+       (1010,"user_11","Shanghai","123567891234"),
+       (1011,"user_12","Shanghai","123567891234"),
+       (1012,"user_13","Shanghai","123567891234"),
+       (1013,"user_14","Shanghai","123567891234"),
+       (1014,"user_15","Shanghai","123567891234"),
+       (1015,"user_16","Shanghai","123567891234"),
+       (1016,"user_17","Shanghai","123567891234"),
+       (1017,"user_18","Shanghai","123567891234"),
+       (1018,"user_19","Shanghai","123567891234"),
+       (1019,"user_20","Shanghai","123567891234"),
+       (2000,"user_21","Shanghai","123567891234");
+
+-- table has combined primary key
+CREATE TABLE customer_card (
+  card_no BIGINT NOT NULL,
+  level VARCHAR(10) NOT NULL,
+  name VARCHAR(255) NOT NULL DEFAULT 'flink',
+  note VARCHAR(1024),
+  PRIMARY KEY(card_no, level)
+);
+
+insert into customer_card
+VALUES (20001, 'LEVEL_4', 'user_1', 'user with level 4'),
+       (20002, 'LEVEL_4', 'user_2', 'user with level 4'),
+       (20003, 'LEVEL_4', 'user_3', 'user with level 4'),
+       (20004, 'LEVEL_4', 'user_4', 'user with level 4'),
+       (20004, 'LEVEL_1', 'user_4', 'user with level 4'),
+       (20004, 'LEVEL_2', 'user_4', 'user with level 4'),
+       (20004, 'LEVEL_3', 'user_4', 'user with level 4'),
+       (30006, 'LEVEL_3', 'user_5', 'user with level 3'),
+       (30007, 'LEVEL_3', 'user_6', 'user with level 3'),
+       (30008, 'LEVEL_3', 'user_7', 'user with level 3'),
+       (30009, 'LEVEL_3', 'user_8', 'user with level 3'),
+       (30009, 'LEVEL_2', 'user_8', 'user with level 3'),
+       (30009, 'LEVEL_1', 'user_8', 'user with level 3'),
+       (40001, 'LEVEL_2', 'user_9', 'user with level 2'),
+       (40002, 'LEVEL_2', 'user_10', 'user with level 2'),
+       (40003, 'LEVEL_2', 'user_11', 'user with level 2'),
+       (50001, 'LEVEL_1', 'user_12', 'user with level 1'),
+       (50002, 'LEVEL_1', 'user_13', 'user with level 1'),
+       (50003, 'LEVEL_1', 'user_14', 'user with level 1');
+
+-- table has single line
+CREATE TABLE customer_card_single_line (
+  card_no BIGINT NOT NULL,
+  level VARCHAR(10) NOT NULL,
+  name VARCHAR(255) NOT NULL DEFAULT 'flink',
+  note VARCHAR(1024),
+  PRIMARY KEY(card_no, level)
+);
+
+insert into customer_card_single_line
+VALUES (20001, 'LEVEL_1', 'user_1', 'user with level 1');
+
+
+-- table has combined primary key
+CREATE TABLE shopping_cart (
+  product_no INT NOT NULL,
+  product_kind VARCHAR(255),
+  user_id VARCHAR(255) NOT NULL,
+  description VARCHAR(255) NOT NULL,
+  PRIMARY KEY(user_id, product_no, product_kind)
+);
+
+insert into shopping_cart
+VALUES (101, 'KIND_001', 'user_1', 'my shopping cart'),
+       (101, 'KIND_002', 'user_1', 'my shopping cart'),
+       (102, 'KIND_007', 'user_1', 'my shopping cart'),
+       (102, 'KIND_008', 'user_1', 'my shopping cart'),
+       (501, 'KIND_100', 'user_2', 'my shopping list'),
+       (701, 'KIND_999', 'user_3', 'my shopping list'),
+       (801, 'KIND_010', 'user_4', 'my shopping list'),
+       (600, 'KIND_009', 'user_4', 'my shopping list'),
+       (401, 'KIND_002', 'user_5', 'leo list'),
+       (401, 'KIND_007', 'user_5', 'leo list'),
+       (404, 'KIND_008', 'user_5', 'leo list'),
+       (600, 'KIND_009', 'user_6', 'my shopping cart');
+
+-- table has combined primary key and one of the primary key is evenly
+CREATE TABLE evenly_shopping_cart (
+  product_no INT NOT NULL,
+  product_kind VARCHAR(255),
+  user_id VARCHAR(255) NOT NULL,
+  description VARCHAR(255) NOT NULL,
+  PRIMARY KEY(product_kind, product_no, user_id)
+);
+
+insert into evenly_shopping_cart
+VALUES (101, 'KIND_001', 'user_1', 'my shopping cart'),
+       (102, 'KIND_002', 'user_1', 'my shopping cart'),
+       (103, 'KIND_007', 'user_1', 'my shopping cart'),
+       (104, 'KIND_008', 'user_1', 'my shopping cart'),
+       (105, 'KIND_100', 'user_2', 'my shopping list'),
+       (105, 'KIND_999', 'user_3', 'my shopping list'),
+       (107, 'KIND_010', 'user_4', 'my shopping list'),
+       (108, 'KIND_009', 'user_4', 'my shopping list'),
+       (109, 'KIND_002', 'user_5', 'leo list'),
+       (111, 'KIND_007', 'user_5', 'leo list'),
+       (111, 'KIND_008', 'user_5', 'leo list'),
+       (112, 'KIND_009', 'user_6', 'my shopping cart');
+
+-- table has bigint unsigned auto increment primary key
+CREATE TABLE shopping_cart_big (
+  product_no BIGINT UNSIGNED AUTO_INCREMENT NOT NULL,
+  product_kind VARCHAR(255),
+  user_id VARCHAR(255) NOT NULL,
+  description VARCHAR(255) NOT NULL,
+  PRIMARY KEY(product_no)
+);
+
+insert into shopping_cart_big
+VALUES (default, 'KIND_001', 'user_1', 'my shopping cart'),
+       (default, 'KIND_002', 'user_1', 'my shopping cart'),
+       (default, 'KIND_003', 'user_1', 'my shopping cart');
+
+-- table has decimal primary key
+CREATE TABLE shopping_cart_dec (
+  product_no DECIMAL(10, 4) NOT NULL,
+  product_kind VARCHAR(255),
+  user_id VARCHAR(255) NOT NULL,
+  description VARCHAR(255) DEFAULT 'flink',
+  PRIMARY KEY(product_no)
+);
+
+insert into shopping_cart_dec
+VALUES (123456.123, 'KIND_001', 'user_1', 'my shopping cart'),
+       (123457.456, 'KIND_002', 'user_2', 'my shopping cart'),
+       (123458.6789, 'KIND_003', 'user_3', 'my shopping cart'),
+       (123459.1234, 'KIND_004', 'user_4', null);
+
+-- create table whose primary key are produced by snowflake algorithm
+CREATE TABLE address (
+  id BIGINT UNSIGNED NOT NULL PRIMARY KEY,
+  country VARCHAR(255) NOT NULL,
+  city VARCHAR(255) NOT NULL,
+  detail_address VARCHAR(1024)
+);
+
+INSERT INTO address
+VALUES (416874195632735147, 'China', 'Beijing', 'West Town address 1'),
+       (416927583791428523, 'China', 'Beijing', 'West Town address 2'),
+       (417022095255614379, 'China', 'Beijing', 'West Town address 3'),
+       (417111867899200427, 'America', 'New York', 'East Town address 1'),
+       (417271541558096811, 'America', 'New York', 'East Town address 2'),
+       (417272886855938987, 'America', 'New York', 'East Town address 3'),
+       (417420106184475563, 'Germany', 'Berlin', 'West Town address 1'),
+       (418161258277847979, 'Germany', 'Berlin', 'West Town address 2');
+
+CREATE TABLE default_value_test (
+  id INTEGER NOT NULL PRIMARY KEY,
+  name VARCHAR(255) NOT NULL DEFAULT 'flink',
+  address VARCHAR(1024),
+  phone_number INTEGER DEFAULT ' 123 '
+);
+INSERT INTO default_value_test
+VALUES (1,'user1','Shanghai',123567),
+       (2,'user2','Shanghai',123567);

Reply via email to