This is an automated email from the ASF dual-hosted git repository.

renqs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 18dd51fe2 [FLINK-34905][cdc-connector][mysql] Fix the default length of CHAR/BINARY data type of Add column DDL (#3145)
18dd51fe2 is described below

commit 18dd51fe22c26347bc9988d2fb8b6a78c2e922b5
Author: Qishang Zhong <[email protected]>
AuthorDate: Thu Apr 11 13:42:30 2024 +0800

    [FLINK-34905][cdc-connector][mysql] Fix the default length of CHAR/BINARY data type of Add column DDL (#3145)
---
 .../apache/flink/cdc/common/types/BinaryType.java  | 21 ++++++++++-
 .../apache/flink/cdc/common/types/CharType.java    | 21 ++++++++++-
 .../cdc/connectors/mysql/utils/MySqlTypeUtils.java | 10 ++++-
 .../mysql/source/MySqlPipelineITCase.java          | 44 ++++++++++++++++++++++
 .../serializer/schema/DataTypeSerializer.java      | 10 ++++-
 5 files changed, 100 insertions(+), 6 deletions(-)

diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/BinaryType.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/BinaryType.java
index df478da61..491f1b14a 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/BinaryType.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/BinaryType.java
@@ -65,13 +65,32 @@ public final class BinaryType extends DataType {
         this(DEFAULT_LENGTH);
     }
 
+    /** Helper constructor for {@link #ofEmptyLiteral()} and {@link #copy(boolean)}. */
+    private BinaryType(int length, boolean isNullable) {
+        super(isNullable, DataTypeRoot.BINARY);
+        this.length = length;
+    }
+
+    /**
+     * The SQL standard defines that character string literals are allowed to be zero-length strings
+     * (i.e., to contain no characters) even though it is not permitted to declare a type that is
+     * zero. For consistent behavior, the same logic applies to binary strings.
+     *
+     * <p>This method enables this special kind of binary string.
+     *
+     * <p>Zero-length binary strings have no serializable string representation.
+     */
+    public static BinaryType ofEmptyLiteral() {
+        return new BinaryType(EMPTY_LITERAL_LENGTH, false);
+    }
+
     public int getLength() {
         return length;
     }
 
     @Override
     public DataType copy(boolean isNullable) {
-        return new BinaryType(isNullable, length);
+        return new BinaryType(length, isNullable);
     }
 
     @Override
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/CharType.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/CharType.java
index a8ccaa322..8c2079649 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/CharType.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/CharType.java
@@ -64,13 +64,32 @@ public class CharType extends DataType {
         this(DEFAULT_LENGTH);
     }
 
+    /** Helper constructor for {@link #ofEmptyLiteral()} and {@link #copy(boolean)}. */
+    private CharType(int length, boolean isNullable) {
+        super(isNullable, DataTypeRoot.CHAR);
+        this.length = length;
+    }
+
+    /**
+     * The SQL standard defines that character string literals are allowed to be zero-length strings
+     * (i.e., to contain no characters) even though it is not permitted to declare a type that is
+     * zero.
+     *
+     * <p>This method enables this special kind of character string.
+     *
+     * <p>Zero-length character strings have no serializable string representation.
+     */
+    public static CharType ofEmptyLiteral() {
+        return new CharType(EMPTY_LITERAL_LENGTH, false);
+    }
+
     public int getLength() {
         return length;
     }
 
     @Override
     public DataType copy(boolean isNullable) {
-        return new CharType(isNullable, length);
+        return new CharType(length, isNullable);
     }
 
     @Override
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java
index a3bb16ea3..0e70ed6d9 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.cdc.connectors.mysql.utils;
 
+import org.apache.flink.cdc.common.types.BinaryType;
+import org.apache.flink.cdc.common.types.CharType;
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypes;
 
@@ -198,7 +200,9 @@ public class MySqlTypeUtils {
                         ? DataTypes.TIMESTAMP_LTZ(column.length())
                         : DataTypes.TIMESTAMP_LTZ(0);
             case CHAR:
-                return DataTypes.CHAR(column.length());
+                return column.length() > 0
+                        ? DataTypes.CHAR(column.length())
+                        : column.length() == 0 ? CharType.ofEmptyLiteral() : DataTypes.CHAR(1);
             case VARCHAR:
                 return DataTypes.VARCHAR(column.length());
             case TINYTEXT:
@@ -218,7 +222,9 @@ public class MySqlTypeUtils {
             case MULTILINESTRING:
                 return DataTypes.STRING();
             case BINARY:
-                return DataTypes.BINARY(column.length());
+                return column.length() > 0
+                        ? DataTypes.BINARY(column.length())
+                        : column.length() == 0 ? BinaryType.ofEmptyLiteral() : DataTypes.BINARY(1);
             case VARBINARY:
                 return DataTypes.VARBINARY(column.length());
             case TINYBLOB:
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
index e09ac58b4..5eb6ce0e5 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
@@ -31,6 +31,8 @@ import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.source.FlinkSourceProvider;
+import org.apache.flink.cdc.common.types.BinaryType;
+import org.apache.flink.cdc.common.types.CharType;
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypes;
 import org.apache.flink.cdc.common.types.RowType;
@@ -301,6 +303,48 @@ public class MySqlPipelineITCase extends MySqlSourceTestBase {
                             Collections.singletonList(
                                     new AddColumnEvent.ColumnWithPosition(
                                             Column.physicalColumn("cols5", DataTypes.BOOLEAN())))));
+            statement.execute(
+                    String.format(
+                            "ALTER TABLE `%s`.`products` ADD COLUMN `cols6` BINARY(0) NULL;",
+                            inventoryDatabase.getDatabaseName()));
+            expected.add(
+                    new AddColumnEvent(
+                            tableId,
+                            Collections.singletonList(
+                                    new AddColumnEvent.ColumnWithPosition(
+                                            Column.physicalColumn(
+                                                    "cols6", BinaryType.ofEmptyLiteral())))));
+            statement.execute(
+                    String.format(
+                            "ALTER TABLE `%s`.`products` ADD COLUMN `cols7` BINARY NULL;",
+                            inventoryDatabase.getDatabaseName()));
+            expected.add(
+                    new AddColumnEvent(
+                            tableId,
+                            Collections.singletonList(
+                                    new AddColumnEvent.ColumnWithPosition(
+                                            Column.physicalColumn("cols7", DataTypes.BINARY(1))))));
+            statement.execute(
+                    String.format(
+                            "ALTER TABLE `%s`.`products` ADD COLUMN `cols8` CHAR(0) NULL;",
+                            inventoryDatabase.getDatabaseName()));
+            expected.add(
+                    new AddColumnEvent(
+                            tableId,
+                            Collections.singletonList(
+                                    new AddColumnEvent.ColumnWithPosition(
+                                            Column.physicalColumn(
+                                                    "cols8", CharType.ofEmptyLiteral())))));
+            statement.execute(
+                    String.format(
+                            "ALTER TABLE `%s`.`products` ADD COLUMN `cols9` CHAR NULL;",
+                            inventoryDatabase.getDatabaseName()));
+            expected.add(
+                    new AddColumnEvent(
+                            tableId,
+                            Collections.singletonList(
+                                    new AddColumnEvent.ColumnWithPosition(
+                                            Column.physicalColumn("cols9", DataTypes.CHAR(1))))));
         }
         List<Event> actual = fetchResults(events, expected.size());
         assertThat(actual).isEqualTo(expected);
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/DataTypeSerializer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/DataTypeSerializer.java
index d977ef276..5d6779cc0 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/DataTypeSerializer.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/DataTypeSerializer.java
@@ -183,7 +183,10 @@ public class DataTypeSerializer extends TypeSerializer<DataType> {
         boolean isNullable = source.readBoolean();
         switch (dataTypeClass) {
             case BINARY:
-                return new BinaryType(isNullable, source.readInt());
+                int binaryLength = source.readInt();
+                return binaryLength == 0
+                        ? BinaryType.ofEmptyLiteral()
+                        : new BinaryType(isNullable, binaryLength);
             case ARRAY:
                 return new ArrayType(isNullable, this.deserialize(source));
             case BOOLEAN:
@@ -197,7 +200,10 @@ public class DataTypeSerializer extends TypeSerializer<DataType> {
             case VARBINARY:
                 return new VarBinaryType(isNullable, source.readInt());
             case CHAR:
-                return new CharType(isNullable, source.readInt());
+                int charLength = source.readInt();
+                return charLength == 0
+                        ? CharType.ofEmptyLiteral()
+                        : new CharType(isNullable, charLength);
             case SMALLINT:
                 return new SmallIntType(isNullable);
             case TIMESTAMP:

Reply via email to