This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 41997b217 [cdc] replace the string class name with constant (#1710)
41997b217 is described below

commit 41997b2170dcfa32aa867bbc4573989911f05f61
Author: JunZhang <[email protected]>
AuthorDate: Wed Aug 2 15:41:48 2023 +0800

    [cdc] replace the string class name with constant (#1710)
---
 .../cdc/mysql/MySqlDebeziumJsonEventParser.java    | 25 ++++++++++++++--------
 1 file changed, 16 insertions(+), 9 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index dfde301c3..2053d5fc4 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -39,7 +39,15 @@ import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeRefe
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
+import io.debezium.data.geometry.Geometry;
+import io.debezium.data.geometry.Point;
 import io.debezium.relational.history.TableChanges;
+import io.debezium.time.Date;
+import io.debezium.time.MicroTime;
+import io.debezium.time.MicroTimestamp;
+import io.debezium.time.Timestamp;
+import io.debezium.time.ZonedTimestamp;
+import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.json.JsonConverterConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -337,8 +345,7 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
             if ("bytes".equals(mySqlType) && className == null) {
                 // MySQL binary, varbinary, blob
                 newValue = new String(Base64.getDecoder().decode(oldValue));
-            } else if ("bytes".equals(mySqlType)
-                    && 
"org.apache.kafka.connect.data.Decimal".equals(className)) {
+            } else if ("bytes".equals(mySqlType) && 
Decimal.LOGICAL_NAME.equals(className)) {
                 // MySQL numeric, fixed, decimal
                 try {
                     new BigDecimal(oldValue);
@@ -352,10 +359,10 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
                                     + "' to 'numeric'",
                             e);
                 }
-            } else if ("io.debezium.time.Date".equals(className)) {
+            } else if (Date.SCHEMA_NAME.equals(className)) {
                 // MySQL date
                 newValue = 
DateTimeUtils.toLocalDate(Integer.parseInt(oldValue)).toString();
-            } else if ("io.debezium.time.Timestamp".equals(className)) {
+            } else if (Timestamp.SCHEMA_NAME.equals(className)) {
                 // MySQL datetime (precision 0-3)
 
                 // display value of datetime is not affected by timezone, see
@@ -365,7 +372,7 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
                 LocalDateTime localDateTime =
                         
DateTimeUtils.toLocalDateTime(Long.parseLong(oldValue), ZoneOffset.UTC);
                 newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 3);
-            } else if ("io.debezium.time.MicroTimestamp".equals(className)) {
+            } else if (MicroTimestamp.SCHEMA_NAME.equals(className)) {
                 // MySQL datetime (precision 4-6)
                 long microseconds = Long.parseLong(oldValue);
                 long microsecondsPerSecond = 1_000_000;
@@ -382,7 +389,7 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
                                 .atZone(ZoneOffset.UTC)
                                 .toLocalDateTime();
                 newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 6);
-            } else if ("io.debezium.time.ZonedTimestamp".equals(className)) {
+            } else if (ZonedTimestamp.SCHEMA_NAME.equals(className)) {
                 // MySQL timestamp
 
                 // display value of timestamp is affected by timezone, see
@@ -392,7 +399,7 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
                 LocalDateTime localDateTime =
                         
Instant.parse(oldValue).atZone(serverTimeZone).toLocalDateTime();
                 newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 6);
-            } else if ("io.debezium.time.MicroTime".equals(className)) {
+            } else if (MicroTime.SCHEMA_NAME.equals(className)) {
                 long microseconds = Long.parseLong(oldValue);
                 long microsecondsPerSecond = 1_000_000;
                 long nanosecondsPerMicros = 1_000;
@@ -404,8 +411,8 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
                                 .atZone(ZoneOffset.UTC)
                                 .toLocalTime()
                                 .toString();
-            } else if ("io.debezium.data.geometry.Point".equals(className)
-                    || "io.debezium.data.geometry.Geometry".equals(className)) 
{
+            } else if (Point.LOGICAL_NAME.equals(className)
+                    || Geometry.LOGICAL_NAME.equals(className)) {
                 JsonNode jsonNode = recordRow.get(fieldName);
                 try {
                     byte[] wkb = jsonNode.get("wkb").binaryValue();

Reply via email to