This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 96ec8d450 [cdc] support bit type for flink cdc (#1708)
96ec8d450 is described below

commit 96ec8d4500d4109a2d0e5058f0456dba33a58137
Author: JunZhang <[email protected]>
AuthorDate: Mon Aug 14 17:31:32 2023 +0800

    [cdc] support bit type for flink cdc (#1708)
---
 paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java  | 1 +
 .../paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java | 4 +++-
 .../org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java    | 5 +++++
 .../paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java   | 6 ++++++
 .../src/test/resources/mysql/sync_table_setup.sql                   | 6 ++++++
 5 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
index f27b89a18..e6581aa1e 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
@@ -65,6 +65,7 @@ public class TypeUtils {
             case BOOLEAN:
                 return BinaryStringUtils.toBoolean(str);
             case BINARY:
+                return s.getBytes();
             case VARBINARY:
                 int binaryLength = DataTypeChecks.getLength(type);
                 byte[] bytes = s.getBytes();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index 418db64d2..35fd888c4 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -40,6 +40,7 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeRefe
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
+import io.debezium.data.Bits;
 import io.debezium.data.geometry.Geometry;
 import io.debezium.data.geometry.Point;
 import io.debezium.relational.history.TableChanges;
@@ -349,7 +350,8 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> {
 
             // pay attention to the temporal types
             // https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-temporal-types
-            if ("bytes".equals(mySqlType) && className == null) {
+            if (("bytes".equals(mySqlType) && className == null)
+                    || Bits.LOGICAL_NAME.equals(className)) {
                 // MySQL binary, varbinary, blob
                 newValue = new String(Base64.getDecoder().decode(oldValue));
                 } else if ("bytes".equals(mySqlType) && Decimal.LOGICAL_NAME.equals(className)) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
index bd288cee5..f7b696a89 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
@@ -152,6 +152,11 @@ public class MySqlTypeUtils {
             Boolean tinyInt1ToBool) {
         switch (type.toUpperCase()) {
             case BIT:
+                if (length == null || length == 1) {
+                    return DataTypes.BOOLEAN();
+                } else {
+                    return DataTypes.BINARY((length + 7) / 8);
+                }
             case BOOLEAN:
             case BOOL:
                 return DataTypes.BOOLEAN();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index b4cf2131d..c4b683775 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -378,6 +378,8 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
                         new DataType[] {
                             DataTypes.INT().notNull(), // _id
                             DataTypes.DECIMAL(2, 1).notNull(), // pt
+                            DataTypes.BOOLEAN(), // _bit1
+                            DataTypes.BINARY(8), // _bit
                             DataTypes.BOOLEAN(), // _tinyint1
                             DataTypes.BOOLEAN(), // _boolean
                             DataTypes.BOOLEAN(), // _bool
@@ -455,6 +457,8 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
                         new String[] {
                             "_id",
                             "pt",
+                            "_bit1",
+                            "_bit",
                             "_tinyint1",
                             "_boolean",
                             "_bool",
@@ -534,6 +538,7 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
                 Arrays.asList(
                         "+I["
                                 + "1, 1.1, "
+                                + "true, [-17, -65, -67, 7, 0, 0, 0, 0, 0, 0], "
                                 + "true, true, false, 1, 2, 3, "
                                 + "1000, 2000, 3000, "
                                 + "100000, 200000, 300000, "
@@ -578,6 +583,7 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
                                 + "]",
                         "+I["
                                 + "2, 2.2, "
+                                + "NULL, NULL, "
                                 + "NULL, NULL, NULL, NULL, NULL, NULL, "
                                 + "NULL, NULL, NULL, "
                                 + "NULL, NULL, NULL, "
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
index e10df0afa..8421b6c67 100644
--- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
@@ -52,6 +52,9 @@ CREATE TABLE schema_evolution_multiple (
 CREATE TABLE all_types_table (
     _id INT,
     pt DECIMAL(2, 1),
+    -- BIT
+    _bit1 BIT,
+    _bit BIT(64),
     -- TINYINT
     _tinyint1 TINYINT(1),
     _boolean BOOLEAN,
@@ -152,6 +155,8 @@ CREATE TABLE all_types_table (
 
 INSERT INTO all_types_table VALUES (
     1, 1.1,
+    -- BIT
+    1, B'11111000111',
     -- TINYINT
     true, true, false, 1, 2, 3,
     -- SMALLINT
@@ -207,6 +212,7 @@ INSERT INTO all_types_table VALUES (
     'a,b'
 ), (
     2, 2.2,
+    NULL, NULL,
     NULL, NULL, NULL, NULL, NULL, NULL,
     NULL, NULL, NULL,
     NULL, NULL, NULL,

Reply via email to