This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new bd31bfbd1 [flink][mysql-cdc] Add synchronizing shards test for 
MySqlSyncTableAction and do minor code refactor (#1694)
bd31bfbd1 is described below

commit bd31bfbd1c940a6f3c0c80404f1ce5c4e9273f72
Author: yuzelin <[email protected]>
AuthorDate: Tue Aug 1 18:23:26 2023 +0800

    [flink][mysql-cdc] Add synchronizing shards test for MySqlSyncTableAction 
and do minor code refactor (#1694)
---
 docs/content/how-to/cdc-ingestion.md               | 42 ++++++++++++---
 .../paimon/flink/action/cdc/ComputedColumn.java    |  1 -
 .../flink/action/cdc/{mysql => }/Expression.java   | 12 ++---
 .../flink/action/cdc/kafka/KafkaActionUtils.java   |  2 +-
 .../flink/action/cdc/mysql/MySqlActionUtils.java   |  1 +
 .../action/cdc/mysql/MySqlSyncDatabaseAction.java  |  2 +-
 .../cdc/{mysql => }/TruncateComputerTest.java      |  2 +-
 .../cdc/mysql/MySqlSyncTableActionITCase.java      | 60 +++++++++++++++++++++-
 .../src/test/resources/mysql/sync_table_setup.sql  | 26 ++++++++++
 9 files changed, 130 insertions(+), 18 deletions(-)

diff --git a/docs/content/how-to/cdc-ingestion.md 
b/docs/content/how-to/cdc-ingestion.md
index b16e0e7b6..a071156ff 100644
--- a/docs/content/how-to/cdc-ingestion.md
+++ b/docs/content/how-to/cdc-ingestion.md
@@ -72,7 +72,7 @@ To use this feature through `flink run`, run the following 
shell command.
 
 If the Paimon table you specify does not exist, this action will automatically 
create the table. Its schema will be derived from all specified MySQL tables. 
If the Paimon table already exists, its schema will be compared against the 
schema of all specified MySQL tables.
 
-Example
+Example 1: synchronize tables into one Paimon table
 
 ```bash
 <FLINK_HOME>/bin/flink run \
@@ -83,12 +83,12 @@ Example
     --table test_table \
     --partition-keys pt \
     --primary-keys pt,uid \
-    --computed-columns '_year=year(age)' \
+    --computed-column '_year=year(age)' \
     --mysql-conf hostname=127.0.0.1 \
     --mysql-conf username=root \
     --mysql-conf password=123456 \
-    --mysql-conf database-name=source_db \
-    --mysql-conf table-name='source_table' \
+    --mysql-conf database-name='source_db' \
+    --mysql-conf table-name='source_table1|source_table2' \
     --catalog-conf metastore=hive \
     --catalog-conf uri=thrift://hive-metastore:9083 \
     --table-conf bucket=4 \
@@ -96,8 +96,36 @@ Example
     --table-conf sink.parallelism=4
 ```
 
-The mysql-conf table-name also supports regular expressions to monitor 
multiple tables that satisfy
-the regular expressions.
+As the example shows, the mysql-conf's table-name supports regular expressions to 
monitor multiple tables that match
+the regular expression. The schemas of all the matched tables will be merged into one 
Paimon table schema.
+
+Example 2: synchronize shards into one Paimon table
+
+You can also set 'database-name' to a regular expression to capture multiple 
databases. A typical scenario is that a 
+table 'source_table' is split (sharded) across databases 'source_db1', 'source_db2', ..., and you 
can then synchronize the data of all the 
+'source_table' shards into one Paimon table.
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
+    mysql-sync-table \
+    --warehouse hdfs:///path/to/warehouse \
+    --database test_db \
+    --table test_table \
+    --partition-keys pt \
+    --primary-keys pt,uid \
+    --computed-column '_year=year(age)' \
+    --mysql-conf hostname=127.0.0.1 \
+    --mysql-conf username=root \
+    --mysql-conf password=123456 \
+    --mysql-conf database-name='source_db.+' \
+    --mysql-conf table-name='source_table' \
+    --catalog-conf metastore=hive \
+    --catalog-conf uri=thrift://hive-metastore:9083 \
+    --table-conf bucket=4 \
+    --table-conf changelog-producer=input \
+    --table-conf sink.parallelism=4
+```
 
 ### Synchronizing Databases
 
@@ -301,7 +329,7 @@ Example
     --table test_table \
     --partition-keys pt \
     --primary-keys pt,uid \
-    --computed-columns '_year=year(age)' \
+    --computed-column '_year=year(age)' \
     --kafka-conf properties.bootstrap.servers=127.0.0.1:9020 \
     --kafka-conf topic=order \
     --kafka-conf properties.group.id=123456 \
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java
index df8142120..3b262fafd 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.flink.action.cdc;
 
-import org.apache.paimon.flink.action.cdc.mysql.Expression;
 import org.apache.paimon.types.DataType;
 
 import javax.annotation.Nullable;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/Expression.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
similarity index 98%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/Expression.java
rename to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
index 6566958c3..e38270524 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/Expression.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.action.cdc.mysql;
+package org.apache.paimon.flink.action.cdc;
 
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
@@ -288,9 +288,9 @@ public interface Expression extends Serializable {
 
         private final String fieldReference;
 
-        private DataType fieldType;
+        private final DataType fieldType;
 
-        private int width;
+        private final int width;
 
         TruncateComputer(String fieldReference, DataType fieldType, String 
literal) {
             this.fieldReference = fieldReference;
@@ -320,11 +320,11 @@ public interface Expression extends Serializable {
             switch (fieldType.getTypeRoot()) {
                 case TINYINT:
                 case SMALLINT:
-                    return String.valueOf(truncateShort(width, 
Short.valueOf(input)));
+                    return String.valueOf(truncateShort(width, 
Short.parseShort(input)));
                 case INTEGER:
-                    return String.valueOf(truncateInt(width, 
Integer.valueOf(input)));
+                    return String.valueOf(truncateInt(width, 
Integer.parseInt(input)));
                 case BIGINT:
-                    return String.valueOf(truncateLong(width, 
Long.valueOf(input)));
+                    return String.valueOf(truncateLong(width, 
Long.parseLong(input)));
                 case DECIMAL:
                     return truncateDecimal(BigInteger.valueOf(width), new 
BigDecimal(input))
                             .toString();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
index 148e3ced2..93330eb4e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.flink.action.cdc.kafka;
 
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.flink.action.cdc.mysql.Expression;
+import org.apache.paimon.flink.action.cdc.Expression;
 import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.TableSchema;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
index b2a89f03d..fdd99975d 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.action.cdc.mysql;
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
+import org.apache.paimon.flink.action.cdc.Expression;
 import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.TableSchema;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index 048d9deb2..cf948e091 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -364,7 +364,7 @@ public class MySqlSyncDatabaseAction extends ActionBase {
             // tables, so we should use regular expression to monitor all 
valid tables and exclude
             // certain invalid tables
 
-            // The table list is build by template:
+            // The table list is built by template:
             // 
(?!(^db\\.tbl$)|(^...$))(databasePattern\\.(including_pattern1|...))
 
             // The excluding pattern ?!(^db\\.tbl$)|(^...$) can exclude tables 
whose qualified name
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TruncateComputerTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/TruncateComputerTest.java
similarity index 99%
rename from 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TruncateComputerTest.java
rename to 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/TruncateComputerTest.java
index c36fb8687..5f84c07ee 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TruncateComputerTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/TruncateComputerTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.action.cdc.mysql;
+package org.apache.paimon.flink.action.cdc;
 
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.BooleanType;
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 2ca19d522..9d77ca319 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -945,7 +945,7 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
             List<String> expected =
                     Arrays.asList(
                             "+I[1, 2021-09-15T15:00:10, 21]", "+I[2, 
2023-03-23T16:00:20, 42]");
-            waitForResult(expected, table, rowType, Arrays.asList("pk"));
+            waitForResult(expected, table, rowType, 
Collections.singletonList("pk"));
         }
     }
 
@@ -1033,6 +1033,64 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
         waitForResult(expected, table, rowType, primaryKeys);
     }
 
+    @Test
+    public void testSyncShards() throws Exception {
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", "shard_.+");
+        mySqlConfig.put("table-name", "t.+");
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(2);
+        env.enableCheckpointing(1000);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+
+        MySqlSyncTableAction action =
+                new MySqlSyncTableAction(
+                        mySqlConfig,
+                        warehouse,
+                        database,
+                        tableName,
+                        Collections.singletonList("pt"),
+                        Arrays.asList("pk", "pt"),
+                        Collections.singletonList("pt=substring(_date,5)"),
+                        Collections.emptyMap(),
+                        Collections.emptyMap());
+        action.build(env);
+        JobClient client = env.executeAsync();
+        waitJobRunning(client);
+
+        try (Connection conn =
+                        DriverManager.getConnection(
+                                MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
+                                MYSQL_CONTAINER.getUsername(),
+                                MYSQL_CONTAINER.getPassword());
+                Statement statement = conn.createStatement()) {
+            statement.execute("USE shard_1");
+            statement.executeUpdate("INSERT INTO t1 VALUES (1, '2023-07-30'), 
(2, '2023-07-30')");
+            statement.execute("USE shard_2");
+            statement.executeUpdate("INSERT INTO t1 VALUES (3, '2023-07-31'), 
(4, '2023-07-31')");
+        }
+
+        FileStoreTable table = getFileStoreTable();
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(),
+                            DataTypes.VARCHAR(10),
+                            DataTypes.STRING().notNull()
+                        },
+                        new String[] {"pk", "_date", "pt"});
+        waitForResult(
+                Arrays.asList(
+                        "+I[1, 2023-07-30, 07-30]",
+                        "+I[2, 2023-07-30, 07-30]",
+                        "+I[3, 2023-07-31, 07-31]",
+                        "+I[4, 2023-07-31, 07-31]"),
+                table,
+                rowType,
+                Arrays.asList("pk", "pt"));
+    }
+
     private FileStoreTable getFileStoreTable() throws Exception {
         return getFileStoreTable(tableName);
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
 
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
index eb512695f..608e324ed 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
+++ 
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
@@ -281,6 +281,10 @@ CREATE TABLE test_tinyint1_convert (
     PRIMARY KEY (pk)
 );
 
+-- 
################################################################################
+--  testSchemaEvolutionWithTinyint1Convert
+-- 
################################################################################
+
 CREATE DATABASE paimon_sync_table_tinyint;
 USE paimon_sync_table_tinyint;
 
@@ -290,3 +294,25 @@ CREATE TABLE schema_evolution_3 (
     v1 VARCHAR(10) comment  'v1',
     PRIMARY KEY (_id)
 );
+
+-- 
################################################################################
+--  testSyncShards
+-- 
################################################################################
+
+CREATE DATABASE shard_1;
+USE shard_1;
+
+CREATE TABLE t1 (
+    pk INT,
+    _date VARCHAR(10),
+    PRIMARY KEY (pk)
+);
+
+CREATE DATABASE shard_2;
+USE shard_2;
+
+CREATE TABLE t1 (
+    pk INT,
+    _date VARCHAR(10),
+    PRIMARY KEY (pk)
+);

Reply via email to