This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4baefcdf1 [flink][mysql-cdc] Map MySQL BINARY to Paimon VARBINARY and
fix parse of MySQL BIT(N) value (#1844)
4baefcdf1 is described below
commit 4baefcdf16fecf082e3c766085c6c8ef9db96232
Author: yuzelin <[email protected]>
AuthorDate: Wed Aug 23 20:47:32 2023 +0800
[flink][mysql-cdc] Map MySQL BINARY to Paimon VARBINARY and fix parse of
MySQL BIT(N) value (#1844)
---
docs/content/how-to/cdc-ingestion.md | 2 ++
.../java/org/apache/paimon/utils/StringUtils.java | 15 +++++++++++++
.../java/org/apache/paimon/utils/TypeUtils.java | 16 ++++++++++++--
.../cdc/mysql/MySqlDebeziumJsonEventParser.java | 25 +++++++++++++++++-----
.../flink/action/cdc/mysql/MySqlTypeUtils.java | 6 ++----
.../paimon/flink/sink/cdc/CdcRecordUtils.java | 4 ++--
.../cdc/kafka/KafkaCanalSyncTableActionITCase.java | 2 +-
.../cdc/mysql/MySqlCdcTypeMappingITCase.java | 8 +++----
.../cdc/mysql/MySqlSyncTableActionITCase.java | 8 +++++--
.../resources/mysql/type_mapping_test_setup.sql | 6 +++---
10 files changed, 69 insertions(+), 23 deletions(-)
diff --git a/docs/content/how-to/cdc-ingestion.md
b/docs/content/how-to/cdc-ingestion.md
index 662772f5b..6032280b5 100644
--- a/docs/content/how-to/cdc-ingestion.md
+++ b/docs/content/how-to/cdc-ingestion.md
@@ -605,6 +605,8 @@ you can specify type mapping option `tinyint1-not-bool`,
then the column will be
3. You can use type mapping option `to-string` to map all MySQL data type to
STRING.
4. MySQL BIT(1) type will be mapped to Boolean.
5. When using Hive catalog, MySQL TIME type will be mapped to STRING.
+6. MySQL BINARY will be mapped to Paimon VARBINARY. This is because the binary value is passed as bytes in the binlog, so it
+should be mapped to a byte type (BYTES or VARBINARY). We choose VARBINARY because it retains the length information.
## FAQ
1. Chinese characters in records ingested from MySQL are garbled.
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
index d319ad642..9bd5e3d9e 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
@@ -142,6 +142,21 @@ public class StringUtils {
return byteToHexString(bytes, 0, bytes.length);
}
+ /**
+ * Converts a byte array to its binary (0-1) string representation: each byte is read as
+ * unsigned (b & 0xFF) and written as exactly eight digits, left-padded with '0', in array order.
+ *
+ * @param bytes the bytes to be converted
+ * @return binary string representation of the byte array
+ */
+ public static String bytesToBinaryString(final byte[] bytes) {
+ StringBuilder result = new StringBuilder();
+ for (byte b : bytes) {
+ result.append(String.format("%8s", Integer.toBinaryString(b &
0xFF)).replace(' ', '0'));
+ }
+ return result.toString();
+ }
+
/**
* Creates a random string with a length within the given interval. The
string contains only
* characters that can be represented as a single code point.
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
index e6581aa1e..5e1cac68b 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
@@ -32,7 +32,9 @@ import org.apache.paimon.types.TimestampType;
import org.apache.paimon.types.VarCharType;
import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
+import java.util.Base64;
import java.util.List;
import java.util.stream.Collectors;
@@ -50,6 +52,14 @@ public class TypeUtils {
}
public static Object castFromString(String s, DataType type) {
+ return castFromStringInternal(s, type, false); // isCdcValue = false: BINARY uses the UTF-8 bytes of s
+ }
+
+ public static Object castFromCdcValueString(String s, DataType type) {
+ return castFromStringInternal(s, type, true); // isCdcValue = true: BINARY is Base64-decoded from s
+ }
+
+ private static Object castFromStringInternal(String s, DataType type,
boolean isCdcValue) {
BinaryString str = BinaryString.fromString(s);
switch (type.getTypeRoot()) {
case CHAR:
@@ -65,10 +75,12 @@ public class TypeUtils {
case BOOLEAN:
return BinaryStringUtils.toBoolean(str);
case BINARY:
- return s.getBytes();
+ return isCdcValue
+ ? Base64.getDecoder().decode(s)
+ : s.getBytes(StandardCharsets.UTF_8);
case VARBINARY:
int binaryLength = DataTypeChecks.getLength(type);
- byte[] bytes = s.getBytes();
+ byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
if (bytes.length > binaryLength) {
throw new IllegalArgumentException(
String.format(
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index 7377050de..498e5f099 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -36,6 +36,7 @@ import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.StringUtils;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -74,6 +75,7 @@ import java.util.Set;
import java.util.regex.Pattern;
import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE;
+import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** {@link EventParser} for MySQL Debezium JSON. */
@@ -349,10 +351,20 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
String oldValue = objectValue.toString();
String newValue = oldValue;
- // pay attention to the temporal types
- //
https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-temporal-types
- if (("bytes".equals(mySqlType) && className == null)
- || Bits.LOGICAL_NAME.equals(className)) {
+ if (Bits.LOGICAL_NAME.equals(className)) {
+ // transform little-endian form to normal order
+ //
https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-data-types
+ byte[] littleEndian = Base64.getDecoder().decode(oldValue);
+ byte[] bigEndian = new byte[littleEndian.length];
+ for (int i = 0; i < littleEndian.length; i++) {
+ bigEndian[i] = littleEndian[littleEndian.length - 1 - i];
+ }
+ if (typeMapping.containsMode(TO_STRING)) {
+ newValue = StringUtils.bytesToBinaryString(bigEndian);
+ } else {
+ newValue = Base64.getEncoder().encodeToString(bigEndian);
+ }
+ } else if (("bytes".equals(mySqlType) && className == null)) {
// MySQL binary, varbinary, blob
newValue = new String(Base64.getDecoder().decode(oldValue));
} else if ("bytes".equals(mySqlType) &&
Decimal.LOGICAL_NAME.equals(className)) {
@@ -369,7 +381,10 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
+ "' to 'numeric'",
e);
}
- } else if (Date.SCHEMA_NAME.equals(className)) {
+ }
+ // pay attention to the temporal types
+ //
https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-temporal-types
+ else if (Date.SCHEMA_NAME.equals(className)) {
// MySQL date
newValue =
DateTimeUtils.toLocalDate(Integer.parseInt(oldValue)).toString();
} else if (Timestamp.SCHEMA_NAME.equals(className)) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
index 80ba982bc..b64fc9cad 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
@@ -24,7 +24,6 @@
package org.apache.paimon.flink.action.cdc.mysql;
import org.apache.paimon.flink.action.cdc.TypeMapping;
-import org.apache.paimon.types.BinaryType;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.TimestampType;
@@ -279,10 +278,9 @@ public class MySqlTypeUtils {
case MULTIPOLYGON:
case GEOMETRYCOLLECTION:
return DataTypes.STRING();
+ // MySQL BINARY and VARBINARY are stored as bytes in JSON. We convert both of them to
+ // DataTypes.VARBINARY to retain the length information.
case BINARY:
- return length == null || length == 0
- ? DataTypes.BINARY(BinaryType.DEFAULT_LENGTH)
- : DataTypes.BINARY(length);
case VARBINARY:
return length == null || length == 0
? DataTypes.VARBINARY(VarBinaryType.DEFAULT_LENGTH)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
index d0c422c08..1c9322b1d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
@@ -56,7 +56,7 @@ public class CdcRecordUtils {
DataField dataField = dataFields.get(i);
genericRow.setField(
i,
- TypeUtils.castFromString(
+ TypeUtils.castFromCdcValueString(
record.fields().get(dataField.name()),
dataField.type()));
}
return genericRow;
@@ -100,7 +100,7 @@ public class CdcRecordUtils {
// TODO TypeUtils.castFromString cannot deal with complex types
like arrays and
// maps. Change type of CdcRecord#field if needed.
try {
- genericRow.setField(idx, TypeUtils.castFromString(value,
type));
+ genericRow.setField(idx,
TypeUtils.castFromCdcValueString(value, type));
} catch (Exception e) {
LOG.info(
"Failed to convert value "
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
index d812d8419..78d8f6197 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
@@ -427,7 +427,7 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaActionITCaseBase {
DataTypes.STRING(), // _text
DataTypes.STRING(), // _mediumtext
DataTypes.STRING(), // _longtext
- DataTypes.BINARY(10), // _bin
+ DataTypes.VARBINARY(10), // _bin
DataTypes.VARBINARY(20), // _varbin
DataTypes.BYTES(), // _tinyblob
DataTypes.BYTES(), // _blob
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlCdcTypeMappingITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlCdcTypeMappingITCase.java
index 26e1638d9..9427698e4 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlCdcTypeMappingITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlCdcTypeMappingITCase.java
@@ -111,7 +111,6 @@ public class MySqlCdcTypeMappingITCase extends
MySqlActionITCaseBase {
// --------------------------------------- all-to-string
---------------------------------------
- // TODO: test BIT(n) after fix bit type
@Test
@Timeout(60)
public void testReadAllTypes() throws Exception {
@@ -126,7 +125,7 @@ public class MySqlCdcTypeMappingITCase extends
MySqlActionITCaseBase {
.withTypeMapping(new
TypeMapping(Collections.singleton(TO_STRING)));
runActionWithDefaultEnv(action);
- int allTypeNums = 76;
+ int allTypeNums = 77;
DataType[] types =
IntStream.range(0, allTypeNums)
.mapToObj(i -> DataTypes.STRING())
@@ -141,6 +140,7 @@ public class MySqlCdcTypeMappingITCase extends
MySqlActionITCaseBase {
"_id",
"pt",
"_bit1",
+ "_bit",
"_tinyint1",
"_boolean",
"_bool",
@@ -220,7 +220,7 @@ public class MySqlCdcTypeMappingITCase extends
MySqlActionITCaseBase {
Arrays.asList(
"+I["
+ "1, 1.1, "
- + "true, "
+ + "true,
0000000000000000000000000000000000000000000000000000011111000111, "
+ "1, 1, 0, 1, 2, 3, "
+ "1000, 2000, 3000, "
+ "100000, 200000, 300000, "
@@ -260,7 +260,7 @@ public class MySqlCdcTypeMappingITCase extends
MySqlActionITCaseBase {
+ "]",
"+I["
+ "2, 2.2, "
- + "NULL, "
+ + "NULL, NULL, "
+ "NULL, NULL, NULL, NULL, NULL, NULL, "
+ "NULL, NULL, NULL, "
+ "NULL, NULL, NULL, "
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 7f747b1b8..7725f1385 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -402,7 +402,7 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
DataTypes.STRING(), // _text
DataTypes.STRING(), // _mediumtext
DataTypes.STRING(), // _longtext
- DataTypes.BINARY(10), // _bin
+ DataTypes.VARBINARY(10), // _bin
DataTypes.VARBINARY(20), // _varbin
DataTypes.BYTES(), // _tinyblob
DataTypes.BYTES(), // _blob
@@ -502,11 +502,15 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
"_set",
});
FileStoreTable table = getFileStoreTable();
+ // BIT(64) data: 0B11111000111 -> 0B00000111_11000111
+ String bits =
+ Arrays.toString(
+ new byte[] {0, 0, 0, 0, 0, 0, (byte) 0B00000111,
(byte) 0B11000111});
List<String> expected =
Arrays.asList(
"+I["
+ "1, 1.1, "
- + "true, [-17, -65, -67, 7, 0, 0, 0, 0, 0, 0],
"
+ + String.format("true, %s, ", bits)
+ "true, true, false, 1, 2, 3, "
+ "1000, 2000, 3000, "
+ "100000, 200000, 300000, "
diff --git
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/type_mapping_test_setup.sql
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/type_mapping_test_setup.sql
index baf76b9d5..2bd004f2d 100644
---
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/type_mapping_test_setup.sql
+++
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/type_mapping_test_setup.sql
@@ -51,7 +51,7 @@ CREATE TABLE all_types_table (
pt DECIMAL(2, 1),
-- BIT
_bit1 BIT,
- -- _bit BIT(64), TODO
+ _bit BIT(64),
-- TINYINT
_tinyint1 TINYINT(1),
_boolean BOOLEAN,
@@ -152,7 +152,7 @@ CREATE TABLE all_types_table (
INSERT INTO all_types_table VALUES (
1, 1.1,
-- BIT
- 1, -- B'11111000111', TODO
+ 1, B'11111000111',
-- TINYINT
true, true, false, 1, 2, 3,
-- SMALLINT
@@ -208,7 +208,7 @@ INSERT INTO all_types_table VALUES (
'a,b'
), (
2, 2.2,
- NULL,
+ NULL, NULL,
NULL, NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL,
NULL, NULL, NULL,