This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 10e84ca99 [FLINK-38248][pipeline-connector][paimon][starrocks][doris] The default value of '0000-00-00 00:00:00' for MySQL TIMESTAMP fields is not supported in downstream systems. (#4096)
10e84ca99 is described below

commit 10e84ca99f6b4ef8ab64a8fd5d9283d2c3022faa
Author: suhwan <52690419+suhwan-ch...@users.noreply.github.com>
AuthorDate: Mon Aug 18 20:50:42 2025 +0900

    [FLINK-38248][pipeline-connector][paimon][starrocks][doris] The default value of '0000-00-00 00:00:00' for MySQL TIMESTAMP fields is not supported in downstream systems. (#4096)
---
 .../doris/sink/DorisMetadataApplier.java           |  23 ++++-
 .../connectors/doris/utils/DorisSchemaUtils.java   |   2 +
 .../doris/sink/DorisMetadataApplierITCase.java     | 103 +++++++++++++++++++++
 .../paimon/sink/SchemaChangeProvider.java          |  32 ++++++-
 .../paimon/sink/PaimonMetadataApplierTest.java     |  50 ++++++++++
 .../starrocks/sink/StarRocksMetadataApplier.java   |   4 +-
 .../connectors/starrocks/sink/StarRocksUtils.java  |  25 ++++-
 .../sink/StarRocksMetadataApplierITCase.java       | 101 ++++++++++++++++++++
 8 files changed, 334 insertions(+), 6 deletions(-)
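
For context: MySQL accepts '0000-00-00 00:00:00' as a TIMESTAMP default whenever sql_mode does not enforce NO_ZERO_DATE, but Doris, StarRocks, and Paimon all reject that literal, so create-table and add-column DDL would fail downstream. Each connector now rewrites the sentinel to an epoch default before emitting DDL. A minimal standalone sketch of the shared conversion follows (class and method names are illustrative; the boolean flag stands in for the connectors' instanceof checks against the CDC timestamp types):

    // Sketch only: mirrors the convertInvalidTimestampDefaultValue helpers below.
    public class ZeroDateDefaultSketch {
        static final String INVALID_OR_MISSING_DATATIME = "0000-00-00 00:00:00";
        static final String DEFAULT_DATETIME = "1970-01-01 00:00:00";

        // isTimestampType stands in for the instanceof checks against
        // TimestampType, LocalZonedTimestampType, and ZonedTimestampType.
        static String convert(String defaultValue, boolean isTimestampType) {
            if (defaultValue == null) {
                return null; // no default declared upstream, nothing to rewrite
            }
            if (isTimestampType && INVALID_OR_MISSING_DATATIME.equals(defaultValue)) {
                return DEFAULT_DATETIME; // replace MySQL's zero-date sentinel
            }
            return defaultValue; // valid defaults and non-timestamp columns pass through
        }

        public static void main(String[] args) {
            System.out.println(convert("0000-00-00 00:00:00", true));  // 1970-01-01 00:00:00
            System.out.println(convert("2024-06-01 12:00:00", true));  // unchanged
            System.out.println(convert("0000-00-00 00:00:00", false)); // unchanged
        }
    }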

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
index 559b8557b..f5d4adb93 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
@@ -200,7 +200,8 @@ public class DorisMetadataApplier implements MetadataApplier {
                     new FieldSchema(
                             column.getName(),
                             typeString,
-                            column.getDefaultValueExpression(),
+                            convertInvalidTimestampDefaultValue(
+                                    column.getDefaultValueExpression(), column.getType()),
                             column.getComment()));
         }
         return fieldSchemaMap;
@@ -237,7 +238,8 @@ public class DorisMetadataApplier implements MetadataApplier {
                         new FieldSchema(
                                 column.getName(),
                                 buildTypeString(column.getType()),
-                                column.getDefaultValueExpression(),
+                                convertInvalidTimestampDefaultValue(
+                                        column.getDefaultValueExpression(), column.getType()),
                                 column.getComment());
                 schemaChangeManager.addColumn(
                 tableId.getSchemaName(), tableId.getTableName(), addFieldSchema);
@@ -316,4 +318,21 @@ public class DorisMetadataApplier implements MetadataApplier {
             throw new SchemaEvolveException(dropTableEvent, "fail to drop table", e);
         }
     }
+
+    private String convertInvalidTimestampDefaultValue(String defaultValue, DataType dataType) {
+        if (defaultValue == null) {
+            return null;
+        }
+
+        if (dataType instanceof LocalZonedTimestampType
+                || dataType instanceof TimestampType
+                || dataType instanceof ZonedTimestampType) {
+
+            if (DorisSchemaUtils.INVALID_OR_MISSING_DATATIME.equals(defaultValue)) {
+                return DorisSchemaUtils.DEFAULT_DATETIME;
+            }
+        }
+
+        return defaultValue;
+    }
 }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/utils/DorisSchemaUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/utils/DorisSchemaUtils.java
index 649e0af1c..ba9ba9232 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/utils/DorisSchemaUtils.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/utils/DorisSchemaUtils.java
@@ -46,6 +46,8 @@ public class DorisSchemaUtils {
     public static final String DEFAULT_DATE = "1970-01-01";
     public static final String DEFAULT_DATETIME = "1970-01-01 00:00:00";
 
+    public static final String INVALID_OR_MISSING_DATATIME = "0000-00-00 00:00:00";
+
     /**
      * Get partition info by config. Currently only supports DATE/TIMESTAMP AUTO RANGE PARTITION,
      * and the Doris version should be greater than 2.1.6.
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java
index e9859eefc..bf8ff7e52 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java
@@ -46,6 +46,7 @@ import org.apache.flink.cdc.composer.flink.translator.OperatorUidGenerator;
 import org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator;
 import org.apache.flink.cdc.connectors.doris.sink.utils.DorisContainer;
 import org.apache.flink.cdc.connectors.doris.sink.utils.DorisSinkTestBase;
+import org.apache.flink.cdc.connectors.doris.utils.DorisSchemaUtils;
 import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -527,6 +528,108 @@ class DorisMetadataApplierITCase extends DorisSinkTestBase {
                                 tableId.getTableName()));
     }
 
+    @ParameterizedTest(name = "batchMode: {0}")
+    @ValueSource(booleans = {true, false})
+    void testMysqlDefaultTimestampValueConversionInCreateTable(boolean batchMode) throws Exception {
+        TableId tableId =
+                TableId.tableId(
+                        DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME);
+
+        Schema schema =
+                Schema.newBuilder()
+                        .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
+                        .column(new PhysicalColumn("name", DataTypes.VARCHAR(50), null))
+                        .column(
+                                new PhysicalColumn(
+                                        "created_time",
+                                        DataTypes.TIMESTAMP(),
+                                        null,
+                                        DorisSchemaUtils.INVALID_OR_MISSING_DATATIME))
+                        .column(
+                                new PhysicalColumn(
+                                        "updated_time",
+                                        DataTypes.TIMESTAMP_LTZ(),
+                                        null,
+                                        DorisSchemaUtils.INVALID_OR_MISSING_DATATIME))
+                        .primaryKey("id")
+                        .build();
+
+        runJobWithEvents(
+                Collections.singletonList(new CreateTableEvent(tableId, schema)), batchMode);
+
+        List<String> actual = inspectTableSchema(tableId);
+
+        List<String> expected =
+                Arrays.asList(
+                        "id | INT | Yes | true | null",
+                        "name | VARCHAR(150) | Yes | false | null",
+                        "created_time | DATETIME(6) | Yes | false | "
+                                + DorisSchemaUtils.DEFAULT_DATETIME,
+                        "updated_time | DATETIME(6) | Yes | false | "
+                                + DorisSchemaUtils.DEFAULT_DATETIME);
+
+        assertEqualsInOrder(expected, actual);
+    }
+
+    @ParameterizedTest(name = "batchMode: {0}")
+    @ValueSource(booleans = {true, false})
+    void testMysqlDefaultTimestampValueConversionInAddColumn(boolean batchMode) throws Exception {
+        TableId tableId =
+                TableId.tableId(
+                        DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME);
+
+        Schema initialSchema =
+                Schema.newBuilder()
+                        .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
+                        .column(new PhysicalColumn("name", DataTypes.VARCHAR(50), null))
+                        .primaryKey("id")
+                        .build();
+
+        List<Event> events = new ArrayList<>();
+        events.add(new CreateTableEvent(tableId, initialSchema));
+
+        PhysicalColumn createdTimeCol =
+                new PhysicalColumn(
+                        "created_time",
+                        DataTypes.TIMESTAMP(),
+                        null,
+                        DorisSchemaUtils.INVALID_OR_MISSING_DATATIME);
+
+        PhysicalColumn updatedTimeCol =
+                new PhysicalColumn(
+                        "updated_time",
+                        DataTypes.TIMESTAMP_LTZ(),
+                        null,
+                        DorisSchemaUtils.INVALID_OR_MISSING_DATATIME);
+
+        events.add(
+                new AddColumnEvent(
+                        tableId,
+                        Collections.singletonList(
+                                new AddColumnEvent.ColumnWithPosition(createdTimeCol))));
+
+        events.add(
+                new AddColumnEvent(
+                        tableId,
+                        Collections.singletonList(
+                                new AddColumnEvent.ColumnWithPosition(updatedTimeCol))));
+
+        runJobWithEvents(events, batchMode);
+
+        List<String> actual = inspectTableSchema(tableId);
+
+        List<String> expected =
+                Arrays.asList(
+                        "id | INT | Yes | true | null",
+                        "name | VARCHAR(150) | Yes | false | null",
+                        "created_time | DATETIME(6) | Yes | false | "
+                                + DorisSchemaUtils.DEFAULT_DATETIME,
+                        "updated_time | DATETIME(6) | Yes | false | "
+                                + DorisSchemaUtils.DEFAULT_DATETIME);
+
+        assertEqualsInOrder(expected, actual);
+    }
+
     private void runJobWithEvents(List<Event> events, boolean batchMode) throws Exception {
         DataStream<Event> stream =
                 env.fromCollection(events, TypeInformation.of(Event.class)).setParallelism(1);
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java
index a2cee6673..9e486693b 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java
@@ -20,6 +20,9 @@ package org.apache.flink.cdc.connectors.paimon.sink;
 import org.apache.flink.cdc.common.event.AddColumnEvent;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
+import org.apache.flink.cdc.common.types.TimestampType;
+import org.apache.flink.cdc.common.types.ZonedTimestampType;
 import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
 
 import org.apache.paimon.flink.LogicalTypeConversion;
@@ -35,6 +38,9 @@ import java.util.Optional;
  * represent different types of schema modifications.
  */
 public class SchemaChangeProvider {
+
+    public static final String DEFAULT_DATETIME = "1970-01-01 00:00:00";
+    public static final String INVALID_OR_MISSING_DATATIME = "0000-00-00 00:00:00";
     /**
      * Creates a SchemaChange object for adding a column without specifying its position.
      *
@@ -55,7 +61,9 @@ public class SchemaChangeProvider {
         // if a default value expression exists, we need to set the default value to the table
         // option
         Column column = columnWithPosition.getAddColumn();
-        Optional.ofNullable(column.getDefaultValueExpression())
+        Optional.ofNullable(
+                        convertInvalidTimestampDefaultValue(
+                                column.getDefaultValueExpression(), column.getType()))
                 .ifPresent(
                         value -> {
                             result.add(
@@ -89,7 +97,9 @@ public class SchemaChangeProvider {
         // if a default value expression exists, we need to set the default value to the table
         // option
         Column column = columnWithPosition.getAddColumn();
-        Optional.ofNullable(column.getDefaultValueExpression())
+        Optional.ofNullable(
+                        convertInvalidTimestampDefaultValue(
+                                column.getDefaultValueExpression(), column.getType()))
                 .ifPresent(
                         value -> {
                             result.add(
@@ -149,4 +159,22 @@ public class SchemaChangeProvider {
     public static SchemaChange setOption(String key, String value) {
         return SchemaChange.setOption(key, value);
     }
+
+    private static String convertInvalidTimestampDefaultValue(
+            String defaultValue, DataType dataType) {
+        if (defaultValue == null) {
+            return null;
+        }
+
+        if (dataType instanceof LocalZonedTimestampType
+                || dataType instanceof TimestampType
+                || dataType instanceof ZonedTimestampType) {
+
+            if (INVALID_OR_MISSING_DATATIME.equals(defaultValue)) {
+                return DEFAULT_DATETIME;
+            }
+        }
+
+        return defaultValue;
+    }
 }
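
Paimon has no native column-default DDL in this path; the connector records the default as a table option alongside the add-column change, which is why the conversion above runs before the value is wrapped into SchemaChange.setOption. A rough sketch of the change list built for one added column (the fields.<column>.default-value option key is assumed from Paimon's option naming convention and is illustrative only, not verified against the connector):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.paimon.schema.SchemaChange;
    import org.apache.paimon.types.DataTypes;

    // Sketch only: approximates the SchemaChange list built for a timestamp
    // column whose MySQL default was the zero-date sentinel.
    public class PaimonDefaultOptionSketch {
        public static void main(String[] args) {
            List<SchemaChange> changes = new ArrayList<>();
            changes.add(SchemaChange.addColumn("created_time", DataTypes.TIMESTAMP()));
            // The sentinel has already been rewritten to the epoch default,
            // so the stored option carries a value Paimon can parse.
            changes.add(
                    SchemaChange.setOption(
                            "fields.created_time.default-value", // assumed key, illustration only
                            "1970-01-01 00:00:00"));
            changes.forEach(System.out::println);
        }
    }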
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
index 9dfb1d524..1dd55673b 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
@@ -37,6 +37,7 @@ import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -581,4 +582,53 @@ class PaimonMetadataApplierTest {
         Assertions.assertThat(table.options()).containsEntry("bucket", "-1");
         Assertions.assertThat(table.comment()).contains("comment of table_with_comment");
     }
+
+    @Test
+    public void testMysqlDefaultTimestampValueConversionInAddColumn()
+            throws SchemaEvolveException, Catalog.TableNotExistException,
+                    Catalog.DatabaseNotEmptyException, Catalog.DatabaseNotExistException {
+        initialize("filesystem");
+        Map<String, String> tableOptions = new HashMap<>();
+        tableOptions.put("bucket", "-1");
+        MetadataApplier metadataApplier =
+                new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>());
+
+        CreateTableEvent createTableEvent =
+                new CreateTableEvent(
+                        TableId.parse("test.timestamp_test"),
+                        org.apache.flink.cdc.common.schema.Schema.newBuilder()
+                                .physicalColumn(
+                                        "id",
+                                        org.apache.flink.cdc.common.types.DataTypes.INT().notNull())
+                                .physicalColumn(
+                                        "name",
+                                        org.apache.flink.cdc.common.types.DataTypes.STRING())
+                                .primaryKey("id")
+                                .build());
+        metadataApplier.applySchemaChange(createTableEvent);
+
+        List<AddColumnEvent.ColumnWithPosition> addedColumns = new ArrayList<>();
+        addedColumns.add(
+                AddColumnEvent.last(
+                        Column.physicalColumn(
+                                "created_time",
+                                org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP(),
+                                null,
+                                SchemaChangeProvider.INVALID_OR_MISSING_DATATIME)));
+        addedColumns.add(
+                AddColumnEvent.last(
+                        Column.physicalColumn(
+                                "updated_time",
+                                org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP_LTZ(),
+                                null,
+                                SchemaChangeProvider.INVALID_OR_MISSING_DATATIME)));
+
+        AddColumnEvent addColumnEvent =
+                new AddColumnEvent(TableId.parse("test.timestamp_test"), addedColumns);
+        metadataApplier.applySchemaChange(addColumnEvent);
+
+        Table table = catalog.getTable(Identifier.fromString("test.timestamp_test"));
+
+        Assertions.assertThat(table).isNotNull();
+    }
 }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java
index 4204dbf9c..b0da50c84 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java
@@ -169,7 +169,9 @@ public class StarRocksMetadataApplier implements MetadataApplier {
                             .setColumnName(column.getName())
                             .setOrdinalPosition(-1)
                             .setColumnComment(column.getComment())
-                            .setDefaultValue(column.getDefaultValueExpression());
+                            .setDefaultValue(
+                                    StarRocksUtils.convertInvalidTimestampDefaultValue(
+                                            column.getDefaultValueExpression(), column.getType()));
             toStarRocksDataType(column, false, builder);
             addColumns.add(builder.build());
         }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
index 50dab2ac4..92e921856 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
@@ -53,6 +53,9 @@ import static org.apache.flink.cdc.common.types.DataTypeChecks.getScale;
 /** Utilities for conversion from source table to StarRocks table. */
 public class StarRocksUtils {
 
+    public static final String DEFAULT_DATETIME = "1970-01-01 00:00:00";
+    public static final String INVALID_OR_MISSING_DATATIME = "0000-00-00 00:00:00";
+
     /** Convert a source table to {@link StarRocksTable}. */
     public static StarRocksTable toStarRocksTable(
             TableId tableId, Schema schema, TableCreateConfig tableCreateConfig) {
@@ -85,7 +88,9 @@ public class StarRocksUtils {
                             .setColumnName(column.getName())
                             .setOrdinalPosition(i)
                             .setColumnComment(column.getComment())
-                            .setDefaultValue(column.getDefaultValueExpression());
+                            .setDefaultValue(
+                                    convertInvalidTimestampDefaultValue(
+                                            column.getDefaultValueExpression(), column.getType()));
             toStarRocksDataType(column, i < primaryKeyCount, builder);
             starRocksColumns.add(builder.build());
         }
@@ -386,4 +391,22 @@ public class StarRocksUtils {
             throw new UnsupportedOperationException("Unsupported CDC data type " + dataType);
         }
     }
+
+    public static String convertInvalidTimestampDefaultValue(
+            String defaultValue, org.apache.flink.cdc.common.types.DataType dataType) {
+        if (defaultValue == null) {
+            return null;
+        }
+
+        if (dataType instanceof org.apache.flink.cdc.common.types.LocalZonedTimestampType
+                || dataType instanceof org.apache.flink.cdc.common.types.TimestampType
+                || dataType instanceof org.apache.flink.cdc.common.types.ZonedTimestampType) {
+
+            if (INVALID_OR_MISSING_DATATIME.equals(defaultValue)) {
+                return DEFAULT_DATETIME;
+            }
+        }
+
+        return defaultValue;
+    }
 }
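
Because the StarRocks variant of the helper is public, it can be exercised directly. A hedged usage sketch (the class name is invented; it assumes flink-cdc-common and this connector are on the classpath):

    import org.apache.flink.cdc.common.types.DataTypes;
    import org.apache.flink.cdc.connectors.starrocks.sink.StarRocksUtils;

    // Sketch only: exercises the public helper added above.
    public class ConvertDefaultValueExample {
        public static void main(String[] args) {
            // Zero-date sentinel on a timestamp column is rewritten to the epoch default.
            System.out.println(
                    StarRocksUtils.convertInvalidTimestampDefaultValue(
                            "0000-00-00 00:00:00", DataTypes.TIMESTAMP()));

            // Non-timestamp columns keep their declared default untouched.
            System.out.println(
                    StarRocksUtils.convertInvalidTimestampDefaultValue(
                            "0000-00-00 00:00:00", DataTypes.VARCHAR(20)));
        }
    }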
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
index 2fea38f07..60dbb0966 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
@@ -480,4 +480,105 @@ class StarRocksMetadataApplierITCase extends StarRocksSinkTestBase {
                                                         : e)
                                 .toArray());
     }
+
+    @Test
+    void testMysqlDefaultTimestampValueConversionInCreateTable() throws Exception {
+        TableId tableId =
+                TableId.tableId(
+                        StarRocksContainer.STARROCKS_DATABASE_NAME,
+                        StarRocksContainer.STARROCKS_TABLE_NAME);
+
+        Schema schema =
+                Schema.newBuilder()
+                        .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
+                        .column(new PhysicalColumn("name", DataTypes.VARCHAR(50), null))
+                        .column(
+                                new PhysicalColumn(
+                                        "created_time",
+                                        DataTypes.TIMESTAMP(),
+                                        null,
+                                        StarRocksUtils.INVALID_OR_MISSING_DATATIME))
+                        .column(
+                                new PhysicalColumn(
+                                        "updated_time",
+                                        DataTypes.TIMESTAMP_LTZ(),
+                                        null,
+                                        StarRocksUtils.INVALID_OR_MISSING_DATATIME))
+                        .primaryKey("id")
+                        .build();
+
+        runJobWithEvents(Collections.singletonList(new CreateTableEvent(tableId, schema)));
+
+        List<String> actual = inspectTableSchema(tableId);
+
+        List<String> expected =
+                Arrays.asList(
+                        "id | int | NO | true | null",
+                        "name | varchar(150) | YES | false | null",
+                        "created_time | datetime | YES | false | "
+                                + StarRocksUtils.DEFAULT_DATETIME,
+                        "updated_time | datetime | YES | false | "
+                                + StarRocksUtils.DEFAULT_DATETIME);
+
+        assertEqualsInOrder(expected, actual);
+    }
+
+    @Test
+    void testMysqlDefaultTimestampValueConversionInAddColumn() throws Exception {
+        TableId tableId =
+                TableId.tableId(
+                        StarRocksContainer.STARROCKS_DATABASE_NAME,
+                        StarRocksContainer.STARROCKS_TABLE_NAME);
+
+        Schema initialSchema =
+                Schema.newBuilder()
+                        .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
+                        .column(new PhysicalColumn("name", DataTypes.VARCHAR(50), null))
+                        .primaryKey("id")
+                        .build();
+
+        List<Event> events = new ArrayList<>();
+        events.add(new CreateTableEvent(tableId, initialSchema));
+
+        PhysicalColumn createdTimeCol =
+                new PhysicalColumn(
+                        "created_time",
+                        DataTypes.TIMESTAMP(),
+                        null,
+                        StarRocksUtils.INVALID_OR_MISSING_DATATIME);
+
+        PhysicalColumn updatedTimeCol =
+                new PhysicalColumn(
+                        "updated_time",
+                        DataTypes.TIMESTAMP_LTZ(),
+                        null,
+                        StarRocksUtils.INVALID_OR_MISSING_DATATIME);
+
+        events.add(
+                new AddColumnEvent(
+                        tableId,
+                        Collections.singletonList(
+                                new AddColumnEvent.ColumnWithPosition(createdTimeCol))));
+
+        events.add(
+                new AddColumnEvent(
+                        tableId,
+                        Collections.singletonList(
+                                new AddColumnEvent.ColumnWithPosition(updatedTimeCol))));
+
+        runJobWithEvents(events);
+
+        List<String> actual = inspectTableSchema(tableId);
+
+        List<String> expected =
+                Arrays.asList(
+                        "id | int | NO | true | null",
+                        "name | varchar(150) | YES | false | null",
+                        "created_time | datetime | YES | false | "
+                                + StarRocksUtils.DEFAULT_DATETIME,
+                        "updated_time | datetime | YES | false | "
+                                + StarRocksUtils.DEFAULT_DATETIME);
+
+        assertEqualsInOrder(expected, actual);
+    }
 }
