This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 38eb10d77 [flink][mysql-cdc] Refactor TINYINT(1) conversion doc and 
codes (#1815)
38eb10d77 is described below

commit 38eb10d773ea72eb4437682b395cb39395281113
Author: yuzelin <[email protected]>
AuthorDate: Tue Aug 15 21:27:49 2023 +0800

    [flink][mysql-cdc] Refactor TINYINT(1) conversion doc and codes (#1815)
---
 docs/content/how-to/cdc-ingestion.md               |   6 ++
 .../shortcodes/generated/mysql_sync_database.html  |   2 +-
 .../shortcodes/generated/mysql_sync_table.html     |   2 +-
 .../action/cdc/mysql/MySqlSyncDatabaseAction.java  |   4 +-
 .../action/cdc/mysql/MySqlTableSchemaBuilder.java  |   8 +-
 .../cdc/mysql/MySqlSyncTableActionITCase.java      | 117 +++++----------------
 .../src/test/resources/mysql/sync_table_setup.sql  |  19 +---
 7 files changed, 47 insertions(+), 111 deletions(-)

diff --git a/docs/content/how-to/cdc-ingestion.md 
b/docs/content/how-to/cdc-ingestion.md
index 4b13d0d08..4daf9cd84 100644
--- a/docs/content/how-to/cdc-ingestion.md
+++ b/docs/content/how-to/cdc-ingestion.md
@@ -438,6 +438,12 @@ behaviors of `RENAME TABLE` and `DROP COLUMN` will be 
ignored, `RENAME COLUMN` w
 
 {{< generated/compute_column >}}
 
+## Special Data Type Conversions
+1. MySQL TINYINT(1) type will be converted to Boolean by default. If you want 
to store number (-128~127) in it like MySQL, 
+you can specify that `--mysql-conf mysql.converter.tinyint1-to-bool=false`, 
then the column will be mapped to TINYINT in Paimon table.
+2. MySQL BIT(1) type will be converted to Boolean.
+3. When using Hive catalog, MySQL TIME type will be converted to STRING.
+
 ## FAQ
 1. Chinese characters in records ingested from MySQL are garbled.
 * Try to set `env.java.opts: -Dfile.encoding=UTF-8` in `flink-conf.yaml`
diff --git a/docs/layouts/shortcodes/generated/mysql_sync_database.html 
b/docs/layouts/shortcodes/generated/mysql_sync_database.html
index fe6f3d5e3..0d07d824b 100644
--- a/docs/layouts/shortcodes/generated/mysql_sync_database.html
+++ b/docs/layouts/shortcodes/generated/mysql_sync_database.html
@@ -63,7 +63,7 @@ under the License.
     </tr>
     <tr>
         <td><h5>--mysql-conf</h5></td>
-        <td>The configuration for Flink CDC MySQL table sources. Each 
configuration should be specified in the format "key=value". hostname, 
username, password, database-name and table-name are required configurations, 
others are optional. See its <a 
href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options">document</a>
 for a complete list of configurations. <br> Furthermore, TINYINT(1) type in 
MySQL would be converted to Boolean b [...]
+        <td>The configuration for Flink CDC MySQL table sources. Each 
configuration should be specified in the format "key=value". hostname, 
username, password, database-name and table-name are required configurations, 
others are optional. See its <a 
href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options">document</a>
 for a complete list of configurations.</td>
     </tr>
     <tr>
         <td><h5>--catalog-conf</h5></td>
diff --git a/docs/layouts/shortcodes/generated/mysql_sync_table.html 
b/docs/layouts/shortcodes/generated/mysql_sync_table.html
index b9b286f9f..15cb8da72 100644
--- a/docs/layouts/shortcodes/generated/mysql_sync_table.html
+++ b/docs/layouts/shortcodes/generated/mysql_sync_table.html
@@ -51,7 +51,7 @@ under the License.
     </tr>
     <tr>
         <td><h5>--mysql-conf</h5></td>
-        <td>The configuration for Flink CDC MySQL table sources. Each 
configuration should be specified in the format "key=value". hostname, 
username, password, database-name and table-name are required configurations, 
others are optional. See its <a 
href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options">document</a>
 for a complete list of configurations. <br> Furthermore, TINYINT(1) type in 
MySQL would be converted to Boolean b [...]
+        <td>The configuration for Flink CDC MySQL table sources. Each 
configuration should be specified in the format "key=value". hostname, 
username, password, database-name and table-name are required configurations, 
others are optional. See its <a 
href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options">document</a>
 for a complete list of configurations.</td>
     </tr>
     <tr>
         <td><h5>--catalog-conf</h5></td>
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index 3c62b0aa9..17bcd9173 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -235,11 +235,11 @@ public class MySqlSyncDatabaseAction extends ActionBase {
 
         String serverTimeZone = 
mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);
         ZoneId zoneId = serverTimeZone == null ? ZoneId.systemDefault() : 
ZoneId.of(serverTimeZone);
+        Boolean convertTinyint1ToBool = 
mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL);
         MySqlTableSchemaBuilder schemaBuilder =
-                new MySqlTableSchemaBuilder(tableConfig, caseSensitive);
+                new MySqlTableSchemaBuilder(tableConfig, caseSensitive, 
convertTinyint1ToBool);
         Pattern includingPattern = this.includingPattern;
         Pattern excludingPattern = this.excludingPattern;
-        Boolean convertTinyint1ToBool = 
mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL);
         EventParser.Factory<String> parserFactory =
                 () ->
                         new MySqlDebeziumJsonEventParser(
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
index e7176c6fa..57fd97223 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
@@ -32,7 +32,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
-import static 
org.apache.paimon.flink.action.cdc.mysql.MySqlActionUtils.MYSQL_CONVERTER_TINYINT1_BOOL;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Schema builder for MySQL cdc. */
@@ -40,10 +39,13 @@ public class MySqlTableSchemaBuilder implements 
NewTableSchemaBuilder<JsonNode>
 
     private final Map<String, String> tableConfig;
     private final boolean caseSensitive;
+    private final boolean convertTinyint1ToBool;
 
-    public MySqlTableSchemaBuilder(Map<String, String> tableConfig, boolean 
caseSensitive) {
+    public MySqlTableSchemaBuilder(
+            Map<String, String> tableConfig, boolean caseSensitive, boolean 
convertTinyint1ToBool) {
         this.tableConfig = tableConfig;
         this.caseSensitive = caseSensitive;
+        this.convertTinyint1ToBool = convertTinyint1ToBool;
     }
 
     @Override
@@ -63,7 +65,7 @@ public class MySqlTableSchemaBuilder implements 
NewTableSchemaBuilder<JsonNode>
                                     element.get("typeExpression").asText(),
                                     precision,
                                     scale,
-                                    
MYSQL_CONVERTER_TINYINT1_BOOL.defaultValue())
+                                    convertTinyint1ToBool)
                             .copy(element.get("optional").asBoolean()));
         }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index c4b683775..f0b28982e 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -897,107 +897,46 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
         JobClient client = env.executeAsync();
         waitJobRunning(client);
 
+        checkTableSchema(
+                "[{\"id\":0,\"name\":\"pk\",\"type\":\"INT NOT 
NULL\",\"description\":\"\"},"
+                        + 
"{\"id\":1,\"name\":\"_tinyint1\",\"type\":\"TINYINT\",\"description\":\"\"}]");
+
         try (Statement statement = getStatement()) {
             statement.execute("USE " + DATABASE_NAME);
-            statement.executeUpdate(
-                    "INSERT INTO test_tinyint1_convert VALUES (1, '2021-09-15 
15:00:10', 21)");
-            statement.executeUpdate(
-                    "INSERT INTO test_tinyint1_convert VALUES (2, '2023-03-23 
16:00:20', 42)");
+            statement.executeUpdate("INSERT INTO test_tinyint1_convert VALUES 
(1, 21), (2, 42)");
 
             FileStoreTable table = getFileStoreTable();
             RowType rowType =
                     RowType.of(
-                            new DataType[] {
-                                DataTypes.INT().notNull(),
-                                DataTypes.TIMESTAMP(0),
-                                DataTypes.TINYINT()
-                            },
-                            new String[] {"pk", "_datetime", "_tinyint1"});
-            List<String> expected =
-                    Arrays.asList(
-                            "+I[1, 2021-09-15T15:00:10, 21]", "+I[2, 
2023-03-23T16:00:20, 42]");
+                            new DataType[] {DataTypes.INT().notNull(), 
DataTypes.TINYINT()},
+                            new String[] {"pk", "_tinyint1"});
+            List<String> expected = Arrays.asList("+I[1, 21]", "+I[2, 42]");
             waitForResult(expected, table, rowType, 
Collections.singletonList("pk"));
-        }
-    }
-
-    @Test
-    @Timeout(60)
-    public void testSchemaEvolutionWithTinyint1Convert() throws Exception {
-        Map<String, String> mySqlConfig = getBasicMySqlConfig();
-        mySqlConfig.put("database-name", "paimon_sync_table_tinyint");
-        mySqlConfig.put("table-name", "schema_evolution_3");
-        mySqlConfig.put("mysql.converter.tinyint1-to-bool", "false");
-
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(2);
-        env.enableCheckpointing(1000);
-        env.setRestartStrategy(RestartStrategies.noRestart());
 
-        Map<String, String> tableConfig = getBasicTableConfig();
-        MySqlSyncTableAction action =
-                new MySqlSyncTableAction(
-                        mySqlConfig,
-                        warehouse,
-                        database,
-                        tableName,
-                        Collections.singletonList("pt"),
-                        Arrays.asList("pt", "_id"),
-                        Collections.singletonMap(
-                                CatalogOptions.METASTORE.key(), 
"test-alter-table"),
-                        tableConfig);
-        action.build(env);
-        JobClient client = env.executeAsync();
-        waitJobRunning(client);
-
-        checkTableSchema(
-                "[{\"id\":0,\"name\":\"pt\",\"type\":\"INT NOT 
NULL\",\"description\":\"primary\"},{\"id\":1,\"name\":\"_id\",\"type\":\"INT 
NOT 
NULL\",\"description\":\"_id\"},{\"id\":2,\"name\":\"v1\",\"type\":\"VARCHAR(10)\",\"description\":\"v1\"}]");
+            // test schema evolution
+            statement.executeUpdate(
+                    "ALTER TABLE test_tinyint1_convert ADD COLUMN 
_new_tinyint1 TINYINT(1)");
+            statement.executeUpdate(
+                    "INSERT INTO test_tinyint1_convert VALUES (3, 63, 1), (4, 
127, -128)");
 
-        try (Statement statement = getStatement()) {
-            testSchemaEvolutionImplWithTinyIntConvert(statement);
+            rowType =
+                    RowType.of(
+                            new DataType[] {
+                                DataTypes.INT().notNull(), 
DataTypes.TINYINT(), DataTypes.TINYINT()
+                            },
+                            new String[] {"pk", "_tinyint1", "_new_tinyint1"});
+            waitForResult(
+                    Arrays.asList(
+                            "+I[1, 21, NULL]",
+                            "+I[2, 42, NULL]",
+                            "+I[3, 63, 1]",
+                            "+I[4, 127, -128]"),
+                    table,
+                    rowType,
+                    Collections.singletonList("pk"));
         }
     }
 
-    private void testSchemaEvolutionImplWithTinyIntConvert(Statement 
statement) throws Exception {
-        FileStoreTable table = getFileStoreTable();
-        statement.executeUpdate("USE paimon_sync_table_tinyint");
-
-        statement.executeUpdate("INSERT INTO schema_evolution_3 VALUES (1, 1, 
'one')");
-        statement.executeUpdate(
-                "INSERT INTO schema_evolution_3 VALUES (1, 2, 'two'), (2, 4, 
'four')");
-        RowType rowType =
-                RowType.of(
-                        new DataType[] {
-                            DataTypes.INT().notNull(),
-                            DataTypes.INT().notNull(),
-                            DataTypes.VARCHAR(10)
-                        },
-                        new String[] {"pt", "_id", "v1"});
-        List<String> primaryKeys = Arrays.asList("pt", "_id");
-        List<String> expected = Arrays.asList("+I[1, 1, one]", "+I[1, 2, 
two]", "+I[2, 4, four]");
-        waitForResult(expected, table, rowType, primaryKeys);
-
-        statement.executeUpdate("ALTER TABLE schema_evolution_3 ADD COLUMN v2 
TINYINT(1)");
-        statement.executeUpdate(
-                "INSERT INTO schema_evolution_3 VALUES (2, 3, 'three', 30), 
(1, 5, 'five', 50)");
-        rowType =
-                RowType.of(
-                        new DataType[] {
-                            DataTypes.INT().notNull(),
-                            DataTypes.INT().notNull(),
-                            DataTypes.VARCHAR(10),
-                            DataTypes.TINYINT()
-                        },
-                        new String[] {"pt", "_id", "v1", "v2"});
-        expected =
-                Arrays.asList(
-                        "+I[1, 1, one, NULL]",
-                        "+I[1, 2, two, NULL]",
-                        "+I[2, 3, three, 30]",
-                        "+I[2, 4, four, NULL]",
-                        "+I[1, 5, five, 50]");
-        waitForResult(expected, table, rowType, primaryKeys);
-    }
-
     @Test
     @Timeout(60)
     public void testSyncShards() throws Exception {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
 
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
index 8421b6c67..4e8415ffc 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
+++ 
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
@@ -280,27 +280,16 @@ CREATE TABLE test_computed_column (
     PRIMARY KEY (pk)
 );
 
+-- 
################################################################################
+--  testTinyInt1Convert
+-- 
################################################################################
+
 CREATE TABLE test_tinyint1_convert (
     pk INT,
-    _datetime DATETIME,
     _tinyint1 TINYINT(1),
     PRIMARY KEY (pk)
 );
 
--- 
################################################################################
---  testSchemaEvolutionWithTinyint1Convert
--- 
################################################################################
-
-CREATE DATABASE paimon_sync_table_tinyint;
-USE paimon_sync_table_tinyint;
-
-CREATE TABLE schema_evolution_3 (
-    pt INT comment  'primary',
-    _id INT comment  '_id',
-    v1 VARCHAR(10) comment  'v1',
-    PRIMARY KEY (_id)
-);
-
 -- 
################################################################################
 --  testSyncShard
 -- 
################################################################################

Reply via email to