This is an automated email from the ASF dual-hosted git repository. kunni pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push: new 10e84ca99 [FLINK-38248][pipeline-connector][paimon][starrocks][doris] The default value of '0000-00-00 00:00:00' for MySQL TIMESTAMP fields is not supported in downstream systems. (#4096) 10e84ca99 is described below commit 10e84ca99f6b4ef8ab64a8fd5d9283d2c3022faa Author: suhwan <52690419+suhwan-ch...@users.noreply.github.com> AuthorDate: Mon Aug 18 20:50:42 2025 +0900 [FLINK-38248][pipeline-connector][paimon][starrocks][doris] The default value of '0000-00-00 00:00:00' for MySQL TIMESTAMP fields is not supported in downstream systems. (#4096) --- .../doris/sink/DorisMetadataApplier.java | 23 ++++- .../connectors/doris/utils/DorisSchemaUtils.java | 2 + .../doris/sink/DorisMetadataApplierITCase.java | 103 +++++++++++++++++++++ .../paimon/sink/SchemaChangeProvider.java | 32 ++++++- .../paimon/sink/PaimonMetadataApplierTest.java | 50 ++++++++++ .../starrocks/sink/StarRocksMetadataApplier.java | 4 +- .../connectors/starrocks/sink/StarRocksUtils.java | 25 ++++- .../sink/StarRocksMetadataApplierITCase.java | 101 ++++++++++++++++++++ 8 files changed, 334 insertions(+), 6 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java index 559b8557b..f5d4adb93 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java @@ -200,7 +200,8 @@ public class DorisMetadataApplier implements MetadataApplier { new FieldSchema( column.getName(), typeString, - column.getDefaultValueExpression(), + convertInvalidTimestampDefaultValue( + column.getDefaultValueExpression(), column.getType()), column.getComment())); } return fieldSchemaMap; @@ -237,7 +238,8 @@ public class DorisMetadataApplier implements MetadataApplier { new FieldSchema( column.getName(), buildTypeString(column.getType()), - column.getDefaultValueExpression(), + convertInvalidTimestampDefaultValue( + column.getDefaultValueExpression(), column.getType()), column.getComment()); schemaChangeManager.addColumn( tableId.getSchemaName(), tableId.getTableName(), addFieldSchema); @@ -316,4 +318,21 @@ public class DorisMetadataApplier implements MetadataApplier { throw new SchemaEvolveException(dropTableEvent, "fail to drop table", e); } } + + private String convertInvalidTimestampDefaultValue(String defaultValue, DataType dataType) { + if (defaultValue == null) { + return null; + } + + if (dataType instanceof LocalZonedTimestampType + || dataType instanceof TimestampType + || dataType instanceof ZonedTimestampType) { + + if (DorisSchemaUtils.INVALID_OR_MISSING_DATATIME.equals(defaultValue)) { + return DorisSchemaUtils.DEFAULT_DATETIME; + } + } + + return defaultValue; + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/utils/DorisSchemaUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/utils/DorisSchemaUtils.java index 649e0af1c..ba9ba9232 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/utils/DorisSchemaUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/utils/DorisSchemaUtils.java @@ -46,6 +46,8 @@ public class DorisSchemaUtils { public static final String DEFAULT_DATE = "1970-01-01"; public static final String DEFAULT_DATETIME = "1970-01-01 00:00:00"; + public static final String INVALID_OR_MISSING_DATATIME = "0000-00-00 00:00:00"; + /** * Get partition info by config. Currently only supports DATE/TIMESTAMP AUTO RANGE PARTITION and * doris version should greater than 2.1.6 diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java index e9859eefc..bf8ff7e52 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java @@ -46,6 +46,7 @@ import org.apache.flink.cdc.composer.flink.translator.OperatorUidGenerator; import org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator; import org.apache.flink.cdc.connectors.doris.sink.utils.DorisContainer; import org.apache.flink.cdc.connectors.doris.sink.utils.DorisSinkTestBase; +import org.apache.flink.cdc.connectors.doris.utils.DorisSchemaUtils; import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.streaming.api.datastream.DataStream; @@ -527,6 +528,108 @@ class DorisMetadataApplierITCase extends DorisSinkTestBase { tableId.getTableName())); } + @ParameterizedTest(name = "batchMode: {0}") + @ValueSource(booleans = {true, false}) + void testMysqlDefaultTimestampValueConversionInCreateTable(boolean batchMode) throws Exception { + TableId tableId = + TableId.tableId( + DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME); + + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(50), null)) + .column( + new PhysicalColumn( + "created_time", + DataTypes.TIMESTAMP(), + null, + DorisSchemaUtils.INVALID_OR_MISSING_DATATIME)) + .column( + new PhysicalColumn( + "updated_time", + DataTypes.TIMESTAMP_LTZ(), + null, + DorisSchemaUtils.INVALID_OR_MISSING_DATATIME)) + .primaryKey("id") + .build(); + + runJobWithEvents( + Collections.singletonList(new CreateTableEvent(tableId, schema)), batchMode); + + List<String> actual = inspectTableSchema(tableId); + + List<String> expected = + Arrays.asList( + "id | INT | Yes | true | null", + "name | VARCHAR(150) | Yes | false | null", + "created_time | DATETIME(6) | Yes | false | " + + DorisSchemaUtils.DEFAULT_DATETIME, + "updated_time | DATETIME(6) | Yes | false | " + + DorisSchemaUtils.DEFAULT_DATETIME); + + assertEqualsInOrder(expected, actual); + } + + @ParameterizedTest(name = "batchMode: {0}") + @ValueSource(booleans = {true, false}) + void testMysqlDefaultTimestampValueConversionInAddColumn(boolean batchMode) throws Exception { + TableId tableId = + TableId.tableId( + DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME); + + Schema initialSchema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(50), null)) + .primaryKey("id") + .build(); + + List<Event> events = new ArrayList<>(); + events.add(new CreateTableEvent(tableId, initialSchema)); + + PhysicalColumn createdTimeCol = + new PhysicalColumn( + "created_time", + DataTypes.TIMESTAMP(), + null, + DorisSchemaUtils.INVALID_OR_MISSING_DATATIME); + + PhysicalColumn updatedTimeCol = + new PhysicalColumn( + "updated_time", + DataTypes.TIMESTAMP_LTZ(), + null, + DorisSchemaUtils.INVALID_OR_MISSING_DATATIME); + + events.add( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition(createdTimeCol)))); + + events.add( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition(updatedTimeCol)))); + + runJobWithEvents(events, batchMode); + + List<String> actual = inspectTableSchema(tableId); + + List<String> expected = + Arrays.asList( + "id | INT | Yes | true | null", + "name | VARCHAR(150) | Yes | false | null", + "created_time | DATETIME(6) | Yes | false | " + + DorisSchemaUtils.DEFAULT_DATETIME, + "updated_time | DATETIME(6) | Yes | false | " + + DorisSchemaUtils.DEFAULT_DATETIME); + + assertEqualsInOrder(expected, actual); + } + private void runJobWithEvents(List<Event> events, boolean batchMode) throws Exception { DataStream<Event> stream = env.fromCollection(events, TypeInformation.of(Event.class)).setParallelism(1); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java index a2cee6673..9e486693b 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java @@ -20,6 +20,9 @@ package org.apache.flink.cdc.connectors.paimon.sink; import org.apache.flink.cdc.common.event.AddColumnEvent; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.LocalZonedTimestampType; +import org.apache.flink.cdc.common.types.TimestampType; +import org.apache.flink.cdc.common.types.ZonedTimestampType; import org.apache.flink.cdc.common.types.utils.DataTypeUtils; import org.apache.paimon.flink.LogicalTypeConversion; @@ -35,6 +38,9 @@ import java.util.Optional; * represent different types of schema modifications. */ public class SchemaChangeProvider { + + public static final String DEFAULT_DATETIME = "1970-01-01 00:00:00"; + public static final String INVALID_OR_MISSING_DATATIME = "0000-00-00 00:00:00"; /** * Creates a SchemaChange object for adding a column without specifying its position. * @@ -55,7 +61,9 @@ public class SchemaChangeProvider { // if default value express exists, we need to set the default value to the table // option Column column = columnWithPosition.getAddColumn(); - Optional.ofNullable(column.getDefaultValueExpression()) + Optional.ofNullable( + convertInvalidTimestampDefaultValue( + column.getDefaultValueExpression(), column.getType())) .ifPresent( value -> { result.add( @@ -89,7 +97,9 @@ public class SchemaChangeProvider { // if default value express exists, we need to set the default value to the table // option Column column = columnWithPosition.getAddColumn(); - Optional.ofNullable(column.getDefaultValueExpression()) + Optional.ofNullable( + convertInvalidTimestampDefaultValue( + column.getDefaultValueExpression(), column.getType())) .ifPresent( value -> { result.add( @@ -149,4 +159,22 @@ public class SchemaChangeProvider { public static SchemaChange setOption(String key, String value) { return SchemaChange.setOption(key, value); } + + private static String convertInvalidTimestampDefaultValue( + String defaultValue, DataType dataType) { + if (defaultValue == null) { + return null; + } + + if (dataType instanceof LocalZonedTimestampType + || dataType instanceof TimestampType + || dataType instanceof ZonedTimestampType) { + + if (INVALID_OR_MISSING_DATATIME.equals(defaultValue)) { + return DEFAULT_DATETIME; + } + } + + return defaultValue; + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java index 9dfb1d524..1dd55673b 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java @@ -37,6 +37,7 @@ import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -581,4 +582,53 @@ class PaimonMetadataApplierTest { Assertions.assertThat(table.options()).containsEntry("bucket", "-1"); Assertions.assertThat(table.comment()).contains("comment of table_with_comment"); } + + @Test + public void testMysqlDefaultTimestampValueConversionInAddColumn() + throws SchemaEvolveException, Catalog.TableNotExistException, + Catalog.DatabaseNotEmptyException, Catalog.DatabaseNotExistException { + initialize("filesystem"); + Map<String, String> tableOptions = new HashMap<>(); + tableOptions.put("bucket", "-1"); + MetadataApplier metadataApplier = + new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>()); + + CreateTableEvent createTableEvent = + new CreateTableEvent( + TableId.parse("test.timestamp_test"), + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "id", + org.apache.flink.cdc.common.types.DataTypes.INT().notNull()) + .physicalColumn( + "name", + org.apache.flink.cdc.common.types.DataTypes.STRING()) + .primaryKey("id") + .build()); + metadataApplier.applySchemaChange(createTableEvent); + + List<AddColumnEvent.ColumnWithPosition> addedColumns = new ArrayList<>(); + addedColumns.add( + AddColumnEvent.last( + Column.physicalColumn( + "created_time", + org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP(), + null, + SchemaChangeProvider.INVALID_OR_MISSING_DATATIME))); + addedColumns.add( + AddColumnEvent.last( + Column.physicalColumn( + "updated_time", + org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP_LTZ(), + null, + SchemaChangeProvider.INVALID_OR_MISSING_DATATIME))); + + AddColumnEvent addColumnEvent = + new AddColumnEvent(TableId.parse("test.timestamp_test"), addedColumns); + metadataApplier.applySchemaChange(addColumnEvent); + + Table table = catalog.getTable(Identifier.fromString("test.timestamp_test")); + + Assertions.assertThat(table).isNotNull(); + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java index 4204dbf9c..b0da50c84 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java @@ -169,7 +169,9 @@ public class StarRocksMetadataApplier implements MetadataApplier { .setColumnName(column.getName()) .setOrdinalPosition(-1) .setColumnComment(column.getComment()) - .setDefaultValue(column.getDefaultValueExpression()); + .setDefaultValue( + StarRocksUtils.convertInvalidTimestampDefaultValue( + column.getDefaultValueExpression(), column.getType())); toStarRocksDataType(column, false, builder); addColumns.add(builder.build()); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java index 50dab2ac4..92e921856 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java @@ -53,6 +53,9 @@ import static org.apache.flink.cdc.common.types.DataTypeChecks.getScale; /** Utilities for conversion from source table to StarRocks table. */ public class StarRocksUtils { + public static final String DEFAULT_DATETIME = "1970-01-01 00:00:00"; + public static final String INVALID_OR_MISSING_DATATIME = "0000-00-00 00:00:00"; + /** Convert a source table to {@link StarRocksTable}. */ public static StarRocksTable toStarRocksTable( TableId tableId, Schema schema, TableCreateConfig tableCreateConfig) { @@ -85,7 +88,9 @@ public class StarRocksUtils { .setColumnName(column.getName()) .setOrdinalPosition(i) .setColumnComment(column.getComment()) - .setDefaultValue(column.getDefaultValueExpression()); + .setDefaultValue( + convertInvalidTimestampDefaultValue( + column.getDefaultValueExpression(), column.getType())); toStarRocksDataType(column, i < primaryKeyCount, builder); starRocksColumns.add(builder.build()); } @@ -386,4 +391,22 @@ public class StarRocksUtils { throw new UnsupportedOperationException("Unsupported CDC data type " + dataType); } } + + public static String convertInvalidTimestampDefaultValue( + String defaultValue, org.apache.flink.cdc.common.types.DataType dataType) { + if (defaultValue == null) { + return null; + } + + if (dataType instanceof org.apache.flink.cdc.common.types.LocalZonedTimestampType + || dataType instanceof org.apache.flink.cdc.common.types.TimestampType + || dataType instanceof org.apache.flink.cdc.common.types.ZonedTimestampType) { + + if (INVALID_OR_MISSING_DATATIME.equals(defaultValue)) { + return DEFAULT_DATETIME; + } + } + + return defaultValue; + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java index 2fea38f07..60dbb0966 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java @@ -480,4 +480,105 @@ class StarRocksMetadataApplierITCase extends StarRocksSinkTestBase { : e) .toArray()); } + + @Test + void testMysqlDefaultTimestampValueConversionInCreateTable() throws Exception { + TableId tableId = + TableId.tableId( + StarRocksContainer.STARROCKS_DATABASE_NAME, + StarRocksContainer.STARROCKS_TABLE_NAME); + + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(50), null)) + .column( + new PhysicalColumn( + "created_time", + DataTypes.TIMESTAMP(), + null, + StarRocksUtils.INVALID_OR_MISSING_DATATIME)) + .column( + new PhysicalColumn( + "updated_time", + DataTypes.TIMESTAMP_LTZ(), + null, + StarRocksUtils.INVALID_OR_MISSING_DATATIME)) + .primaryKey("id") + .build(); + + runJobWithEvents(Collections.singletonList(new CreateTableEvent(tableId, schema))); + + List<String> actual = inspectTableSchema(tableId); + + List<String> expected = + Arrays.asList( + "id | int | NO | true | null", + "name | varchar(150) | YES | false | null", + "created_time | datetime | YES | false | " + + StarRocksUtils.DEFAULT_DATETIME, + "updated_time | datetime | YES | false | " + + StarRocksUtils.DEFAULT_DATETIME); + + assertEqualsInOrder(expected, actual); + } + + @Test + void testMysqlDefaultTimestampValueConversionInAddColumn() throws Exception { + TableId tableId = + TableId.tableId( + StarRocksContainer.STARROCKS_DATABASE_NAME, + StarRocksContainer.STARROCKS_TABLE_NAME); + + Schema initialSchema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(50), null)) + .primaryKey("id") + .build(); + + List<Event> events = new ArrayList<>(); + events.add(new CreateTableEvent(tableId, initialSchema)); + + PhysicalColumn createdTimeCol = + new PhysicalColumn( + "created_time", + DataTypes.TIMESTAMP(), + null, + StarRocksUtils.INVALID_OR_MISSING_DATATIME); + + PhysicalColumn updatedTimeCol = + new PhysicalColumn( + "updated_time", + DataTypes.TIMESTAMP_LTZ(), + null, + StarRocksUtils.INVALID_OR_MISSING_DATATIME); + + events.add( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition(createdTimeCol)))); + + events.add( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition(updatedTimeCol)))); + + runJobWithEvents(events); + + List<String> actual = inspectTableSchema(tableId); + + List<String> expected = + Arrays.asList( + "id | int | NO | true | null", + "name | varchar(150) | YES | false | null", + "created_time | datetime | YES | false | " + + StarRocksUtils.DEFAULT_DATETIME, + "updated_time | datetime | YES | false | " + + StarRocksUtils.DEFAULT_DATETIME); + + assertEqualsInOrder(expected, actual); + } }