This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d632ed16 [flink][bug] Fix CDC Ingestion mysql DATETIME type cast error (#909)
5d632ed16 is described below

commit 5d632ed164bf35c4aa693c11ca98020a33fb3866
Author: yuzelin <[email protected]>
AuthorDate: Mon Apr 17 09:32:58 2023 +0800

    [flink][bug] Fix CDC Ingestion mysql DATETIME type cast error (#909)
---
 .../org/apache/paimon/utils/DateTimeUtils.java     | 72 ++++++++++++++++++++++
 .../cdc/mysql/MySqlDebeziumJsonEventParser.java    | 29 ++++++---
 .../cdc/mysql/MySqlSyncTableActionITCase.java      | 16 ++++-
 .../src/test/resources/mysql/setup.sql             | 49 ++++++++++++++-
 4 files changed, 157 insertions(+), 9 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/DateTimeUtils.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/DateTimeUtils.java
index 6ed26cafd..28d88dc67 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/DateTimeUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/DateTimeUtils.java
@@ -121,6 +121,40 @@ public class DateTimeUtils {
                 + time.getNano() / 1000_000;
     }
 
+    /**
+     * Format a {@link LocalDateTime} to a yyyy-MM-dd HH:mm:ss[.fraction] string.
+     *
+     * @param precision fractional-second digits to keep (0 ~ 9, truncated); 0 omits the fraction
+     */
+    public static String formatLocalDateTime(LocalDateTime localDateTime, int precision) {
+        // nano-of-second is in 0 ~ 999_999_999, so at most 9 fractional digits exist
+        Preconditions.checkArgument(
+                precision >= 0 && precision <= 9, "precision should be in range 0 ~ 9.");
+        // format year to second part
+        StringBuilder ymdhms =
+                ymdhms(
+                        new StringBuilder(),
+                        localDateTime.getYear(),
+                        localDateTime.getMonthValue(),
+                        localDateTime.getDayOfMonth(),
+                        localDateTime.getHour(),
+                        localDateTime.getMinute(),
+                        localDateTime.getSecond());
+
+        // left-pad nano-of-second to 9 digits, then keep only the first `precision` of them
+        StringBuilder fraction = new StringBuilder(Integer.toString(localDateTime.getNano()));
+        while (fraction.length() < 9) {
+            fraction.insert(0, "0");
+        }
+        String nano = fraction.substring(0, precision);
+
+        if (nano.length() > 0) {
+            ymdhms.append(".").append(nano);
+        }
+
+        return ymdhms.toString();
+    }
+
     // 
--------------------------------------------------------------------------------------------
     // Java 8 time conversion
     // 
--------------------------------------------------------------------------------------------
@@ -450,4 +484,42 @@ public class DateTimeUtils {
         long tenToTheN = (long) Math.pow(10, n);
         return (l / tenToTheN) * tenToTheN;
     }
+
+    /** Appends {@code yyyy-MM-dd HH:mm:ss} to {@code b}; the fields are assumed valid. */
+    private static StringBuilder ymdhms(
+            StringBuilder b, int year, int month, int day, int h, int m, int s) {
+        // date and time are separated by a single space rather than the ISO-8601 'T'
+        return hms(ymd(b, year, month, day).append(' '), h, m, s);
+    }
+
+    /** Appends {@code yyyy-MM-dd} to {@code b}; the fields are assumed valid. */
+    private static StringBuilder ymd(StringBuilder b, int year, int month, int day) {
+        // every helper returns the same buffer, so the appends chain left to right
+        return int2(int2(int4(b, year).append('-'), month).append('-'), day);
+    }
+
+    /** Appends {@code HH:mm:ss} to {@code b}; the fields are assumed valid. */
+    private static StringBuilder hms(StringBuilder b, int h, int m, int s) {
+        // hours, minutes and seconds, each zero-padded to two digits
+        return int2(int2(int2(b, h).append(':'), m).append(':'), s);
+    }
+
+    /**
+     * Appends the last four decimal digits of {@code i}, zero-padded (987 becomes "0987"); meant
+     * for 0 <= i <= 9999, larger values lose leading digits and negatives may give non-digits.
+     */
+    private static StringBuilder int4(StringBuilder buf, int i) {
+        // thousands and hundreds digits first, then tens and units digits
+        return int2(int2(buf, i / 100), i);
+    }
+
+    /**
+     * Appends the last two decimal digits of {@code i}, zero-padded (7 becomes "07"); meant for
+     * 0 <= i <= 99.
+     */
+    private static StringBuilder int2(StringBuilder buf, int i) {
+        char tens = (char) ('0' + (i / 10) % 10);
+        char units = (char) ('0' + i % 10);
+        return buf.append(tens).append(units);
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index ca839f142..4855eda4d 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -25,6 +25,7 @@ import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeRefe
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.paimon.types.RowKind;
+import org.apache.paimon.utils.DateTimeUtils;
 import org.apache.paimon.utils.Preconditions;
 
 import org.apache.kafka.connect.json.JsonConverterConfig;
@@ -33,7 +34,7 @@ import org.slf4j.LoggerFactory;
 
 import java.math.BigDecimal;
 import java.time.Instant;
-import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Base64;
@@ -201,6 +202,8 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
                     continue;
                 }
 
+                // pay attention to the temporal types
+                // 
https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-temporal-types
                 if ("bytes".equals(mySqlType) && className == null) {
                     // MySQL binary, varbinary, blob
                     newValue = new 
String(Base64.getDecoder().decode(oldValue));
@@ -221,15 +224,27 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
                     }
                 } else if ("io.debezium.time.Date".equals(className)) {
                     // MySQL date
-                    newValue = 
LocalDate.ofEpochDay(Integer.parseInt(oldValue)).toString();
+                    newValue = 
DateTimeUtils.toLocalDate(Integer.parseInt(oldValue)).toString();
                 } else if ("io.debezium.time.Timestamp".equals(className)) {
-                    // MySQL datetime
-                    newValue =
+                    // MySQL datetime (precision 0-3)
+                    LocalDateTime localDateTime =
                             Instant.ofEpochMilli(Long.parseLong(oldValue))
                                     .atZone(serverTimeZone)
-                                    .toLocalDateTime()
-                                    .toString()
-                                    .replace('T', ' ');
+                                    .toLocalDateTime();
+                    newValue = 
DateTimeUtils.formatLocalDateTime(localDateTime, 3);
+                } else if 
("io.debezium.time.MicroTimestamp".equals(className)) {
+                    // MySQL datetime (precision 4-6)
+                    long microseconds = Long.parseLong(oldValue);
+                    long microsecondsPerSecond = 1_000_000;
+                    long nanosecondsPerMicros = 1_000;
+                    long seconds = microseconds / microsecondsPerSecond;
+                    long nanoAdjustment =
+                            (microseconds % microsecondsPerSecond) * 
nanosecondsPerMicros;
+                    LocalDateTime localDateTime =
+                            Instant.ofEpochSecond(seconds, nanoAdjustment)
+                                    .atZone(serverTimeZone)
+                                    .toLocalDateTime();
+                    newValue = 
DateTimeUtils.formatLocalDateTime(localDateTime, 6);
                 } else if 
("io.debezium.time.ZonedTimestamp".equals(className)) {
                     // MySQL timestamp
                     newValue =
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 9b874bab9..ed0e9fba7 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -311,6 +311,10 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
                             DataTypes.DECIMAL(8, 0), // 
_decimal_unsigned_zerofill
                             DataTypes.DATE(), // _date
                             DataTypes.TIMESTAMP(0), // _datetime
+                            DataTypes.TIMESTAMP(3), // _datetime3
+                            DataTypes.TIMESTAMP(6), // _datetime6
+                            DataTypes.TIMESTAMP(0), // _datetime_p
+                            DataTypes.TIMESTAMP(2), // _datetime_p2
                             DataTypes.TIMESTAMP(6), // _timestamp
                             DataTypes.CHAR(10), // _char
                             DataTypes.VARCHAR(20), // _varchar
@@ -361,6 +365,10 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
                             "_decimal_unsigned_zerofill",
                             "_date",
                             "_datetime",
+                            "_datetime3",
+                            "_datetime6",
+                            "_datetime_p",
+                            "_datetime_p2",
                             "_timestamp",
                             "_char",
                             "_varchar",
@@ -385,7 +393,10 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
                                 + "12345.110, 12345.220, 12345.330, "
                                 + "1.2345678987654322E32, 
1.2345678987654322E32, 1.2345678987654322E32, "
                                 + "11111, 22222, 33333, "
-                                + "19439, 2023-03-23T14:30:05, 
2023-03-23T15:00:10.123456, "
+                                + "19439, "
+                                + "2023-03-23T14:30:05, 
2023-03-23T14:30:05.123, 2023-03-23T14:30:05.123456, "
+                                + "2023-03-24T14:30, 2023-03-24T14:30:05.120, "
+                                + "2023-03-23T15:00:10.123456, "
                                 + "Paimon, Apache Paimon, Apache Paimon MySQL 
Test Data, "
                                 + "[98, 121, 116, 101, 115, 0, 0, 0, 0, 0], "
                                 + "[109, 111, 114, 101, 32, 98, 121, 116, 101, 
115], "
@@ -404,7 +415,10 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
                                 + "NULL, NULL, NULL, "
                                 + "NULL, NULL, NULL, "
                                 + "NULL, NULL, NULL, "
+                                + "NULL, "
                                 + "NULL, NULL, NULL, "
+                                + "NULL, NULL, "
+                                + "NULL, "
                                 + "NULL, NULL, NULL, "
                                 + "NULL, NULL, NULL"
                                 + "]");
diff --git 
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql 
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
index d8d15f0b3..902dd9d4c 100644
--- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
@@ -41,52 +41,75 @@ CREATE TABLE schema_evolution_2 (
     PRIMARY KEY (_id)
 );
 
+-- add comment lines for the convenience of reading
 CREATE TABLE all_types_table (
     _id INT,
+    -- TINYINT
     _boolean TINYINT(1),
     _tinyint TINYINT,
     _tinyint_unsigned TINYINT(2) UNSIGNED,
     _tinyint_unsigned_zerofill TINYINT(2) UNSIGNED ZEROFILL,
+    -- SMALLINT
     _smallint SMALLINT,
     _smallint_unsigned SMALLINT UNSIGNED,
     _smallint_unsigned_zerofill SMALLINT(4) UNSIGNED ZEROFILL,
+    -- MEDIUMINT
     _mediumint MEDIUMINT,
     _mediumint_unsigned MEDIUMINT UNSIGNED,
     _mediumint_unsigned_zerofill MEDIUMINT(8) UNSIGNED ZEROFILL,
+    -- INT
     _int INT,
     _int_unsigned INT UNSIGNED,
     _int_unsigned_zerofill INT(8) UNSIGNED ZEROFILL,
+    -- BIGINT
     _bigint BIGINT,
     _bigint_unsigned BIGINT UNSIGNED,
     _bigint_unsigned_zerofill BIGINT(16) UNSIGNED ZEROFILL,
     _serial SERIAL,
+    -- FLOAT
     _float FLOAT,
     _float_unsigned FLOAT UNSIGNED,
     _float_unsigned_zerofill FLOAT(4) UNSIGNED ZEROFILL,
+    -- REAL
     _real REAL,
     _real_unsigned REAL UNSIGNED,
     _real_unsigned_zerofill REAL(10, 7) UNSIGNED ZEROFILL,
+    -- DOUBLE
     _double DOUBLE,
     _double_unsigned DOUBLE UNSIGNED,
     _double_unsigned_zerofill DOUBLE(10, 7) UNSIGNED ZEROFILL,
+    -- DOUBLE PRECISION
     _double_precision DOUBLE PRECISION,
     _double_precision_unsigned DOUBLE PRECISION UNSIGNED,
     _double_precision_unsigned_zerofill DOUBLE PRECISION(10, 7) UNSIGNED 
ZEROFILL,
+    -- NUMERIC
     _numeric NUMERIC(8, 3),
     _numeric_unsigned NUMERIC(8, 3) UNSIGNED,
     _numeric_unsigned_zerofill NUMERIC(8, 3) UNSIGNED ZEROFILL,
+    -- FIXED
     _fixed FIXED(40, 3),
     _fixed_unsigned FIXED(40, 3) UNSIGNED,
     _fixed_unsigned_zerofill FIXED(40, 3) UNSIGNED ZEROFILL,
+    -- DECIMAL
     _decimal DECIMAL(8),
     _decimal_unsigned DECIMAL(8) UNSIGNED,
     _decimal_unsigned_zerofill DECIMAL(8) UNSIGNED ZEROFILL,
+    -- DATE
     _date DATE,
+    -- DATETIME
     _datetime DATETIME,
+    _datetime3 DATETIME(3),
+    _datetime6 DATETIME(6),
+    -- DATETIME precision test
+    _datetime_p DATETIME,
+    _datetime_p2 DATETIME(2),
+    -- TIMESTAMP
     _timestamp TIMESTAMP(6) DEFAULT NULL,
+    -- string
     _char CHAR(10),
     _varchar VARCHAR(20),
     _text TEXT,
+    -- BINARY
     _bin BINARY(10),
     _varbin VARBINARY(20),
     _blob BLOB,
@@ -95,20 +118,41 @@ CREATE TABLE all_types_table (
 
 INSERT INTO all_types_table VALUES (
     1,
+    -- TINYINT
     true, 1, 2, 3,
+    -- SMALLINT
     1000, 2000, 3000,
+    -- MEDIUMINT
     100000, 200000, 300000,
+    -- INT
     1000000, 2000000, 3000000,
+    -- BIGINT
     10000000000, 20000000000, 30000000000, 40000000000,
+    -- FLOAT
     1.5, 2.5, 3.5,
+    -- REAL
     1.000001, 2.000002, 3.000003,
+    -- DOUBLE
     1.000011, 2.000022, 3.000033,
+    -- DOUBLE PRECISION
     1.000111, 2.000222, 3.000333,
+    -- NUMERIC
     12345.11, 12345.22, 12345.33,
+    -- FIXED
     123456789876543212345678987654321.11, 
123456789876543212345678987654321.22, 123456789876543212345678987654321.33,
+    -- DECIMAL
     11111, 22222, 33333,
-    '2023-03-23', '2023-03-23 14:30:05', '2023-03-23 15:00:10.123456',
+    -- DATE
+    '2023-03-23',
+    -- DATETIME
+    '2023-03-23 14:30:05', '2023-03-23 14:30:05.123', '2023-03-23 
14:30:05.123456',
+    -- DATETIME precision test
+    '2023-03-24 14:30', '2023-03-24 14:30:05.12',
+    -- TIMESTAMP
+    '2023-03-23 15:00:10.123456',
+    -- string
     'Paimon', 'Apache Paimon', 'Apache Paimon MySQL Test Data',
+    -- BINARY
     'bytes', 'more bytes', 'very long bytes test data'
 ), (
     2,
@@ -124,7 +168,10 @@ INSERT INTO all_types_table VALUES (
     NULL, NULL, NULL,
     NULL, NULL, NULL,
     NULL, NULL, NULL,
+    NULL,
     NULL, NULL, NULL,
+    NULL, NULL,
+    NULL,
     NULL, NULL, NULL,
     NULL, NULL, NULL
 );

Reply via email to