This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5d632ed16 [flink][bug] Fix CDC Ingestion mysql DATETIME type cast
error (#909)
5d632ed16 is described below
commit 5d632ed164bf35c4aa693c11ca98020a33fb3866
Author: yuzelin <[email protected]>
AuthorDate: Mon Apr 17 09:32:58 2023 +0800
[flink][bug] Fix CDC Ingestion mysql DATETIME type cast error (#909)
---
.../org/apache/paimon/utils/DateTimeUtils.java | 72 ++++++++++++++++++++++
.../cdc/mysql/MySqlDebeziumJsonEventParser.java | 29 ++++++---
.../cdc/mysql/MySqlSyncTableActionITCase.java | 16 ++++-
.../src/test/resources/mysql/setup.sql | 49 ++++++++++++++-
4 files changed, 157 insertions(+), 9 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/DateTimeUtils.java
b/paimon-common/src/main/java/org/apache/paimon/utils/DateTimeUtils.java
index 6ed26cafd..28d88dc67 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/DateTimeUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/DateTimeUtils.java
@@ -121,6 +121,40 @@ public class DateTimeUtils {
+ time.getNano() / 1000_000;
}
+ /**
+ * Formats a {@link LocalDateTime} as a {@code yyyy-MM-dd HH:mm:ss[.fraction]} string.
+ *
+ * <p>The fraction is the nanosecond-of-second left-padded with zeros to 9 digits and then
+ * truncated (not rounded) to its first {@code precision} digits. When {@code precision} is 0
+ * neither the dot nor any fractional digit is emitted.
+ *
+ * @param localDateTime the date-time to format
+ * @param precision how many digits of nanoseconds to be retained, from 0 to 9
+ * @return the formatted string
+ */
+ public static String formatLocalDateTime(LocalDateTime localDateTime, int precision) {
+ // nanosecond-of-second is in the range 0 ~ 999_999_999, i.e. at most 9 digits
+ Preconditions.checkArgument(
+ precision >= 0 && precision <= 9, "precision should be in range 0 ~ 9.");
+ // format year to second part
+ StringBuilder ymdhms =
+ ymdhms(
+ new StringBuilder(),
+ localDateTime.getYear(),
+ localDateTime.getMonthValue(),
+ localDateTime.getDayOfMonth(),
+ localDateTime.getHour(),
+ localDateTime.getMinute(),
+ localDateTime.getSecond());
+
+ // format nanosecond part: left-pad to 9 digits so leading zeros are not lost
+ StringBuilder fraction = new StringBuilder(Long.toString(localDateTime.getNano()));
+ while (fraction.length() < 9) {
+ fraction.insert(0, "0");
+ }
+ String nano = fraction.substring(0, precision);
+
+ // append only the truncated digits (not the full 9-digit buffer) so precision is honored
+ if (nano.length() > 0) {
+ ymdhms.append(".").append(nano);
+ }
+
+ return ymdhms.toString();
+ }
+
//
--------------------------------------------------------------------------------------------
// Java 8 time conversion
//
--------------------------------------------------------------------------------------------
@@ -450,4 +484,42 @@ public class DateTimeUtils {
long tenToTheN = (long) Math.pow(10, n);
return (l / tenToTheN) * tenToTheN;
}
+
+ /**
+ * Appends year-month-day, a single space, and hour:minute:second to a buffer, delegating the
+ * two halves to {@code ymd} and {@code hms}; assumes the values are valid (no range checks
+ * are done here). Returns the buffer that was passed in.
+ */
+ private static StringBuilder ymdhms(
+ StringBuilder b, int year, int month, int day, int h, int m, int
s) {
+ ymd(b, year, month, day);
+ b.append(' ');
+ return hms(b, h, m, s);
+ }
+
+ /**
+ * Appends the date to a buffer as a 4-digit year, 2-digit month and 2-digit day joined by
+ * {@code '-'}; assumes the values are valid. Returns the buffer that was passed in.
+ */
+ private static StringBuilder ymd(StringBuilder b, int year, int month, int
day) {
+ int4(b, year);
+ b.append('-');
+ int2(b, month);
+ b.append('-');
+ return int2(b, day);
+ }
+
+ /**
+ * Appends the time to a buffer as 2-digit hour, minute and second joined by {@code ':'};
+ * assumes the values are valid. Returns the buffer that was passed in.
+ */
+ private static StringBuilder hms(StringBuilder b, int h, int m, int s) {
+ int2(b, h);
+ b.append(':');
+ int2(b, m);
+ b.append(':');
+ return int2(b, s);
+ }
+
+ private static StringBuilder int4(StringBuilder buf, int i) { // appends the four low-order decimal digits of i, zero-padded
+ buf.append((char) ('0' + (i / 1000) % 10)); // thousands digit; anything above 9999 is dropped
+ buf.append((char) ('0' + (i / 100) % 10)); // hundreds digit
+ buf.append((char) ('0' + (i / 10) % 10)); // tens digit
+ return buf.append((char) ('0' + i % 10)); // units digit; NOTE: a negative i would produce non-digit characters
+ }
+
+ private static StringBuilder int2(StringBuilder buf, int i) { // appends the two low-order decimal digits of i, zero-padded
+ buf.append((char) ('0' + (i / 10) % 10)); // tens digit; anything above 99 is dropped
+ return buf.append((char) ('0' + i % 10)); // units digit; NOTE: a negative i would produce non-digit characters
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index ca839f142..4855eda4d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -25,6 +25,7 @@ import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeRefe
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.paimon.types.RowKind;
+import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.Preconditions;
import org.apache.kafka.connect.json.JsonConverterConfig;
@@ -33,7 +34,7 @@ import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.time.Instant;
-import java.time.LocalDate;
+import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Base64;
@@ -201,6 +202,8 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
continue;
}
+ // pay attention to the temporal types
+ //
https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-temporal-types
if ("bytes".equals(mySqlType) && className == null) {
// MySQL binary, varbinary, blob
newValue = new
String(Base64.getDecoder().decode(oldValue));
@@ -221,15 +224,27 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
}
} else if ("io.debezium.time.Date".equals(className)) {
// MySQL date
- newValue =
LocalDate.ofEpochDay(Integer.parseInt(oldValue)).toString();
+ newValue =
DateTimeUtils.toLocalDate(Integer.parseInt(oldValue)).toString();
} else if ("io.debezium.time.Timestamp".equals(className)) {
- // MySQL datetime
- newValue =
+ // MySQL datetime (precision 0-3)
+ LocalDateTime localDateTime =
Instant.ofEpochMilli(Long.parseLong(oldValue))
.atZone(serverTimeZone)
- .toLocalDateTime()
- .toString()
- .replace('T', ' ');
+ .toLocalDateTime();
+ newValue =
DateTimeUtils.formatLocalDateTime(localDateTime, 3);
+ } else if
("io.debezium.time.MicroTimestamp".equals(className)) {
+ // MySQL datetime (precision 4-6)
+ long microseconds = Long.parseLong(oldValue);
+ long microsecondsPerSecond = 1_000_000;
+ long nanosecondsPerMicros = 1_000;
+ long seconds = microseconds / microsecondsPerSecond;
+ long nanoAdjustment =
+ (microseconds % microsecondsPerSecond) *
nanosecondsPerMicros;
+ LocalDateTime localDateTime =
+ Instant.ofEpochSecond(seconds, nanoAdjustment)
+ .atZone(serverTimeZone)
+ .toLocalDateTime();
+ newValue =
DateTimeUtils.formatLocalDateTime(localDateTime, 6);
} else if
("io.debezium.time.ZonedTimestamp".equals(className)) {
// MySQL timestamp
newValue =
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 9b874bab9..ed0e9fba7 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -311,6 +311,10 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
DataTypes.DECIMAL(8, 0), //
_decimal_unsigned_zerofill
DataTypes.DATE(), // _date
DataTypes.TIMESTAMP(0), // _datetime
+ DataTypes.TIMESTAMP(3), // _datetime3
+ DataTypes.TIMESTAMP(6), // _datetime6
+ DataTypes.TIMESTAMP(0), // _datetime_p
+ DataTypes.TIMESTAMP(2), // _datetime_p2
DataTypes.TIMESTAMP(6), // _timestamp
DataTypes.CHAR(10), // _char
DataTypes.VARCHAR(20), // _varchar
@@ -361,6 +365,10 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
"_decimal_unsigned_zerofill",
"_date",
"_datetime",
+ "_datetime3",
+ "_datetime6",
+ "_datetime_p",
+ "_datetime_p2",
"_timestamp",
"_char",
"_varchar",
@@ -385,7 +393,10 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
+ "12345.110, 12345.220, 12345.330, "
+ "1.2345678987654322E32,
1.2345678987654322E32, 1.2345678987654322E32, "
+ "11111, 22222, 33333, "
- + "19439, 2023-03-23T14:30:05,
2023-03-23T15:00:10.123456, "
+ + "19439, "
+ + "2023-03-23T14:30:05,
2023-03-23T14:30:05.123, 2023-03-23T14:30:05.123456, "
+ + "2023-03-24T14:30, 2023-03-24T14:30:05.120, "
+ + "2023-03-23T15:00:10.123456, "
+ "Paimon, Apache Paimon, Apache Paimon MySQL
Test Data, "
+ "[98, 121, 116, 101, 115, 0, 0, 0, 0, 0], "
+ "[109, 111, 114, 101, 32, 98, 121, 116, 101,
115], "
@@ -404,7 +415,10 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
+ "NULL, NULL, NULL, "
+ "NULL, NULL, NULL, "
+ "NULL, NULL, NULL, "
+ + "NULL, "
+ "NULL, NULL, NULL, "
+ + "NULL, NULL, "
+ + "NULL, "
+ "NULL, NULL, NULL, "
+ "NULL, NULL, NULL"
+ "]");
diff --git
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
index d8d15f0b3..902dd9d4c 100644
--- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
@@ -41,52 +41,75 @@ CREATE TABLE schema_evolution_2 (
PRIMARY KEY (_id)
);
+-- the comment lines below group the columns by type to make the table easier to read
CREATE TABLE all_types_table (
_id INT,
+ -- TINYINT
_boolean TINYINT(1),
_tinyint TINYINT,
_tinyint_unsigned TINYINT(2) UNSIGNED,
_tinyint_unsigned_zerofill TINYINT(2) UNSIGNED ZEROFILL,
+ -- SMALLINT
_smallint SMALLINT,
_smallint_unsigned SMALLINT UNSIGNED,
_smallint_unsigned_zerofill SMALLINT(4) UNSIGNED ZEROFILL,
+ -- MEDIUMINT
_mediumint MEDIUMINT,
_mediumint_unsigned MEDIUMINT UNSIGNED,
_mediumint_unsigned_zerofill MEDIUMINT(8) UNSIGNED ZEROFILL,
+ -- INT
_int INT,
_int_unsigned INT UNSIGNED,
_int_unsigned_zerofill INT(8) UNSIGNED ZEROFILL,
+ -- BIGINT
_bigint BIGINT,
_bigint_unsigned BIGINT UNSIGNED,
_bigint_unsigned_zerofill BIGINT(16) UNSIGNED ZEROFILL,
_serial SERIAL,
+ -- FLOAT
_float FLOAT,
_float_unsigned FLOAT UNSIGNED,
_float_unsigned_zerofill FLOAT(4) UNSIGNED ZEROFILL,
+ -- REAL
_real REAL,
_real_unsigned REAL UNSIGNED,
_real_unsigned_zerofill REAL(10, 7) UNSIGNED ZEROFILL,
+ -- DOUBLE
_double DOUBLE,
_double_unsigned DOUBLE UNSIGNED,
_double_unsigned_zerofill DOUBLE(10, 7) UNSIGNED ZEROFILL,
+ -- DOUBLE PRECISION
_double_precision DOUBLE PRECISION,
_double_precision_unsigned DOUBLE PRECISION UNSIGNED,
_double_precision_unsigned_zerofill DOUBLE PRECISION(10, 7) UNSIGNED
ZEROFILL,
+ -- NUMERIC
_numeric NUMERIC(8, 3),
_numeric_unsigned NUMERIC(8, 3) UNSIGNED,
_numeric_unsigned_zerofill NUMERIC(8, 3) UNSIGNED ZEROFILL,
+ -- FIXED
_fixed FIXED(40, 3),
_fixed_unsigned FIXED(40, 3) UNSIGNED,
_fixed_unsigned_zerofill FIXED(40, 3) UNSIGNED ZEROFILL,
+ -- DECIMAL
_decimal DECIMAL(8),
_decimal_unsigned DECIMAL(8) UNSIGNED,
_decimal_unsigned_zerofill DECIMAL(8) UNSIGNED ZEROFILL,
+ -- DATE
_date DATE,
+ -- DATETIME
_datetime DATETIME,
+ _datetime3 DATETIME(3),
+ _datetime6 DATETIME(6),
+ -- DATETIME precision test
+ _datetime_p DATETIME,
+ _datetime_p2 DATETIME(2),
+ -- TIMESTAMP
_timestamp TIMESTAMP(6) DEFAULT NULL,
+ -- string
_char CHAR(10),
_varchar VARCHAR(20),
_text TEXT,
+ -- BINARY
_bin BINARY(10),
_varbin VARBINARY(20),
_blob BLOB,
@@ -95,20 +118,41 @@ CREATE TABLE all_types_table (
INSERT INTO all_types_table VALUES (
1,
+ -- TINYINT
true, 1, 2, 3,
+ -- SMALLINT
1000, 2000, 3000,
+ -- MEDIUMINT
100000, 200000, 300000,
+ -- INT
1000000, 2000000, 3000000,
+ -- BIGINT
10000000000, 20000000000, 30000000000, 40000000000,
+ -- FLOAT
1.5, 2.5, 3.5,
+ -- REAL
1.000001, 2.000002, 3.000003,
+ -- DOUBLE
1.000011, 2.000022, 3.000033,
+ -- DOUBLE PRECISION
1.000111, 2.000222, 3.000333,
+ -- NUMERIC
12345.11, 12345.22, 12345.33,
+ -- FIXED
123456789876543212345678987654321.11,
123456789876543212345678987654321.22, 123456789876543212345678987654321.33,
+ -- DECIMAL
11111, 22222, 33333,
- '2023-03-23', '2023-03-23 14:30:05', '2023-03-23 15:00:10.123456',
+ -- DATE
+ '2023-03-23',
+ -- DATETIME
+ '2023-03-23 14:30:05', '2023-03-23 14:30:05.123', '2023-03-23
14:30:05.123456',
+ -- DATETIME precision test
+ '2023-03-24 14:30', '2023-03-24 14:30:05.12',
+ -- TIMESTAMP
+ '2023-03-23 15:00:10.123456',
+ -- string
'Paimon', 'Apache Paimon', 'Apache Paimon MySQL Test Data',
+ -- BINARY
'bytes', 'more bytes', 'very long bytes test data'
), (
2,
@@ -124,7 +168,10 @@ INSERT INTO all_types_table VALUES (
NULL, NULL, NULL,
NULL, NULL, NULL,
NULL, NULL, NULL,
+ NULL,
NULL, NULL, NULL,
+ NULL, NULL,
+ NULL,
NULL, NULL, NULL,
NULL, NULL, NULL
);