This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 96ec8d450 [cdc] support bit type for flink cdc (#1708)
96ec8d450 is described below
commit 96ec8d4500d4109a2d0e5058f0456dba33a58137
Author: JunZhang <[email protected]>
AuthorDate: Mon Aug 14 17:31:32 2023 +0800
[cdc] support bit type for flink cdc (#1708)
---
paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java | 1 +
.../paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java | 4 +++-
.../org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java | 5 +++++
.../paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java | 6 ++++++
.../src/test/resources/mysql/sync_table_setup.sql | 6 ++++++
5 files changed, 21 insertions(+), 1 deletion(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
index f27b89a18..e6581aa1e 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
@@ -65,6 +65,7 @@ public class TypeUtils {
case BOOLEAN:
return BinaryStringUtils.toBoolean(str);
case BINARY:
+ return s.getBytes();
case VARBINARY:
int binaryLength = DataTypeChecks.getLength(type);
byte[] bytes = s.getBytes();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index 418db64d2..35fd888c4 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -40,6 +40,7 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeRefe
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import io.debezium.data.Bits;
import io.debezium.data.geometry.Geometry;
import io.debezium.data.geometry.Point;
import io.debezium.relational.history.TableChanges;
@@ -349,7 +350,8 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> {
// pay attention to the temporal types
// https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-temporal-types
- if ("bytes".equals(mySqlType) && className == null) {
+ if (("bytes".equals(mySqlType) && className == null)
+ || Bits.LOGICAL_NAME.equals(className)) {
// MySQL binary, varbinary, blob
newValue = new String(Base64.getDecoder().decode(oldValue));
} else if ("bytes".equals(mySqlType) && Decimal.LOGICAL_NAME.equals(className)) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
index bd288cee5..f7b696a89 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
@@ -152,6 +152,11 @@ public class MySqlTypeUtils {
Boolean tinyInt1ToBool) {
switch (type.toUpperCase()) {
case BIT:
+ if (length == null || length == 1) {
+ return DataTypes.BOOLEAN();
+ } else {
+ return DataTypes.BINARY((length + 7) / 8);
+ }
case BOOLEAN:
case BOOL:
return DataTypes.BOOLEAN();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index b4cf2131d..c4b683775 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -378,6 +378,8 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
new DataType[] {
DataTypes.INT().notNull(), // _id
DataTypes.DECIMAL(2, 1).notNull(), // pt
+ DataTypes.BOOLEAN(), // _bit1
+ DataTypes.BINARY(8), // _bit
DataTypes.BOOLEAN(), // _tinyint1
DataTypes.BOOLEAN(), // _boolean
DataTypes.BOOLEAN(), // _bool
@@ -455,6 +457,8 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
new String[] {
"_id",
"pt",
+ "_bit1",
+ "_bit",
"_tinyint1",
"_boolean",
"_bool",
@@ -534,6 +538,7 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
Arrays.asList(
"+I["
+ "1, 1.1, "
+ + "true, [-17, -65, -67, 7, 0, 0, 0, 0, 0, 0], "
+ "true, true, false, 1, 2, 3, "
+ "1000, 2000, 3000, "
+ "100000, 200000, 300000, "
@@ -578,6 +583,7 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
+ "]",
"+I["
+ "2, 2.2, "
+ + "NULL, NULL, "
+ "NULL, NULL, NULL, NULL, NULL, NULL, "
+ "NULL, NULL, NULL, "
+ "NULL, NULL, NULL, "
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
index e10df0afa..8421b6c67 100644
--- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql
@@ -52,6 +52,9 @@ CREATE TABLE schema_evolution_multiple (
CREATE TABLE all_types_table (
_id INT,
pt DECIMAL(2, 1),
+ -- BIT
+ _bit1 BIT,
+ _bit BIT(64),
-- TINYINT
_tinyint1 TINYINT(1),
_boolean BOOLEAN,
@@ -152,6 +155,8 @@ CREATE TABLE all_types_table (
INSERT INTO all_types_table VALUES (
1, 1.1,
+ -- BIT
+ 1, B'11111000111',
-- TINYINT
true, true, false, 1, 2, 3,
-- SMALLINT
@@ -207,6 +212,7 @@ INSERT INTO all_types_table VALUES (
'a,b'
), (
2, 2.2,
+ NULL, NULL,
NULL, NULL, NULL, NULL, NULL, NULL,
NULL, NULL, NULL,
NULL, NULL, NULL,