This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/release-0.4 by this push:
new 4cd194337 [flink] add time type for mysql cdc action (#1011)
4cd194337 is described below
commit 4cd19433774a09c8a4eb2ea491eb999afbaa4a8f
Author: JunZhang <[email protected]>
AuthorDate: Tue Apr 25 11:27:08 2023 +0800
[flink] add time type for mysql cdc action (#1011)
---
.../action/cdc/mysql/MySqlDebeziumJsonEventParser.java | 13 +++++++++++++
.../action/cdc/mysql/MySqlSyncTableActionITCase.java | 16 ++++++++++++----
.../src/test/resources/mysql/setup.sql | 6 +++++-
3 files changed, 30 insertions(+), 5 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index 177258d95..2d6bfa72e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -277,6 +277,19 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> {
.toLocalDateTime()
.toString()
.replace('T', ' ');
+ } else if ("io.debezium.time.MicroTime".equals(className)) {
+ long microseconds = Long.parseLong(oldValue);
+ long microsecondsPerSecond = 1_000_000;
+ long nanosecondsPerMicros = 1_000;
+ long seconds = microseconds / microsecondsPerSecond;
+ long nanoAdjustment =
+ (microseconds % microsecondsPerSecond) * nanosecondsPerMicros;
+
+ newValue =
+ Instant.ofEpochSecond(seconds, nanoAdjustment)
+ .atZone(ZoneOffset.UTC)
+ .toLocalTime()
+ .toString();
}
recordMap.put(fieldName, newValue);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 756db2cee..f4f1b1105 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -353,6 +353,10 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
mySqlConfig.put("database-name", DATABASE_NAME);
mySqlConfig.put("table-name", "all_types_table");
+ // orc format do not support time type, so we use parquet format.
+ Map<String, String> paimonConfig = getBasicMySqlConfig();
+ paimonConfig.put("file.format", "parquet");
+
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(1000);
@@ -367,7 +371,7 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
Collections.singletonList("pt"),
Arrays.asList("pt", "_id"),
Collections.emptyMap(),
- Collections.emptyMap());
+ paimonConfig);
action.build(env);
JobClient jobClient = env.executeAsync();
@@ -437,7 +441,8 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
DataTypes.BYTES(), // _longblob
DataTypes.STRING(), // _json
DataTypes.STRING(), // _enum
- DataTypes.INT() // _year
+ DataTypes.INT(), // _year
+ DataTypes.TIME() // _time
},
new String[] {
"_id",
@@ -503,7 +508,8 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
"_longblob",
"_json",
"_enum",
- "_year"
+ "_year",
+ "_time"
});
FileStoreTable table = getFileStoreTable();
List<String> expected =
@@ -540,7 +546,8 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
+ "[76, 79, 78, 71, 66, 76, 79, 66, 32, 32, 98, 121, 116, 101, 115, 32, 116, 101, 115, 116, 32, 100, 97, 116, 97], "
+ "{\"a\": \"b\"}, "
+ "value1, "
- + "2023"
+ + "2023, "
+ + "36803000"
+ "]",
"+I["
+ "2, 2.2, "
@@ -564,6 +571,7 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
+ "NULL, NULL, NULL, NULL, NULL, NULL, "
+ "NULL, "
+ "NULL, "
+ + "NULL, "
+ "NULL"
+ "]");
waitForResult(expected, table, rowType, Arrays.asList("pt", "_id"));
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
index 92fcfd4f2..8812e7f38 100644
--- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
@@ -135,6 +135,7 @@ CREATE TABLE all_types_table (
_enum ENUM ('value1','value2','value3'),
-- YEAR
_year YEAR,
+ _time TIME,
PRIMARY KEY (_id)
);
@@ -182,7 +183,9 @@ INSERT INTO all_types_table VALUES (
-- enum
'value1',
-- YEAR
- 2023
+ 2023,
+ -- TIME,
+ '10:13:23'
), (
2, 2.2,
NULL, NULL, NULL, NULL, NULL, NULL,
@@ -205,6 +208,7 @@ INSERT INTO all_types_table VALUES (
NULL, NULL, NULL, NULL,NULL, NULL,
NULL,
NULL,
+ NULL,
NULL
);