This is an automated email from the ASF dual-hosted git repository.
ruanhang1993 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 07446d1f9 [FLINK-35715][common] Ignore the compact optimize for mysql
timestamp type in BinaryRecordData (#3511)
07446d1f9 is described below
commit 07446d1f9de23b6d5e7b50de1b00ba901ffd31db
Author: Hang Ruan <[email protected]>
AuthorDate: Fri Aug 9 14:28:43 2024 +0800
[FLINK-35715][common] Ignore the compact optimize for mysql timestamp type
in BinaryRecordData (#3511)
---
.../cdc/common/data/binary/BinaryRecordData.java | 54 -------------
.../mysql/source/MySqlFullTypesITCase.java | 89 ++++++++++++++++++++++
.../mysql/source/MySqlMetadataAccessorITCase.java | 2 +-
.../src/test/resources/ddl/column_type_test.sql | 35 ++++++++-
.../test/resources/ddl/column_type_test_mysql8.sql | 35 ++++++++-
.../data/writer/AbstractBinaryWriter.java | 56 ++++++--------
6 files changed, 182 insertions(+), 89 deletions(-)
diff --git
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinaryRecordData.java
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinaryRecordData.java
index a2d91bbd8..0b02ad7e1 100644
---
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinaryRecordData.java
+++
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinaryRecordData.java
@@ -26,12 +26,6 @@ import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.StringData;
import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.ZonedTimestampData;
-import org.apache.flink.cdc.common.types.DataType;
-import org.apache.flink.cdc.common.types.DataTypeRoot;
-import org.apache.flink.cdc.common.types.DecimalType;
-import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
-import org.apache.flink.cdc.common.types.TimestampType;
-import org.apache.flink.cdc.common.types.ZonedTimestampType;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
@@ -72,45 +66,6 @@ public final class BinaryRecordData extends BinarySection
implements RecordData,
return ((arity + 63 + HEADER_SIZE_IN_BITS) / 64) * 8;
}
- public static int calculateFixPartSizeInBytes(int arity) {
- return calculateBitSetWidthInBytes(arity) + 8 * arity;
- }
-
- /**
- * If it is a fixed-length field, we can call this BinaryRecordData's
setXX method for in-place
- * updates. If it is variable-length field, can't use this method, because
the underlying data
- * is stored continuously.
- */
- public static boolean isInFixedLengthPart(DataType type) {
- switch (type.getTypeRoot()) {
- case BOOLEAN:
- case TINYINT:
- case SMALLINT:
- case INTEGER:
- case DATE:
- case TIME_WITHOUT_TIME_ZONE:
- case BIGINT:
- case FLOAT:
- case DOUBLE:
- return true;
- case DECIMAL:
- return DecimalData.isCompact(((DecimalType)
type).getPrecision());
- case TIMESTAMP_WITHOUT_TIME_ZONE:
- return TimestampData.isCompact(((TimestampType)
type).getPrecision());
- case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
- return LocalZonedTimestampData.isCompact(
- ((LocalZonedTimestampType) type).getPrecision());
- case TIMESTAMP_WITH_TIME_ZONE:
- return ZonedTimestampData.isCompact(((ZonedTimestampType)
type).getPrecision());
- default:
- return false;
- }
- }
-
- public static boolean isMutable(DataType type) {
- return isInFixedLengthPart(type) || type.getTypeRoot() ==
DataTypeRoot.DECIMAL;
- }
-
private final int arity;
private final int nullBitsSizeInBytes;
@@ -213,10 +168,6 @@ public final class BinaryRecordData extends BinarySection
implements RecordData,
public TimestampData getTimestamp(int pos, int precision) {
assertIndexIsValid(pos);
- if (TimestampData.isCompact(precision)) {
- return
TimestampData.fromMillis(segments[0].getLong(getFieldOffset(pos)));
- }
-
int fieldOffset = getFieldOffset(pos);
final long offsetAndNanoOfMilli = segments[0].getLong(fieldOffset);
return BinarySegmentUtils.readTimestampData(segments, offset,
offsetAndNanoOfMilli);
@@ -233,11 +184,6 @@ public final class BinaryRecordData extends BinarySection
implements RecordData,
public LocalZonedTimestampData getLocalZonedTimestampData(int pos, int
precision) {
assertIndexIsValid(pos);
- if (LocalZonedTimestampData.isCompact(precision)) {
- return LocalZonedTimestampData.fromEpochMillis(
- segments[0].getLong(getFieldOffset(pos)));
- }
-
int fieldOffset = getFieldOffset(pos);
final long offsetAndNanoOfMilli = segments[0].getLong(fieldOffset);
return BinarySegmentUtils.readLocalZonedTimestampData(
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java
index 41613a86e..91351dabf 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java
@@ -223,6 +223,95 @@ public class MySqlFullTypesITCase extends
MySqlSourceTestBase {
fullTypesMySql8Database, recordType, expectedSnapshot,
expectedStreamRecord);
}
+ @Test
+ public void testMysql57PrecisionTypes() throws Throwable {
+ testMysqlPrecisionTypes(fullTypesMySql57Database);
+ }
+
+ @Test
+ public void testMysql8PrecisionTypes() throws Throwable {
+ testMysqlPrecisionTypes(fullTypesMySql8Database);
+ }
+
+ public void testMysqlPrecisionTypes(UniqueDatabase database) throws
Throwable {
+ RowType recordType =
+ RowType.of(
+ DataTypes.DECIMAL(20, 0).notNull(),
+ DataTypes.DECIMAL(6, 2),
+ DataTypes.DECIMAL(9, 4),
+ DataTypes.DECIMAL(20, 4),
+ DataTypes.TIME(0),
+ DataTypes.TIME(3),
+ DataTypes.TIME(6),
+ DataTypes.TIMESTAMP(0),
+ DataTypes.TIMESTAMP(3),
+ DataTypes.TIMESTAMP(6),
+ DataTypes.TIMESTAMP_LTZ(0),
+ DataTypes.TIMESTAMP_LTZ(3),
+ DataTypes.TIMESTAMP_LTZ(6),
+ DataTypes.TIMESTAMP_LTZ(0));
+
+ Object[] expectedSnapshot =
+ new Object[] {
+ DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
+ DecimalData.fromBigDecimal(new BigDecimal("123.4"), 6, 2),
+ DecimalData.fromBigDecimal(new BigDecimal("1234.5"), 9, 4),
+ DecimalData.fromBigDecimal(new BigDecimal("1234.56"), 20,
4),
+ 64800000,
+ 64822100,
+ 64822100,
+ TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17
18:00:00")),
+ TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17
18:00:22")),
+ TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17
18:00:22")),
+ LocalZonedTimestampData.fromInstant(toInstant("2020-07-17
18:00:00")),
+ LocalZonedTimestampData.fromInstant(toInstant("2020-07-17
18:00:22")),
+ LocalZonedTimestampData.fromInstant(toInstant("2020-07-17
18:00:22"))
+ };
+
+ Object[] expectedStreamRecord =
+ new Object[] {
+ DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
+ DecimalData.fromBigDecimal(new BigDecimal("123.4"), 6, 2),
+ DecimalData.fromBigDecimal(new BigDecimal("1234.5"), 9, 4),
+ DecimalData.fromBigDecimal(new BigDecimal("1234.56"), 20,
4),
+ 64800000,
+ 64822100,
+ null,
+ TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17
18:00:00")),
+ TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17
18:00:22")),
+ TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17
18:00:22")),
+ LocalZonedTimestampData.fromInstant(toInstant("2020-07-17
18:00:00")),
+ LocalZonedTimestampData.fromInstant(toInstant("2020-07-17
18:00:22")),
+ LocalZonedTimestampData.fromInstant(toInstant("2020-07-17
18:00:22"))
+ };
+
+ database.createAndInitialize();
+ CloseableIterator<Event> iterator =
+ env.fromSource(
+ getFlinkSourceProvider(new String[]
{"precision_types"}, database)
+ .getSource(),
+ WatermarkStrategy.noWatermarks(),
+ "Event-Source")
+ .executeAndCollect();
+
+ // skip CreateTableEvent
+ List<Event> snapshotResults =
MySqSourceTestUtils.fetchResults(iterator, 2);
+ RecordData snapshotRecord = ((DataChangeEvent)
snapshotResults.get(1)).after();
+
+ Assertions.assertThat(RecordDataTestUtils.recordFields(snapshotRecord,
recordType))
+ .isEqualTo(expectedSnapshot);
+
+ try (Connection connection = database.getJdbcConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("UPDATE precision_types SET time_6_c = null
WHERE id = 1;");
+ }
+
+ List<Event> streamResults = MySqSourceTestUtils.fetchResults(iterator,
1);
+ RecordData streamRecord = ((DataChangeEvent)
streamResults.get(0)).after();
+ Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord,
recordType))
+ .isEqualTo(expectedStreamRecord);
+ }
+
private void testCommonDataTypes(UniqueDatabase database) throws Exception
{
database.createAndInitialize();
CloseableIterator<Event> iterator =
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java
index 4b45e1079..3d3f0276b 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java
@@ -210,7 +210,7 @@ public class MySqlMetadataAccessorITCase extends
MySqlSourceTestBase {
private void testAccessDatabaseAndTable(UniqueDatabase database) {
database.createAndInitialize();
- String[] tables = new String[] {"common_types", "time_types"};
+ String[] tables = new String[] {"common_types", "time_types",
"precision_types"};
MySqlMetadataAccessor metadataAccessor = getMetadataAccessor(tables,
database);
assertThatThrownBy(metadataAccessor::listNamespaces)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test.sql
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test.sql
index a83e8710e..08d25a714 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test.sql
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test.sql
@@ -123,4 +123,37 @@ VALUES (DEFAULT,
'2020-07-17 18:00:22.123',
'2020-07-17 18:00:22.123456',
'2020-07-17 18:00:22',
- NULL);
\ No newline at end of file
+ NULL);
+
+CREATE TABLE precision_types
+(
+ id SERIAL,
+ decimal_c0 DECIMAL(6, 2),
+ decimal_c1 DECIMAL(9, 4),
+ decimal_c2 DECIMAL(20, 4),
+ time_c TIME(0),
+ time_3_c TIME(3),
+ time_6_c TIME(6),
+ datetime_c DATETIME(0),
+ datetime3_c DATETIME(3),
+ datetime6_c DATETIME(6),
+ timestamp_c TIMESTAMP(0) NULL,
+ timestamp3_c TIMESTAMP(3) NULL,
+ timestamp6_c TIMESTAMP(6) NULL,
+ PRIMARY KEY (id)
+) DEFAULT CHARSET=utf8;
+
+INSERT INTO precision_types
+VALUES (DEFAULT,
+ 123.4,
+ 1234.5,
+ 1234.56,
+ '18:00',
+ '18:00:22.1',
+ '18:00:22.1',
+ '2020-07-17 18:00',
+ '2020-07-17 18:00:22',
+ '2020-07-17 18:00:22',
+ '2020-07-17 18:00',
+ '2020-07-17 18:00:22',
+ '2020-07-17 18:00:22');
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test_mysql8.sql
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test_mysql8.sql
index 66b75bfef..54c6c7170 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test_mysql8.sql
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test_mysql8.sql
@@ -127,4 +127,37 @@ VALUES (DEFAULT,
'2020-07-17 18:00:22',
'2020-07-17 18:00:22.123',
'2020-07-17 18:00:22.123456',
- NULL);
\ No newline at end of file
+ NULL);
+
+CREATE TABLE precision_types
+(
+ id SERIAL,
+ decimal_c0 DECIMAL(6, 2),
+ decimal_c1 DECIMAL(9, 4),
+ decimal_c2 DECIMAL(20, 4),
+ time_c TIME(0),
+ time_3_c TIME(3),
+ time_6_c TIME(6),
+ datetime_c DATETIME(0),
+ datetime3_c DATETIME(3),
+ datetime6_c DATETIME(6),
+ timestamp_c TIMESTAMP(0),
+ timestamp3_c TIMESTAMP(3),
+ timestamp6_c TIMESTAMP(6),
+ PRIMARY KEY (id)
+) DEFAULT CHARSET=utf8;
+
+INSERT INTO precision_types
+VALUES (DEFAULT,
+ 123.4,
+ 1234.5,
+ 1234.56,
+ '18:00',
+ '18:00:22.1',
+ '18:00:22.1',
+ '2020-07-17 18:00',
+ '2020-07-17 18:00:22',
+ '2020-07-17 18:00:22',
+ '2020-07-17 18:00',
+ '2020-07-17 18:00:22',
+ '2020-07-17 18:00:22');
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/AbstractBinaryWriter.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/AbstractBinaryWriter.java
index 1422935ea..44636fd7e 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/AbstractBinaryWriter.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/AbstractBinaryWriter.java
@@ -173,46 +173,38 @@ abstract class AbstractBinaryWriter implements
BinaryWriter {
@Override
public void writeTimestamp(int pos, TimestampData value, int precision) {
- if (TimestampData.isCompact(precision)) {
- writeLong(pos, value.getMillisecond());
- } else {
- // store the nanoOfMillisecond in fixed-length part as offset and
nanoOfMillisecond
- ensureCapacity(8);
+ // store the nanoOfMillisecond in fixed-length part as offset and
nanoOfMillisecond
+ ensureCapacity(8);
- if (value == null) {
- setNullBit(pos);
- // zero-out the bytes
- segment.putLong(cursor, 0L);
- setOffsetAndSize(pos, cursor, 0);
- } else {
- segment.putLong(cursor, value.getMillisecond());
- setOffsetAndSize(pos, cursor, value.getNanoOfMillisecond());
- }
-
- cursor += 8;
+ if (value == null) {
+ setNullBit(pos);
+ // zero-out the bytes
+ segment.putLong(cursor, 0L);
+ setOffsetAndSize(pos, cursor, 0);
+ } else {
+ segment.putLong(cursor, value.getMillisecond());
+ setOffsetAndSize(pos, cursor, value.getNanoOfMillisecond());
}
+
+ cursor += 8;
}
@Override
public void writeLocalZonedTimestamp(int pos, LocalZonedTimestampData
value, int precision) {
- if (LocalZonedTimestampData.isCompact(precision)) {
- writeLong(pos, value.getEpochMillisecond());
- } else {
- // store the nanoOfMillisecond in fixed-length part as offset and
nanoOfMillisecond
- ensureCapacity(8);
+ // store the nanoOfMillisecond in fixed-length part as offset and
nanoOfMillisecond
+ ensureCapacity(8);
- if (value == null) {
- setNullBit(pos);
- // zero-out the bytes
- segment.putLong(cursor, 0L);
- setOffsetAndSize(pos, cursor, 0);
- } else {
- segment.putLong(cursor, value.getEpochMillisecond());
- setOffsetAndSize(pos, cursor,
value.getEpochNanoOfMillisecond());
- }
-
- cursor += 8;
+ if (value == null) {
+ setNullBit(pos);
+ // zero-out the bytes
+ segment.putLong(cursor, 0L);
+ setOffsetAndSize(pos, cursor, 0);
+ } else {
+ segment.putLong(cursor, value.getEpochMillisecond());
+ setOffsetAndSize(pos, cursor, value.getEpochNanoOfMillisecond());
}
+
+ cursor += 8;
}
@Override