This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 455e6cd1d [cdc] Scan newly added tables when recover from snapshot 
(#1345)
455e6cd1d is described below

commit 455e6cd1d4806d885dc93cf5ad1117922b3ff8ee
Author: Dian Qi <[email protected]>
AuthorDate: Fri Jun 9 18:33:21 2023 +0800

    [cdc] Scan newly added tables when recover from snapshot (#1345)
---
 docs/content/how-to/cdc-ingestion.md               |  51 ++++-
 .../src/test/resources/log4j2-test.properties      |   2 +-
 .../flink/action/cdc/mysql/MySqlActionUtils.java   |  17 +-
 .../cdc/mysql/MySqlSyncDatabaseActionITCase.java   | 246 +++++++++++++++++++++
 .../src/test/resources/mysql/setup.sql             |  23 ++
 5 files changed, 336 insertions(+), 3 deletions(-)

diff --git a/docs/content/how-to/cdc-ingestion.md 
b/docs/content/how-to/cdc-ingestion.md
index 4a0f7f5f6..acd5edc1a 100644
--- a/docs/content/how-to/cdc-ingestion.md
+++ b/docs/content/how-to/cdc-ingestion.md
@@ -123,7 +123,7 @@ Only tables with primary keys will be synchronized.
 
 For each MySQL table to be synchronized, if the corresponding Paimon table 
does not exist, this action will automatically create the table. Its schema 
will be derived from all specified MySQL tables. If the Paimon table already 
exists, its schema will be compared against the schema of all specified MySQL 
tables.
 
-Example
+Example 1: synchronize entire database
 
 ```bash
 <FLINK_HOME>/bin/flink run \
@@ -142,6 +142,55 @@ Example
     --table-conf sink.parallelism=4
 ```
 
+Example 2: synchronize tables newly added to the database

+Suppose a Flink job is initially synchronizing tables [product, user, address]
+under database `source_db`. The command to submit the job looks like this:
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
+    mysql-sync-database \
+    --warehouse hdfs:///path/to/warehouse \
+    --database test_db \
+    --mysql-conf hostname=127.0.0.1 \
+    --mysql-conf username=root \
+    --mysql-conf password=123456 \
+    --mysql-conf database-name=source_db \
+    --catalog-conf metastore=hive \
+    --catalog-conf uri=thrift://hive-metastore:9083 \
+    --table-conf bucket=4 \
+    --table-conf changelog-producer=input \
+    --table-conf sink.parallelism=4 \
+    --including-tables 'product|user|address'
+```
+
+At a later point, we would like the job to also synchronize tables [order, custom],
+which contain historical data. We can achieve this by recovering from the previous
+savepoint of the job and thus reusing its existing state. The recovered job will
+first snapshot the newly added tables, and then automatically continue reading the
+changelog from the previous position.
+
+The command to recover from the previous savepoint and add the new tables looks like this:
+
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    --fromSavepoint {{< savepointPath >}} \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
+    mysql-sync-database \
+    --warehouse hdfs:///path/to/warehouse \
+    --database test_db \
+    --mysql-conf hostname=127.0.0.1 \
+    --mysql-conf username=root \
+    --mysql-conf password=123456 \
+    --mysql-conf database-name=source_db \
+    --catalog-conf metastore=hive \
+    --catalog-conf uri=thrift://hive-metastore:9083 \
+    --table-conf bucket=4 \
+    --including-tables 'product|user|address|order|custom'
+```
+
 ## Kafka
 
 ### Prepare Kafka Bundled Jar
diff --git a/paimon-e2e-tests/src/test/resources/log4j2-test.properties 
b/paimon-e2e-tests/src/test/resources/log4j2-test.properties
index 1b3980d15..e27922dad 100644
--- a/paimon-e2e-tests/src/test/resources/log4j2-test.properties
+++ b/paimon-e2e-tests/src/test/resources/log4j2-test.properties
@@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-rootLogger.level = OFF
+rootLogger.level = INFO
 rootLogger.appenderRef.test.ref = TestLogger
 
 appender.testlogger.name = TestLogger
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
index dc6815f2c..46dc25347 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
@@ -36,6 +36,8 @@ import 
com.ververica.cdc.connectors.mysql.table.StartupOptions;
 import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
 import com.ververica.cdc.debezium.table.DebeziumOptions;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.kafka.connect.json.JsonConverterConfig;
 import org.slf4j.Logger;
@@ -57,6 +59,12 @@ import static 
org.apache.paimon.utils.Preconditions.checkArgument;
 class MySqlActionUtils {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(MySqlActionUtils.class);
+    /**
+     * MySQL source option forwarded to the source builder's {@code scanNewlyAddedTableEnabled};
+     * read from the MySQL configuration and defaults to {@code true}.
+     */
+    public static final ConfigOption<Boolean> SCAN_NEWLY_ADDED_TABLE_ENABLED =
+            ConfigOptions.key("scan.newly-added-table.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Whether to capture and snapshot newly added tables when the job is "
+                                    + "restored from a savepoint or checkpoint. Default is true.");
 
     static Connection getConnection(Configuration mySqlConfig) throws 
Exception {
         return DriverManager.getConnection(
@@ -254,7 +262,14 @@ class MySqlActionUtils {
         customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, 
"numeric");
         JsonDebeziumDeserializationSchema schema =
                 new JsonDebeziumDeserializationSchema(true, 
customConverterConfigs);
-        return 
sourceBuilder.deserializer(schema).includeSchemaChanges(true).build();
+
+        boolean scanNewlyAddedTables = 
mySqlConfig.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
+
+        return sourceBuilder
+                .deserializer(schema)
+                .includeSchemaChanges(true)
+                .scanNewlyAddedTableEnabled(scanNewlyAddedTables)
+                .build();
     }
 
     private static void validateMySqlConfig(Configuration mySqlConfig) {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index c4f960925..52b14119d 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -32,20 +32,32 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.JsonSerdeUtil;
 
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
 
 import javax.annotation.Nullable;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -55,6 +67,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase {
 
     private static final String DATABASE_NAME = "paimon_sync_database";
+    @TempDir java.nio.file.Path tempDir;
 
     @Test
     @Timeout(60)
@@ -619,6 +632,239 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
         }
     }
 
+    @Test
+    @Timeout(600)
+    public void testNewlyAddedTables() throws Exception {
+        JobClient client = buildSyncDatabaseActionWithNewlyAddedTables();
+        waitJobRunning(client);
+
+        try (Connection conn =
+                DriverManager.getConnection(
+                        
MYSQL_CONTAINER.getJdbcUrl("paimon_sync_database_newly_added_tables"),
+                        MYSQL_CONTAINER.getUsername(),
+                        MYSQL_CONTAINER.getPassword())) {
+            try (Statement statement = conn.createStatement()) {
+                testNewlyAddedTableImpl(client, statement, 1, true, false);
+            }
+        }
+    }
+
+    /**
+     * Drives the newly-added-tables scenario against a running sync job.
+     *
+     * <p>Writes to the pre-existing tables t1/t2, creates {@code newlyAddedTableCount} MySQL tables
+     * at runtime, optionally stops the job with a savepoint and restarts it from that savepoint,
+     * then checks that the new tables (and optionally a schema change on one of them) are
+     * synchronized into Paimon.
+     *
+     * @param client the running sync job
+     * @param statement statement on a connection to the MySQL source database
+     * @param newlyAddedTableCount number of tables to create at runtime; must be at least 1
+     * @param testSavepointRecovery whether to stop-with-savepoint and restore after the first new
+     *     table has been created
+     * @param testSchemaChange whether to add a column to a random new table and verify it
+     */
+    private void testNewlyAddedTableImpl(
+            JobClient client,
+            Statement statement,
+            int newlyAddedTableCount,
+            boolean testSavepointRecovery,
+            boolean testSchemaChange)
+            throws Exception {
+        FileStoreTable table1 = getFileStoreTable("t1");
+        FileStoreTable table2 = getFileStoreTable("t2");
+
+        statement.executeUpdate("USE paimon_sync_database_newly_added_tables");
+
+        statement.executeUpdate("INSERT INTO t1 VALUES (1, 'one')");
+        statement.executeUpdate("INSERT INTO t2 VALUES (2, 'two', 20, 200)");
+        statement.executeUpdate("INSERT INTO t1 VALUES (3, 'three')");
+        statement.executeUpdate("INSERT INTO t2 VALUES (4, 'four', 40, 400)");
+        RowType rowType1 =
+                RowType.of(
+                        new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)},
+                        new String[] {"k", "v1"});
+        List<String> primaryKeys1 = Collections.singletonList("k");
+        List<String> expected = Arrays.asList("+I[1, one]", "+I[3, three]");
+        waitForResult(expected, table1, rowType1, primaryKeys1);
+
+        RowType rowType2 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(),
+                            DataTypes.VARCHAR(10).notNull(),
+                            DataTypes.INT(),
+                            DataTypes.BIGINT()
+                        },
+                        new String[] {"k1", "k2", "v1", "v2"});
+        List<String> primaryKeys2 = Arrays.asList("k1", "k2");
+        expected = Arrays.asList("+I[2, two, 20, 200]", "+I[4, four, 40, 400]");
+        waitForResult(expected, table2, rowType2, primaryKeys2);
+
+        // The inserts above have already been consumed from the binlog, so the job is in the
+        // incremental sync phase: the tables created below were not part of its initial
+        // snapshot phase.
+        Map<String, List<Tuple2<Integer, String>>> recordsMap = new HashMap<>();
+        List<String> newTablePrimaryKeys = Collections.singletonList("k");
+        RowType newTableRowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)},
+                        new String[] {"k", "v1"});
+        int newTableCount = 0;
+        String newTableName = getNewTableName(newTableCount);
+
+        createNewTable(statement, newTableName);
+        statement.executeUpdate("INSERT INTO t2 VALUES (8, 'eight', 80, 800)");
+        List<Tuple2<Integer, String>> newTableRecords = getNewTableRecords(newTableCount);
+        recordsMap.put(newTableName, newTableRecords);
+        List<String> newTableExpected = getNewTableExpected(newTableRecords);
+        insertRecordsIntoNewTable(statement, newTableName, newTableRecords);
+
+        // suspend the job and restart from savepoint
+        if (testSavepointRecovery) {
+            String savepoint =
+                    client.stopWithSavepoint(
+                                    false,
+                                    tempDir.toUri().toString(),
+                                    SavepointFormatType.CANONICAL)
+                            .join();
+            assertThat(savepoint).isNotBlank();
+
+            client = buildSyncDatabaseActionWithNewlyAddedTables(savepoint);
+            waitJobRunning(client);
+        }
+
+        // wait until table t2 contains the updated record, and then check
+        // for existence of the first newly added table
+        expected =
+                Arrays.asList(
+                        "+I[2, two, 20, 200]", "+I[4, four, 40, 400]", "+I[8, eight, 80, 800]");
+        waitForResult(expected, table2, rowType2, primaryKeys2);
+
+        FileStoreTable newTable = getFileStoreTable(newTableName);
+        waitForResult(newTableExpected, newTable, newTableRowType, newTablePrimaryKeys);
+
+        for (newTableCount = 1; newTableCount < newlyAddedTableCount; ++newTableCount) {
+            // create new table
+            newTableName = getNewTableName(newTableCount);
+            createNewTable(statement, newTableName);
+
+            Thread.sleep(5000L);
+
+            // insert records
+            newTableRecords = getNewTableRecords(newTableCount);
+            recordsMap.put(newTableName, newTableRecords);
+            insertRecordsIntoNewTable(statement, newTableName, newTableRecords);
+            newTable = getFileStoreTable(newTableName);
+            newTableExpected = getNewTableExpected(newTableRecords);
+            waitForResult(newTableExpected, newTable, newTableRowType, newTablePrimaryKeys);
+        }
+
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+
+        // pick a random newly added table and insert records
+        int pick = random.nextInt(newlyAddedTableCount);
+        String tableName = getNewTableName(pick);
+        List<Tuple2<Integer, String>> records = recordsMap.get(tableName);
+        records.add(Tuple2.of(80, "eighty"));
+        // verify the table that was actually written to, not the last one created above
+        newTable = getFileStoreTable(tableName);
+        newTableExpected = getNewTableExpected(records);
+        statement.executeUpdate(String.format("INSERT INTO %s VALUES (80, 'eighty')", tableName));
+
+        waitForResult(newTableExpected, newTable, newTableRowType, newTablePrimaryKeys);
+
+        // test schema change
+        if (testSchemaChange) {
+            pick = random.nextInt(newlyAddedTableCount);
+            tableName = getNewTableName(pick);
+            records = recordsMap.get(tableName);
+
+            statement.executeUpdate(String.format("ALTER TABLE %s ADD COLUMN v2 INT", tableName));
+            statement.executeUpdate(
+                    String.format("INSERT INTO %s VALUES (100, 'hundred', 10000)", tableName));
+
+            // rows written before the ALTER read back with v2 = NULL
+            List<String> expectedRecords =
+                    records.stream()
+                            .map(tuple -> String.format("+I[%d, %s, NULL]", tuple.f0, tuple.f1))
+                            .collect(Collectors.toList());
+            expectedRecords.add("+I[100, hundred, 10000]");
+
+            newTable = getFileStoreTable(tableName);
+            RowType rowType =
+                    RowType.of(
+                            new DataType[] {
+                                DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.INT()
+                            },
+                            new String[] {"k", "v1", "v2"});
+            waitForResult(expectedRecords, newTable, rowType, newTablePrimaryKeys);
+        }
+    }
+
+    /** Renders each (k, v1) record as the insert row string "+I[k, v1]", preserving order. */
+    private List<String> getNewTableExpected(List<Tuple2<Integer, String>> newTableRecords) {
+        return newTableRecords.stream()
+                .collect(
+                        Collectors.mapping(
+                                row -> String.format("+I[%d, %s]", row.f0, row.f1),
+                                Collectors.toList()));
+    }
+
+    /**
+     * Generates between 1 and 10 records with keys 0..n-1 and values "varchar_<key>". The table
+     * index argument is currently not used.
+     */
+    private List<Tuple2<Integer, String>> getNewTableRecords(int newTableCount) {
+        List<Tuple2<Integer, String>> rows = new LinkedList<>();
+        int rowCount = 1 + ThreadLocalRandom.current().nextInt(10);
+        int key = 0;
+        while (key < rowCount) {
+            rows.add(Tuple2.of(key, "varchar_" + key));
+            key++;
+        }
+        return rows;
+    }
+
+    /** Inserts all records into the given table with a single multi-row INSERT statement. */
+    private void insertRecordsIntoNewTable(
+            Statement statement, String newTableName, List<Tuple2<Integer, String>> newTableRecords)
+            throws SQLException {
+        // (k, 'v1'), (k, 'v1'), ...
+        String valuesClause =
+                newTableRecords.stream()
+                        .map(row -> String.format("(%d, '%s')", row.f0, row.f1))
+                        .collect(Collectors.joining(", "));
+        statement.executeUpdate(
+                String.format("INSERT INTO %s VALUES %s", newTableName, valuesClause));
+    }
+
+    /** Returns the name of the runtime-created table with the given index. */
+    private String getNewTableName(int newTableCount) {
+        String prefix = "t_new_table_";
+        return prefix.concat(Integer.toString(newTableCount));
+    }
+
+    /** Creates a MySQL table with layout (k INT primary key, v1 VARCHAR(10)). */
+    private void createNewTable(Statement statement, String newTableName) throws SQLException {
+        String ddl =
+                String.format(
+                        "CREATE TABLE %s (k INT, v1 VARCHAR(10), PRIMARY KEY (k))", newTableName);
+        statement.executeUpdate(ddl);
+    }
+
+    /** Submits a fresh (non-restored) sync job; see the overload taking a savepoint path. */
+    private JobClient buildSyncDatabaseActionWithNewlyAddedTables() throws Exception {
+        return buildSyncDatabaseActionWithNewlyAddedTables(null);
+    }
+
+    /**
+     * Builds and asynchronously submits a sync-database job over {@code
+     * paimon_sync_database_newly_added_tables} that includes all tables matching {@code t.+}.
+     *
+     * @param savepointPath savepoint to restore from, or {@code null} to start a fresh job
+     * @return client of the submitted job
+     */
+    private JobClient buildSyncDatabaseActionWithNewlyAddedTables(String savepointPath)
+            throws Exception {
+
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", "paimon_sync_database_newly_added_tables");
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(2);
+        env.enableCheckpointing(1000);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        Map<String, String> tableConfig = new HashMap<>();
+        tableConfig.put("bucket", String.valueOf(random.nextInt(3) + 1));
+        tableConfig.put("sink.parallelism", String.valueOf(random.nextInt(3) + 1));
+        MySqlSyncDatabaseAction action =
+                new MySqlSyncDatabaseAction(
+                        mySqlConfig,
+                        warehouse,
+                        database,
+                        false,
+                        null,
+                        null,
+                        "t.+",
+                        null,
+                        Collections.emptyMap(),
+                        tableConfig);
+        action.build(env);
+
+        if (Objects.nonNull(savepointPath)) {
+            StreamGraph streamGraph = env.getStreamGraph();
+            // StreamGraph#getJobGraph() generates a new JobGraph on each call, and submitting the
+            // StreamGraph translates it again, so restore settings must live on the StreamGraph.
+            // allowNonRestoredState = true tolerates savepoint state with no matching operator.
+            streamGraph.setSavepointRestoreSettings(
+                    SavepointRestoreSettings.forPath(savepointPath, true));
+            return env.executeAsync(streamGraph);
+        }
+        return env.executeAsync();
+    }
+
     private FileStoreTable getFileStoreTable(String tableName) throws 
Exception {
         Catalog catalog = 
CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
         Identifier identifier = Identifier.create(database, tableName);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql 
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
index 839c0ab79..351324bf9 100644
--- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
@@ -456,3 +456,26 @@ CREATE TABLE T (
     UPPERCASE_V0 VARCHAR(20),
     PRIMARY KEY (k)
 );
+
+
+-- ################################################################################
+--  MySqlSyncDatabaseActionITCase#testNewlyAddedTables
+-- ################################################################################
+
+CREATE DATABASE paimon_sync_database_newly_added_tables;
+USE paimon_sync_database_newly_added_tables;
+
+
+CREATE TABLE t1 (
+    k INT,
+    v1 VARCHAR(10),
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE t2 (
+    k1 INT,
+    k2 VARCHAR(10),
+    v1 INT,
+    v2 BIGINT,
+    PRIMARY KEY (k1, k2)
+);

Reply via email to