This is an automated email from the ASF dual-hosted git repository.

ruanhang1993 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 07446d1f9 [FLINK-35715][common] Ignore the compact optimize for mysql timestamp type in BinaryRecordData (#3511)
07446d1f9 is described below

commit 07446d1f9de23b6d5e7b50de1b00ba901ffd31db
Author: Hang Ruan <[email protected]>
AuthorDate: Fri Aug 9 14:28:43 2024 +0800

    [FLINK-35715][common] Ignore the compact optimize for mysql timestamp type in BinaryRecordData (#3511)
---
 .../cdc/common/data/binary/BinaryRecordData.java   | 54 -------------
 .../mysql/source/MySqlFullTypesITCase.java         | 89 ++++++++++++++++++++++
 .../mysql/source/MySqlMetadataAccessorITCase.java  |  2 +-
 .../src/test/resources/ddl/column_type_test.sql    | 35 ++++++++-
 .../test/resources/ddl/column_type_test_mysql8.sql | 35 ++++++++-
 .../data/writer/AbstractBinaryWriter.java          | 56 ++++++--------
 6 files changed, 182 insertions(+), 89 deletions(-)

diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinaryRecordData.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinaryRecordData.java
index a2d91bbd8..0b02ad7e1 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinaryRecordData.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinaryRecordData.java
@@ -26,12 +26,6 @@ import org.apache.flink.cdc.common.data.RecordData;
 import org.apache.flink.cdc.common.data.StringData;
 import org.apache.flink.cdc.common.data.TimestampData;
 import org.apache.flink.cdc.common.data.ZonedTimestampData;
-import org.apache.flink.cdc.common.types.DataType;
-import org.apache.flink.cdc.common.types.DataTypeRoot;
-import org.apache.flink.cdc.common.types.DecimalType;
-import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
-import org.apache.flink.cdc.common.types.TimestampType;
-import org.apache.flink.cdc.common.types.ZonedTimestampType;
 import org.apache.flink.cdc.common.utils.Preconditions;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
@@ -72,45 +66,6 @@ public final class BinaryRecordData extends BinarySection implements RecordData,
         return ((arity + 63 + HEADER_SIZE_IN_BITS) / 64) * 8;
     }
 
-    public static int calculateFixPartSizeInBytes(int arity) {
-        return calculateBitSetWidthInBytes(arity) + 8 * arity;
-    }
-
-    /**
-     * If it is a fixed-length field, we can call this BinaryRecordData's 
setXX method for in-place
-     * updates. If it is variable-length field, can't use this method, because 
the underlying data
-     * is stored continuously.
-     */
-    public static boolean isInFixedLengthPart(DataType type) {
-        switch (type.getTypeRoot()) {
-            case BOOLEAN:
-            case TINYINT:
-            case SMALLINT:
-            case INTEGER:
-            case DATE:
-            case TIME_WITHOUT_TIME_ZONE:
-            case BIGINT:
-            case FLOAT:
-            case DOUBLE:
-                return true;
-            case DECIMAL:
-                return DecimalData.isCompact(((DecimalType) 
type).getPrecision());
-            case TIMESTAMP_WITHOUT_TIME_ZONE:
-                return TimestampData.isCompact(((TimestampType) 
type).getPrecision());
-            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-                return LocalZonedTimestampData.isCompact(
-                        ((LocalZonedTimestampType) type).getPrecision());
-            case TIMESTAMP_WITH_TIME_ZONE:
-                return ZonedTimestampData.isCompact(((ZonedTimestampType) 
type).getPrecision());
-            default:
-                return false;
-        }
-    }
-
-    public static boolean isMutable(DataType type) {
-        return isInFixedLengthPart(type) || type.getTypeRoot() == 
DataTypeRoot.DECIMAL;
-    }
-
     private final int arity;
     private final int nullBitsSizeInBytes;
 
@@ -213,10 +168,6 @@ public final class BinaryRecordData extends BinarySection implements RecordData,
     public TimestampData getTimestamp(int pos, int precision) {
         assertIndexIsValid(pos);
 
-        if (TimestampData.isCompact(precision)) {
-            return 
TimestampData.fromMillis(segments[0].getLong(getFieldOffset(pos)));
-        }
-
         int fieldOffset = getFieldOffset(pos);
         final long offsetAndNanoOfMilli = segments[0].getLong(fieldOffset);
         return BinarySegmentUtils.readTimestampData(segments, offset, 
offsetAndNanoOfMilli);
@@ -233,11 +184,6 @@ public final class BinaryRecordData extends BinarySection implements RecordData,
     public LocalZonedTimestampData getLocalZonedTimestampData(int pos, int 
precision) {
         assertIndexIsValid(pos);
 
-        if (LocalZonedTimestampData.isCompact(precision)) {
-            return LocalZonedTimestampData.fromEpochMillis(
-                    segments[0].getLong(getFieldOffset(pos)));
-        }
-
         int fieldOffset = getFieldOffset(pos);
         final long offsetAndNanoOfMilli = segments[0].getLong(fieldOffset);
         return BinarySegmentUtils.readLocalZonedTimestampData(
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java
index 41613a86e..91351dabf 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java
@@ -223,6 +223,95 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
                 fullTypesMySql8Database, recordType, expectedSnapshot, 
expectedStreamRecord);
     }
 
+    @Test
+    public void testMysql57PrecisionTypes() throws Throwable {
+        testMysqlPrecisionTypes(fullTypesMySql57Database);
+    }
+
+    @Test
+    public void testMysql8PrecisionTypes() throws Throwable {
+        testMysqlPrecisionTypes(fullTypesMySql8Database);
+    }
+
+    public void testMysqlPrecisionTypes(UniqueDatabase database) throws 
Throwable {
+        RowType recordType =
+                RowType.of(
+                        DataTypes.DECIMAL(20, 0).notNull(),
+                        DataTypes.DECIMAL(6, 2),
+                        DataTypes.DECIMAL(9, 4),
+                        DataTypes.DECIMAL(20, 4),
+                        DataTypes.TIME(0),
+                        DataTypes.TIME(3),
+                        DataTypes.TIME(6),
+                        DataTypes.TIMESTAMP(0),
+                        DataTypes.TIMESTAMP(3),
+                        DataTypes.TIMESTAMP(6),
+                        DataTypes.TIMESTAMP_LTZ(0),
+                        DataTypes.TIMESTAMP_LTZ(3),
+                        DataTypes.TIMESTAMP_LTZ(6),
+                        DataTypes.TIMESTAMP_LTZ(0));
+
+        Object[] expectedSnapshot =
+                new Object[] {
+                    DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
+                    DecimalData.fromBigDecimal(new BigDecimal("123.4"), 6, 2),
+                    DecimalData.fromBigDecimal(new BigDecimal("1234.5"), 9, 4),
+                    DecimalData.fromBigDecimal(new BigDecimal("1234.56"), 20, 
4),
+                    64800000,
+                    64822100,
+                    64822100,
+                    TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 
18:00:00")),
+                    TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 
18:00:22")),
+                    TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 
18:00:22")),
+                    LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 
18:00:00")),
+                    LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 
18:00:22")),
+                    LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 
18:00:22"))
+                };
+
+        Object[] expectedStreamRecord =
+                new Object[] {
+                    DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
+                    DecimalData.fromBigDecimal(new BigDecimal("123.4"), 6, 2),
+                    DecimalData.fromBigDecimal(new BigDecimal("1234.5"), 9, 4),
+                    DecimalData.fromBigDecimal(new BigDecimal("1234.56"), 20, 
4),
+                    64800000,
+                    64822100,
+                    null,
+                    TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 
18:00:00")),
+                    TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 
18:00:22")),
+                    TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 
18:00:22")),
+                    LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 
18:00:00")),
+                    LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 
18:00:22")),
+                    LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 
18:00:22"))
+                };
+
+        database.createAndInitialize();
+        CloseableIterator<Event> iterator =
+                env.fromSource(
+                                getFlinkSourceProvider(new String[] 
{"precision_types"}, database)
+                                        .getSource(),
+                                WatermarkStrategy.noWatermarks(),
+                                "Event-Source")
+                        .executeAndCollect();
+
+        // skip CreateTableEvent
+        List<Event> snapshotResults = 
MySqSourceTestUtils.fetchResults(iterator, 2);
+        RecordData snapshotRecord = ((DataChangeEvent) 
snapshotResults.get(1)).after();
+
+        Assertions.assertThat(RecordDataTestUtils.recordFields(snapshotRecord, 
recordType))
+                .isEqualTo(expectedSnapshot);
+
+        try (Connection connection = database.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            statement.execute("UPDATE precision_types SET time_6_c = null 
WHERE id = 1;");
+        }
+
+        List<Event> streamResults = MySqSourceTestUtils.fetchResults(iterator, 
1);
+        RecordData streamRecord = ((DataChangeEvent) 
streamResults.get(0)).after();
+        Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, 
recordType))
+                .isEqualTo(expectedStreamRecord);
+    }
+
     private void testCommonDataTypes(UniqueDatabase database) throws Exception 
{
         database.createAndInitialize();
         CloseableIterator<Event> iterator =
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java
index 4b45e1079..3d3f0276b 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java
@@ -210,7 +210,7 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase {
     private void testAccessDatabaseAndTable(UniqueDatabase database) {
         database.createAndInitialize();
 
-        String[] tables = new String[] {"common_types", "time_types"};
+        String[] tables = new String[] {"common_types", "time_types", 
"precision_types"};
         MySqlMetadataAccessor metadataAccessor = getMetadataAccessor(tables, 
database);
 
         assertThatThrownBy(metadataAccessor::listNamespaces)
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test.sql b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test.sql
index a83e8710e..08d25a714 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test.sql
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test.sql
@@ -123,4 +123,37 @@ VALUES (DEFAULT,
         '2020-07-17 18:00:22.123',
         '2020-07-17 18:00:22.123456',
         '2020-07-17 18:00:22',
-        NULL);
\ No newline at end of file
+        NULL);
+
+CREATE TABLE precision_types
+(
+    id                   SERIAL,
+    decimal_c0           DECIMAL(6, 2),
+    decimal_c1           DECIMAL(9, 4),
+    decimal_c2           DECIMAL(20, 4),
+    time_c               TIME(0),
+    time_3_c             TIME(3),
+    time_6_c             TIME(6),
+    datetime_c           DATETIME(0),
+    datetime3_c          DATETIME(3),
+    datetime6_c          DATETIME(6),
+    timestamp_c          TIMESTAMP(0) NULL,
+    timestamp3_c         TIMESTAMP(3) NULL,
+    timestamp6_c         TIMESTAMP(6) NULL,
+    PRIMARY KEY (id)
+) DEFAULT CHARSET=utf8;
+
+INSERT INTO precision_types
+VALUES (DEFAULT,
+        123.4,
+        1234.5,
+        1234.56,
+        '18:00',
+        '18:00:22.1',
+        '18:00:22.1',
+        '2020-07-17 18:00',
+        '2020-07-17 18:00:22',
+        '2020-07-17 18:00:22',
+        '2020-07-17 18:00',
+        '2020-07-17 18:00:22',
+        '2020-07-17 18:00:22');
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test_mysql8.sql b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test_mysql8.sql
index 66b75bfef..54c6c7170 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test_mysql8.sql
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test_mysql8.sql
@@ -127,4 +127,37 @@ VALUES (DEFAULT,
         '2020-07-17 18:00:22',
         '2020-07-17 18:00:22.123',
         '2020-07-17 18:00:22.123456',
-        NULL);
\ No newline at end of file
+        NULL);
+
+CREATE TABLE precision_types
+(
+    id                   SERIAL,
+    decimal_c0           DECIMAL(6, 2),
+    decimal_c1           DECIMAL(9, 4),
+    decimal_c2           DECIMAL(20, 4),
+    time_c               TIME(0),
+    time_3_c             TIME(3),
+    time_6_c             TIME(6),
+    datetime_c           DATETIME(0),
+    datetime3_c          DATETIME(3),
+    datetime6_c          DATETIME(6),
+    timestamp_c          TIMESTAMP(0),
+    timestamp3_c         TIMESTAMP(3),
+    timestamp6_c         TIMESTAMP(6),
+    PRIMARY KEY (id)
+) DEFAULT CHARSET=utf8;
+
+INSERT INTO precision_types
+VALUES (DEFAULT,
+        123.4,
+        1234.5,
+        1234.56,
+        '18:00',
+        '18:00:22.1',
+        '18:00:22.1',
+        '2020-07-17 18:00',
+        '2020-07-17 18:00:22',
+        '2020-07-17 18:00:22',
+        '2020-07-17 18:00',
+        '2020-07-17 18:00:22',
+        '2020-07-17 18:00:22');
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/AbstractBinaryWriter.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/AbstractBinaryWriter.java
index 1422935ea..44636fd7e 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/AbstractBinaryWriter.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/AbstractBinaryWriter.java
@@ -173,46 +173,38 @@ abstract class AbstractBinaryWriter implements BinaryWriter {
 
     @Override
     public void writeTimestamp(int pos, TimestampData value, int precision) {
-        if (TimestampData.isCompact(precision)) {
-            writeLong(pos, value.getMillisecond());
-        } else {
-            // store the nanoOfMillisecond in fixed-length part as offset and 
nanoOfMillisecond
-            ensureCapacity(8);
+        // store the nanoOfMillisecond in fixed-length part as offset and 
nanoOfMillisecond
+        ensureCapacity(8);
 
-            if (value == null) {
-                setNullBit(pos);
-                // zero-out the bytes
-                segment.putLong(cursor, 0L);
-                setOffsetAndSize(pos, cursor, 0);
-            } else {
-                segment.putLong(cursor, value.getMillisecond());
-                setOffsetAndSize(pos, cursor, value.getNanoOfMillisecond());
-            }
-
-            cursor += 8;
+        if (value == null) {
+            setNullBit(pos);
+            // zero-out the bytes
+            segment.putLong(cursor, 0L);
+            setOffsetAndSize(pos, cursor, 0);
+        } else {
+            segment.putLong(cursor, value.getMillisecond());
+            setOffsetAndSize(pos, cursor, value.getNanoOfMillisecond());
         }
+
+        cursor += 8;
     }
 
     @Override
     public void writeLocalZonedTimestamp(int pos, LocalZonedTimestampData 
value, int precision) {
-        if (LocalZonedTimestampData.isCompact(precision)) {
-            writeLong(pos, value.getEpochMillisecond());
-        } else {
-            // store the nanoOfMillisecond in fixed-length part as offset and 
nanoOfMillisecond
-            ensureCapacity(8);
+        // store the nanoOfMillisecond in fixed-length part as offset and 
nanoOfMillisecond
+        ensureCapacity(8);
 
-            if (value == null) {
-                setNullBit(pos);
-                // zero-out the bytes
-                segment.putLong(cursor, 0L);
-                setOffsetAndSize(pos, cursor, 0);
-            } else {
-                segment.putLong(cursor, value.getEpochMillisecond());
-                setOffsetAndSize(pos, cursor, 
value.getEpochNanoOfMillisecond());
-            }
-
-            cursor += 8;
+        if (value == null) {
+            setNullBit(pos);
+            // zero-out the bytes
+            segment.putLong(cursor, 0L);
+            setOffsetAndSize(pos, cursor, 0);
+        } else {
+            segment.putLong(cursor, value.getEpochMillisecond());
+            setOffsetAndSize(pos, cursor, value.getEpochNanoOfMillisecond());
         }
+
+        cursor += 8;
     }
 
     @Override

Reply via email to