This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4baefcdf1 [flink][mysql-cdc] Map MySQL BINARY to Paimon VARBINARY and 
fix parse of MySQL BIT(N) value (#1844)
4baefcdf1 is described below

commit 4baefcdf16fecf082e3c766085c6c8ef9db96232
Author: yuzelin <[email protected]>
AuthorDate: Wed Aug 23 20:47:32 2023 +0800

    [flink][mysql-cdc] Map MySQL BINARY to Paimon VARBINARY and fix parse of 
MySQL BIT(N) value (#1844)
---
 docs/content/how-to/cdc-ingestion.md               |  2 ++
 .../java/org/apache/paimon/utils/StringUtils.java  | 15 +++++++++++++
 .../java/org/apache/paimon/utils/TypeUtils.java    | 16 ++++++++++++--
 .../cdc/mysql/MySqlDebeziumJsonEventParser.java    | 25 +++++++++++++++++-----
 .../flink/action/cdc/mysql/MySqlTypeUtils.java     |  6 ++----
 .../paimon/flink/sink/cdc/CdcRecordUtils.java      |  4 ++--
 .../cdc/kafka/KafkaCanalSyncTableActionITCase.java |  2 +-
 .../cdc/mysql/MySqlCdcTypeMappingITCase.java       |  8 +++----
 .../cdc/mysql/MySqlSyncTableActionITCase.java      |  8 +++++--
 .../resources/mysql/type_mapping_test_setup.sql    |  6 +++---
 10 files changed, 69 insertions(+), 23 deletions(-)

diff --git a/docs/content/how-to/cdc-ingestion.md 
b/docs/content/how-to/cdc-ingestion.md
index 662772f5b..6032280b5 100644
--- a/docs/content/how-to/cdc-ingestion.md
+++ b/docs/content/how-to/cdc-ingestion.md
@@ -605,6 +605,8 @@ you can specify type mapping option `tinyint1-not-bool`, 
then the column will be
 3. You can use type mapping option `to-string` to map all MySQL data type to 
STRING.
 4. MySQL BIT(1) type will be mapped to Boolean.
 5. When using Hive catalog, MySQL TIME type will be mapped to STRING.
+6. MySQL BINARY will be mapped to Paimon VARBINARY. This is because the binary value is passed as bytes in the binlog, so it
+should be mapped to a binary type (BYTES or VARBINARY). We choose VARBINARY because it retains the length information.
 
 ## FAQ
 1. Chinese characters in records ingested from MySQL are garbled.
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
index d319ad642..9bd5e3d9e 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
@@ -142,6 +142,21 @@ public class StringUtils {
         return byteToHexString(bytes, 0, bytes.length);
     }
 
+    /**
+     * Given an array of bytes it will convert the bytes to a binary (0-1) 
string representation of
+     * the bytes.
+     *
+     * @param bytes the bytes to be converted
+     * @return binary string representation of the byte array
+     */
+    public static String bytesToBinaryString(final byte[] bytes) {
+        StringBuilder result = new StringBuilder();
+        for (byte b : bytes) {
+            result.append(String.format("%8s", Integer.toBinaryString(b & 
0xFF)).replace(' ', '0'));
+        }
+        return result.toString();
+    }
+
     /**
      * Creates a random string with a length within the given interval. The 
string contains only
      * characters that can be represented as a single code point.
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
index e6581aa1e..5e1cac68b 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
@@ -32,7 +32,9 @@ import org.apache.paimon.types.TimestampType;
 import org.apache.paimon.types.VarCharType;
 
 import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
+import java.util.Base64;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -50,6 +52,14 @@ public class TypeUtils {
     }
 
     public static Object castFromString(String s, DataType type) {
+        return castFromStringInternal(s, type, false);
+    }
+
+    public static Object castFromCdcValueString(String s, DataType type) {
+        return castFromStringInternal(s, type, true);
+    }
+
+    private static Object castFromStringInternal(String s, DataType type, 
boolean isCdcValue) {
         BinaryString str = BinaryString.fromString(s);
         switch (type.getTypeRoot()) {
             case CHAR:
@@ -65,10 +75,12 @@ public class TypeUtils {
             case BOOLEAN:
                 return BinaryStringUtils.toBoolean(str);
             case BINARY:
-                return s.getBytes();
+                return isCdcValue
+                        ? Base64.getDecoder().decode(s)
+                        : s.getBytes(StandardCharsets.UTF_8);
             case VARBINARY:
                 int binaryLength = DataTypeChecks.getLength(type);
-                byte[] bytes = s.getBytes();
+                byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
                 if (bytes.length > binaryLength) {
                     throw new IllegalArgumentException(
                             String.format(
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index 7377050de..498e5f099 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -36,6 +36,7 @@ import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.utils.DateTimeUtils;
 import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.StringUtils;
 
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -74,6 +75,7 @@ import java.util.Set;
 import java.util.regex.Pattern;
 
 import static 
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE;
+import static 
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** {@link EventParser} for MySQL Debezium JSON. */
@@ -349,10 +351,20 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
             String oldValue = objectValue.toString();
             String newValue = oldValue;
 
-            // pay attention to the temporal types
-            // 
https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-temporal-types
-            if (("bytes".equals(mySqlType) && className == null)
-                    || Bits.LOGICAL_NAME.equals(className)) {
+            if (Bits.LOGICAL_NAME.equals(className)) {
+                // transform little-endian form to normal order
+                // 
https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-data-types
+                byte[] littleEndian = Base64.getDecoder().decode(oldValue);
+                byte[] bigEndian = new byte[littleEndian.length];
+                for (int i = 0; i < littleEndian.length; i++) {
+                    bigEndian[i] = littleEndian[littleEndian.length - 1 - i];
+                }
+                if (typeMapping.containsMode(TO_STRING)) {
+                    newValue = StringUtils.bytesToBinaryString(bigEndian);
+                } else {
+                    newValue = Base64.getEncoder().encodeToString(bigEndian);
+                }
+            } else if (("bytes".equals(mySqlType) && className == null)) {
                 // MySQL binary, varbinary, blob
                 newValue = new String(Base64.getDecoder().decode(oldValue));
             } else if ("bytes".equals(mySqlType) && 
Decimal.LOGICAL_NAME.equals(className)) {
@@ -369,7 +381,10 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
                                     + "' to 'numeric'",
                             e);
                 }
-            } else if (Date.SCHEMA_NAME.equals(className)) {
+            }
+            // pay attention to the temporal types
+            // 
https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-temporal-types
+            else if (Date.SCHEMA_NAME.equals(className)) {
                 // MySQL date
                 newValue = 
DateTimeUtils.toLocalDate(Integer.parseInt(oldValue)).toString();
             } else if (Timestamp.SCHEMA_NAME.equals(className)) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
index 80ba982bc..b64fc9cad 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
@@ -24,7 +24,6 @@
 package org.apache.paimon.flink.action.cdc.mysql;
 
 import org.apache.paimon.flink.action.cdc.TypeMapping;
-import org.apache.paimon.types.BinaryType;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.TimestampType;
@@ -279,10 +278,9 @@ public class MySqlTypeUtils {
             case MULTIPOLYGON:
             case GEOMETRYCOLLECTION:
                 return DataTypes.STRING();
+                // MySQL BINARY and VARBINARY are stored as bytes in JSON. We 
convert them to
+                // DataTypes.VARBINARY to retain the length information
             case BINARY:
-                return length == null || length == 0
-                        ? DataTypes.BINARY(BinaryType.DEFAULT_LENGTH)
-                        : DataTypes.BINARY(length);
             case VARBINARY:
                 return length == null || length == 0
                         ? DataTypes.VARBINARY(VarBinaryType.DEFAULT_LENGTH)
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
index d0c422c08..1c9322b1d 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
@@ -56,7 +56,7 @@ public class CdcRecordUtils {
             DataField dataField = dataFields.get(i);
             genericRow.setField(
                     i,
-                    TypeUtils.castFromString(
+                    TypeUtils.castFromCdcValueString(
                             record.fields().get(dataField.name()), 
dataField.type()));
         }
         return genericRow;
@@ -100,7 +100,7 @@ public class CdcRecordUtils {
             // TODO TypeUtils.castFromString cannot deal with complex types 
like arrays and
             //  maps. Change type of CdcRecord#field if needed.
             try {
-                genericRow.setField(idx, TypeUtils.castFromString(value, 
type));
+                genericRow.setField(idx, 
TypeUtils.castFromCdcValueString(value, type));
             } catch (Exception e) {
                 LOG.info(
                         "Failed to convert value "
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
index d812d8419..78d8f6197 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
@@ -427,7 +427,7 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaActionITCaseBase {
                             DataTypes.STRING(), // _text
                             DataTypes.STRING(), // _mediumtext
                             DataTypes.STRING(), // _longtext
-                            DataTypes.BINARY(10), // _bin
+                            DataTypes.VARBINARY(10), // _bin
                             DataTypes.VARBINARY(20), // _varbin
                             DataTypes.BYTES(), // _tinyblob
                             DataTypes.BYTES(), // _blob
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlCdcTypeMappingITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlCdcTypeMappingITCase.java
index 26e1638d9..9427698e4 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlCdcTypeMappingITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlCdcTypeMappingITCase.java
@@ -111,7 +111,6 @@ public class MySqlCdcTypeMappingITCase extends 
MySqlActionITCaseBase {
 
     // --------------------------------------- all-to-string 
---------------------------------------
 
-    // TODO: test BIT(n) after fix bit type
     @Test
     @Timeout(60)
     public void testReadAllTypes() throws Exception {
@@ -126,7 +125,7 @@ public class MySqlCdcTypeMappingITCase extends 
MySqlActionITCaseBase {
                         .withTypeMapping(new 
TypeMapping(Collections.singleton(TO_STRING)));
         runActionWithDefaultEnv(action);
 
-        int allTypeNums = 76;
+        int allTypeNums = 77;
         DataType[] types =
                 IntStream.range(0, allTypeNums)
                         .mapToObj(i -> DataTypes.STRING())
@@ -141,6 +140,7 @@ public class MySqlCdcTypeMappingITCase extends 
MySqlActionITCaseBase {
                             "_id",
                             "pt",
                             "_bit1",
+                            "_bit",
                             "_tinyint1",
                             "_boolean",
                             "_bool",
@@ -220,7 +220,7 @@ public class MySqlCdcTypeMappingITCase extends 
MySqlActionITCaseBase {
                 Arrays.asList(
                         "+I["
                                 + "1, 1.1, "
-                                + "true, "
+                                + "true, 
0000000000000000000000000000000000000000000000000000011111000111, "
                                 + "1, 1, 0, 1, 2, 3, "
                                 + "1000, 2000, 3000, "
                                 + "100000, 200000, 300000, "
@@ -260,7 +260,7 @@ public class MySqlCdcTypeMappingITCase extends 
MySqlActionITCaseBase {
                                 + "]",
                         "+I["
                                 + "2, 2.2, "
-                                + "NULL, "
+                                + "NULL, NULL, "
                                 + "NULL, NULL, NULL, NULL, NULL, NULL, "
                                 + "NULL, NULL, NULL, "
                                 + "NULL, NULL, NULL, "
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 7f747b1b8..7725f1385 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -402,7 +402,7 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
                             DataTypes.STRING(), // _text
                             DataTypes.STRING(), // _mediumtext
                             DataTypes.STRING(), // _longtext
-                            DataTypes.BINARY(10), // _bin
+                            DataTypes.VARBINARY(10), // _bin
                             DataTypes.VARBINARY(20), // _varbin
                             DataTypes.BYTES(), // _tinyblob
                             DataTypes.BYTES(), // _blob
@@ -502,11 +502,15 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
                             "_set",
                         });
         FileStoreTable table = getFileStoreTable();
+        // BIT(64) data: 0B11111000111 -> 0B00000111_11000111
+        String bits =
+                Arrays.toString(
+                        new byte[] {0, 0, 0, 0, 0, 0, (byte) 0B00000111, 
(byte) 0B11000111});
         List<String> expected =
                 Arrays.asList(
                         "+I["
                                 + "1, 1.1, "
-                                + "true, [-17, -65, -67, 7, 0, 0, 0, 0, 0, 0], 
"
+                                + String.format("true, %s, ", bits)
                                 + "true, true, false, 1, 2, 3, "
                                 + "1000, 2000, 3000, "
                                 + "100000, 200000, 300000, "
diff --git 
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/type_mapping_test_setup.sql
 
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/type_mapping_test_setup.sql
index baf76b9d5..2bd004f2d 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/type_mapping_test_setup.sql
+++ 
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/type_mapping_test_setup.sql
@@ -51,7 +51,7 @@ CREATE TABLE all_types_table (
     pt DECIMAL(2, 1),
     -- BIT
     _bit1 BIT,
-    -- _bit BIT(64), TODO
+    _bit BIT(64),
     -- TINYINT
     _tinyint1 TINYINT(1),
     _boolean BOOLEAN,
@@ -152,7 +152,7 @@ CREATE TABLE all_types_table (
 INSERT INTO all_types_table VALUES (
     1, 1.1,
     -- BIT
-    1, -- B'11111000111', TODO
+    1, B'11111000111',
     -- TINYINT
     true, true, false, 1, 2, 3,
     -- SMALLINT
@@ -208,7 +208,7 @@ INSERT INTO all_types_table VALUES (
     'a,b'
 ), (
     2, 2.2,
-    NULL,
+    NULL, NULL,
     NULL, NULL, NULL, NULL, NULL, NULL,
     NULL, NULL, NULL,
     NULL, NULL, NULL,

Reply via email to