This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-0.8 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 636b9fa58af76f0e2ba704f35d89cc727cb80344 Author: MOBIN <[email protected]> AuthorDate: Mon May 13 16:29:44 2024 +0800 [cdc] Add more checker for the timestamp type. (#3312) --- .../cdc/UpdatedDataFieldsProcessFunctionBase.java | 11 +++++++++++ .../cdc/UpdatedDataFieldsProcessFunctionBaseTest.java | 19 +++++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java index bc31a05e2..8fefcab63 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java @@ -63,6 +63,9 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends Process private static final List<DataTypeRoot> DECIMAL_TYPES = Arrays.asList(DataTypeRoot.DECIMAL); + private static final List<DataTypeRoot> TIMESTAMP_TYPES = + Arrays.asList(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE); + protected UpdatedDataFieldsProcessFunctionBase(Catalog.Loader catalogLoader) { this.catalogLoader = catalogLoader; } @@ -174,6 +177,14 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends Process : ConvertAction.CONVERT; } + oldIdx = TIMESTAMP_TYPES.indexOf(oldType.getTypeRoot()); + newIdx = TIMESTAMP_TYPES.indexOf(newType.getTypeRoot()); + if (oldIdx >= 0 && newIdx >= 0) { + return DataTypeChecks.getPrecision(oldType) <= DataTypeChecks.getPrecision(newType) + ? ConvertAction.CONVERT + : ConvertAction.IGNORE; + } + return ConvertAction.EXCEPTION; } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBaseTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBaseTest.java index 6b1ed71ec..08d9ac975 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBaseTest.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBaseTest.java @@ -22,6 +22,7 @@ import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.DecimalType; import org.apache.paimon.types.IntType; import org.apache.paimon.types.SmallIntType; +import org.apache.paimon.types.TimestampType; import org.apache.paimon.types.VarCharType; import org.junit.Assert; @@ -79,4 +80,22 @@ public class UpdatedDataFieldsProcessFunctionBaseTest { Assert.assertEquals( UpdatedDataFieldsProcessFunctionBase.ConvertAction.IGNORE, convertAction); } + + @Test + public void testCanConvertTimestamp() { + TimestampType oldType = new TimestampType(true, 3); + TimestampType biggerLengthTimestamp = new TimestampType(true, 5); + TimestampType smallerLengthTimestamp = new TimestampType(true, 2); + + UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction = null; + convertAction = + UpdatedDataFieldsProcessFunctionBase.canConvert(oldType, biggerLengthTimestamp); + Assert.assertEquals( + UpdatedDataFieldsProcessFunctionBase.ConvertAction.CONVERT, convertAction); + convertAction = + UpdatedDataFieldsProcessFunctionBase.canConvert(oldType, smallerLengthTimestamp); + + Assert.assertEquals( + UpdatedDataFieldsProcessFunctionBase.ConvertAction.IGNORE, convertAction); + } }
