This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 38eb10d77 [flink][mysql-cdc] Refactor TINYINT(1) conversion doc and
codes (#1815)
38eb10d77 is described below
commit 38eb10d773ea72eb4437682b395cb39395281113
Author: yuzelin <[email protected]>
AuthorDate: Tue Aug 15 21:27:49 2023 +0800
[flink][mysql-cdc] Refactor TINYINT(1) conversion doc and codes (#1815)
---
docs/content/how-to/cdc-ingestion.md | 6 ++
.../shortcodes/generated/mysql_sync_database.html | 2 +-
.../shortcodes/generated/mysql_sync_table.html | 2 +-
.../action/cdc/mysql/MySqlSyncDatabaseAction.java | 4 +-
.../action/cdc/mysql/MySqlTableSchemaBuilder.java | 8 +-
.../cdc/mysql/MySqlSyncTableActionITCase.java | 117 +++++----------------
.../src/test/resources/mysql/sync_table_setup.sql | 19 +---
7 files changed, 47 insertions(+), 111 deletions(-)
diff --git a/docs/content/how-to/cdc-ingestion.md
b/docs/content/how-to/cdc-ingestion.md
index 4b13d0d08..4daf9cd84 100644
--- a/docs/content/how-to/cdc-ingestion.md
+++ b/docs/content/how-to/cdc-ingestion.md
@@ -438,6 +438,12 @@ behaviors of `RENAME TABLE` and `DROP COLUMN` will be
ignored, `RENAME COLUMN` w
{{< generated/compute_column >}}
+## Special Data Type Conversions
+1. MySQL TINYINT(1) type will be converted to Boolean by default. If you want
to store numbers (-128~127) in it as MySQL does,
+you can specify `--mysql-conf mysql.converter.tinyint1-to-bool=false`;
the column will then be mapped to TINYINT in the Paimon table.
+2. MySQL BIT(1) type will be converted to Boolean.
+3. When using the Hive catalog, MySQL TIME type will be converted to STRING.
+
## FAQ
1. Chinese characters in records ingested from MySQL are garbled.
* Try to set `env.java.opts: -Dfile.encoding=UTF-8` in `flink-conf.yaml`
diff --git a/docs/layouts/shortcodes/generated/mysql_sync_database.html
b/docs/layouts/shortcodes/generated/mysql_sync_database.html
index fe6f3d5e3..0d07d824b 100644
--- a/docs/layouts/shortcodes/generated/mysql_sync_database.html
+++ b/docs/layouts/shortcodes/generated/mysql_sync_database.html
@@ -63,7 +63,7 @@ under the License.
</tr>
<tr>
<td><h5>--mysql-conf</h5></td>
- <td>The configuration for Flink CDC MySQL table sources. Each
configuration should be specified in the format "key=value". hostname,
username, password, database-name and table-name are required configurations,
others are optional. See its <a
href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options">document</a>
for a complete list of configurations. <br> Furthermore, TINYINT(1) type in
MySQL would be converted to Boolean b [...]
+ <td>The configuration for Flink CDC MySQL table sources. Each
configuration should be specified in the format "key=value". hostname,
username, password, database-name and table-name are required configurations,
others are optional. See its <a
href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options">document</a>
for a complete list of configurations.</td>
</tr>
<tr>
<td><h5>--catalog-conf</h5></td>
diff --git a/docs/layouts/shortcodes/generated/mysql_sync_table.html
b/docs/layouts/shortcodes/generated/mysql_sync_table.html
index b9b286f9f..15cb8da72 100644
--- a/docs/layouts/shortcodes/generated/mysql_sync_table.html
+++ b/docs/layouts/shortcodes/generated/mysql_sync_table.html
@@ -51,7 +51,7 @@ under the License.
</tr>
<tr>
<td><h5>--mysql-conf</h5></td>
- <td>The configuration for Flink CDC MySQL table sources. Each
configuration should be specified in the format "key=value". hostname,
username, password, database-name and table-name are required configurations,
others are optional. See its <a
href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options">document</a>
for a complete list of configurations. <br> Furthermore, TINYINT(1) type in
MySQL would be converted to Boolean b [...]
+ <td>The configuration for Flink CDC MySQL table sources. Each
configuration should be specified in the format "key=value". hostname,
username, password, database-name and table-name are required configurations,
others are optional. See its <a
href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options">document</a>
for a complete list of configurations.</td>
</tr>
<tr>
<td><h5>--catalog-conf</h5></td>
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index 3c62b0aa9..17bcd9173 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -235,11 +235,11 @@ public class MySqlSyncDatabaseAction extends ActionBase {
String serverTimeZone =
mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);
ZoneId zoneId = serverTimeZone == null ? ZoneId.systemDefault() :
ZoneId.of(serverTimeZone);
+ Boolean convertTinyint1ToBool =
mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL);
MySqlTableSchemaBuilder schemaBuilder =
- new MySqlTableSchemaBuilder(tableConfig, caseSensitive);
+ new MySqlTableSchemaBuilder(tableConfig, caseSensitive,
convertTinyint1ToBool);
Pattern includingPattern = this.includingPattern;
Pattern excludingPattern = this.excludingPattern;
- Boolean convertTinyint1ToBool =
mySqlConfig.get(MYSQL_CONVERTER_TINYINT1_BOOL);
EventParser.Factory<String> parserFactory =
() ->
new MySqlDebeziumJsonEventParser(
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
index e7176c6fa..57fd97223 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
@@ -32,7 +32,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
-import static
org.apache.paimon.flink.action.cdc.mysql.MySqlActionUtils.MYSQL_CONVERTER_TINYINT1_BOOL;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Schema builder for MySQL cdc. */
@@ -40,10 +39,13 @@ public class MySqlTableSchemaBuilder implements
NewTableSchemaBuilder<JsonNode>
private final Map<String, String> tableConfig;
private final boolean caseSensitive;
+ private final boolean convertTinyint1ToBool;
- public MySqlTableSchemaBuilder(Map<String, String> tableConfig, boolean
caseSensitive) {
+ public MySqlTableSchemaBuilder(
+ Map<String, String> tableConfig, boolean caseSensitive, boolean
convertTinyint1ToBool) {
this.tableConfig = tableConfig;
this.caseSensitive = caseSensitive;
+ this.convertTinyint1ToBool = convertTinyint1ToBool;
}
@Override
@@ -63,7 +65,7 @@ public class MySqlTableSchemaBuilder implements
NewTableSchemaBuilder<JsonNode>
element.get("typeExpression").asText(),
precision,
scale,
-
MYSQL_CONVERTER_TINYINT1_BOOL.defaultValue())
+ convertTinyint1ToBool)
.copy(element.get("optional").asBoolean()));
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index c4b683775..f0b28982e 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -897,107 +897,46 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
JobClient client = env.executeAsync();
waitJobRunning(client);
+ checkTableSchema(
+ "[{\"id\":0,\"name\":\"pk\",\"type\":\"INT NOT
NULL\",\"description\":\"\"},"
+ +
"{\"id\":1,\"name\":\"_tinyint1\",\"type\":\"TINYINT\",\"description\":\"\"}]");
+
try (Statement statement = getStatement()) {
statement.execute("USE " + DATABASE_NAME);
- statement.executeUpdate(
- "INSERT INTO test_tinyint1_convert VALUES (1, '2021-09-15
15:00:10', 21)");
- statement.executeUpdate(
- "INSERT INTO test_tinyint1_convert VALUES (2, '2023-03-23
16:00:20', 42)");
+ statement.executeUpdate("INSERT INTO test_tinyint1_convert VALUES
(1, 21), (2, 42)");
FileStoreTable table = getFileStoreTable();
RowType rowType =
RowType.of(
- new DataType[] {
- DataTypes.INT().notNull(),
- DataTypes.TIMESTAMP(0),
- DataTypes.TINYINT()
- },
- new String[] {"pk", "_datetime", "_tinyint1"});
- List<String> expected =
- Arrays.asList(
- "+I[1, 2021-09-15T15:00:10, 21]", "+I[2,
2023-03-23T16:00:20, 42]");
+ new DataType[] {DataTypes.INT().notNull(),
DataTypes.TINYINT()},
+ new String[] {"pk", "_tinyint1"});
+ List<String> expected = Arrays.asList("+I[1, 21]", "+I[2, 42]");
waitForResult(expected, table, rowType,
Collections.singletonList("pk"));
- }
- }
-
- @Test
- @Timeout(60)
- public void testSchemaEvolutionWithTinyint1Convert() throws Exception {
- Map<String, String> mySqlConfig = getBasicMySqlConfig();
- mySqlConfig.put("database-name", "paimon_sync_table_tinyint");
- mySqlConfig.put("table-name", "schema_evolution_3");
- mySqlConfig.put("mysql.converter.tinyint1-to-bool", "false");
-
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(2);
- env.enableCheckpointing(1000);
- env.setRestartStrategy(RestartStrategies.noRestart());
- Map<String, String> tableConfig = getBasicTableConfig();
- MySqlSyncTableAction action =
- new MySqlSyncTableAction(
- mySqlConfig,
- warehouse,
- database,
- tableName,
- Collections.singletonList("pt"),
- Arrays.asList("pt", "_id"),
- Collections.singletonMap(
- CatalogOptions.METASTORE.key(),
"test-alter-table"),
- tableConfig);
- action.build(env);
- JobClient client = env.executeAsync();
- waitJobRunning(client);
-
- checkTableSchema(
- "[{\"id\":0,\"name\":\"pt\",\"type\":\"INT NOT
NULL\",\"description\":\"primary\"},{\"id\":1,\"name\":\"_id\",\"type\":\"INT
NOT
NULL\",\"description\":\"_id\"},{\"id\":2,\"name\":\"v1\",\"type\":\"VARCHAR(10)\",\"description\":\"v1\"}]");
+ // test schema evolution
+ statement.executeUpdate(
+ "ALTER TABLE test_tinyint1_convert ADD COLUMN
_new_tinyint1 TINYINT(1)");
+ statement.executeUpdate(
+ "INSERT INTO test_tinyint1_convert VALUES (3, 63, 1), (4,
127, -128)");
- try (Statement statement = getStatement()) {
- testSchemaEvolutionImplWithTinyIntConvert(statement);
+ rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(),
DataTypes.TINYINT(), DataTypes.TINYINT()
+ },
+ new String[] {"pk", "_tinyint1", "_new_tinyint1"});
+ waitForResult(
+ Arrays.asList(
+ "+I[1, 21, NULL]",
+ "+I[2, 42, NULL]",
+ "+I[3, 63, 1]",
+ "+I[4, 127, -128]"),
+ table,
+ rowType,
+ Collections.singletonList("pk"));
}
}
- private void testSchemaEvolutionImplWithTinyIntConvert(Statement
statement) throws Exception {
- FileStoreTable table = getFileStoreTable();
- statement.executeUpdate("USE paimon_sync_table_tinyint");
-
- statement.executeUpdate("INSERT INTO schema_evolution_3 VALUES (1, 1,
'one')");
- statement.executeUpdate(
- "INSERT INTO schema_evolution_3 VALUES (1, 2, 'two'), (2, 4,
'four')");
- RowType rowType =
- RowType.of(
- new DataType[] {
- DataTypes.INT().notNull(),
- DataTypes.INT().notNull(),
- DataTypes.VARCHAR(10)
- },
- new String[] {"pt", "_id", "v1"});
- List<String> primaryKeys = Arrays.asList("pt", "_id");
- List<String> expected = Arrays.asList("+I[1, 1, one]", "+I[1, 2,
two]", "+I[2, 4, four]");
- waitForResult(expected, table, rowType, primaryKeys);
-
- statement.executeUpdate("ALTER TABLE schema_evolution_3 ADD COLUMN v2
TINYINT(1)");
- statement.executeUpdate(
- "INSERT INTO schema_evolution_3 VALUES (2, 3, 'three', 30),
(1, 5, 'five', 50)");
- rowType =
- RowType.of(
- new DataType[] {
- DataTypes.INT().notNull(),
- DataTypes.INT().notNull(),
- DataTypes.VARCHAR(10),
- DataTypes.TINYINT()
- },
- new String[] {"pt", "_id", "v1", "v2"});
- expected =
- Arrays.asList(
- "+I[1, 1, one, NULL]",
- "+I[1, 2, two, NULL]",
- "+I[2, 3, three, 30]",
- "+I[2, 4, four, NULL]",
- "+I[1, 5, five, 50]");
- waitForResult(expected, table, rowType, primaryKeys);
- }
-
@Test
@Timeout(60)
public void testSyncShards() throws Exception {
diff --git
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
index 8421b6c67..4e8415ffc 100644
---
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
+++
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
@@ -280,27 +280,16 @@ CREATE TABLE test_computed_column (
PRIMARY KEY (pk)
);
+--
################################################################################
+-- testTinyInt1Convert
+--
################################################################################
+
CREATE TABLE test_tinyint1_convert (
pk INT,
- _datetime DATETIME,
_tinyint1 TINYINT(1),
PRIMARY KEY (pk)
);
---
################################################################################
--- testSchemaEvolutionWithTinyint1Convert
---
################################################################################
-
-CREATE DATABASE paimon_sync_table_tinyint;
-USE paimon_sync_table_tinyint;
-
-CREATE TABLE schema_evolution_3 (
- pt INT comment 'primary',
- _id INT comment '_id',
- v1 VARCHAR(10) comment 'v1',
- PRIMARY KEY (_id)
-);
-
--
################################################################################
-- testSyncShard
--
################################################################################