This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f8174a62 [flink] add time type for mysql cdc action (#1011)
0f8174a62 is described below

commit 0f8174a6243c26e8dd0b80d6cfe52a448a7af5ac
Author: JunZhang <[email protected]>
AuthorDate: Tue Apr 25 11:27:08 2023 +0800

    [flink] add time type for mysql cdc action (#1011)
---
 .../action/cdc/mysql/MySqlDebeziumJsonEventParser.java   | 13 +++++++++++++
 .../action/cdc/mysql/MySqlSyncTableActionITCase.java     | 16 ++++++++++++----
 .../src/test/resources/mysql/setup.sql                   |  6 +++++-
 3 files changed, 30 insertions(+), 5 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index 177258d95..2d6bfa72e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -277,6 +277,19 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> {
                                     .toLocalDateTime()
                                     .toString()
                                     .replace('T', ' ');
+                } else if ("io.debezium.time.MicroTime".equals(className)) {
+                    long microseconds = Long.parseLong(oldValue);
+                    long microsecondsPerSecond = 1_000_000;
+                    long nanosecondsPerMicros = 1_000;
+                    long seconds = microseconds / microsecondsPerSecond;
+                    long nanoAdjustment =
+                            (microseconds % microsecondsPerSecond) * nanosecondsPerMicros;
+
+                    newValue =
+                            Instant.ofEpochSecond(seconds, nanoAdjustment)
+                                    .atZone(ZoneOffset.UTC)
+                                    .toLocalTime()
+                                    .toString();
                 }
 
                 recordMap.put(fieldName, newValue);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 756db2cee..f4f1b1105 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -353,6 +353,10 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
         mySqlConfig.put("database-name", DATABASE_NAME);
         mySqlConfig.put("table-name", "all_types_table");
 
+        // orc format do not support time type, so we use parquet format.
+        Map<String, String> paimonConfig = getBasicMySqlConfig();
+        paimonConfig.put("file.format", "parquet");
+
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(1);
         env.enableCheckpointing(1000);
@@ -367,7 +371,7 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
                         Collections.singletonList("pt"),
                         Arrays.asList("pt", "_id"),
                         Collections.emptyMap(),
-                        Collections.emptyMap());
+                        paimonConfig);
         action.build(env);
         JobClient jobClient = env.executeAsync();
 
@@ -437,7 +441,8 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
                             DataTypes.BYTES(), // _longblob
                             DataTypes.STRING(), // _json
                             DataTypes.STRING(), // _enum
-                            DataTypes.INT() // _year
+                            DataTypes.INT(), // _year
+                            DataTypes.TIME() // _time
                         },
                         new String[] {
                             "_id",
@@ -503,7 +508,8 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
                             "_longblob",
                             "_json",
                             "_enum",
-                            "_year"
+                            "_year",
+                            "_time"
                         });
         FileStoreTable table = getFileStoreTable();
         List<String> expected =
@@ -540,7 +546,8 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
                                 + "[76, 79, 78, 71, 66, 76, 79, 66, 32, 32, 98, 121, 116, 101, 115, 32, 116, 101, 115, 116, 32, 100, 97, 116, 97], "
                                 + "{\"a\": \"b\"}, "
                                 + "value1, "
-                                + "2023"
+                                + "2023, "
+                                + "36803000"
                                 + "]",
                         "+I["
                                 + "2, 2.2, "
@@ -564,6 +571,7 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
                                 + "NULL, NULL, NULL, NULL, NULL, NULL, "
                                 + "NULL, "
                                 + "NULL, "
+                                + "NULL, "
                                 + "NULL"
                                 + "]");
         waitForResult(expected, table, rowType, Arrays.asList("pt", "_id"));
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
index 92fcfd4f2..8812e7f38 100644
--- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
@@ -135,6 +135,7 @@ CREATE TABLE all_types_table (
     _enum ENUM ('value1','value2','value3'),
     -- YEAR
     _year YEAR,
+    _time TIME,
     PRIMARY KEY (_id)
 );
 
@@ -182,7 +183,9 @@ INSERT INTO all_types_table VALUES (
     -- enum
     'value1',
      -- YEAR
-     2023
+     2023,
+     -- TIME,
+     '10:13:23'
 ), (
     2, 2.2,
     NULL, NULL, NULL, NULL, NULL, NULL,
@@ -205,6 +208,7 @@ INSERT INTO all_types_table VALUES (
     NULL, NULL, NULL, NULL,NULL, NULL,
     NULL,
     NULL,
+    NULL,
     NULL
 );
 

Reply via email to