This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 69dae39e7 [FLINK-37485][starrocks] Add support for TIME type (#4253)
69dae39e7 is described below

commit 69dae39e7b2eebc53ea23b0c54026b8ec7fe8142
Author: Jia Fan <[email protected]>
AuthorDate: Tue Mar 3 19:06:01 2026 +0800

    [FLINK-37485][starrocks] Add support for TIME type (#4253)
---
 .../connectors/pipeline-connectors/starrocks.md    |  18 +-
 .../connectors/pipeline-connectors/starrocks.md    |  16 +-
 .../cdc/connectors/fluss/FlussPipelineITCase.java  |  26 ++-
 .../connectors/starrocks/sink/StarRocksUtils.java  |  57 +++++-
 .../sink/EventRecordSerializationSchemaTest.java   | 224 +++++++++++++++++++++
 .../sink/StarRocksMetadataApplierITCase.java       |  11 +-
 .../sink/StarRocksMetadataApplierTest.java         | 147 ++++++++++++++
 7 files changed, 488 insertions(+), 11 deletions(-)

diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/starrocks.md 
b/docs/content.zh/docs/connectors/pipeline-connectors/starrocks.md
index 33b7d96ae..98f79f86a 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/starrocks.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/starrocks.md
@@ -128,15 +128,15 @@ pipeline:
       <td>sink.connect.timeout-ms</td>
       <td>optional</td>
       <td style="word-wrap: break-word;">30000</td>
-      <td>String</td>
+      <td>Integer</td>
       <td>与 FE 建立 HTTP 连接的超时时间。取值范围:[100, 60000]。</td>
     </tr>
     <tr>
       <td>sink.wait-for-continue.timeout-ms</td>
       <td>optional</td>
       <td style="word-wrap: break-word;">30000</td>
-      <td>String</td>
-      <td>等待 FE HTTP 100-continue 应答的超时时间。取值范围:[3000, 60000]。</td>
+      <td>Integer</td>
+      <td>等待 FE HTTP 100-continue 应答的超时时间。取值范围:[3000, 600000]。</td>
     </tr>
     <tr>
       <td>sink.buffer-flush.max-bytes</td>
@@ -174,6 +174,13 @@ pipeline:
       <td>Boolean</td>
       <td>at-least-once 下是否使用 transaction stream load。</td>
     </tr>
+    <tr>
+      <td>sink.metric.histogram-window-size</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">100</td>
+      <td>Integer</td>
+      <td>直方图指标的窗口大小。</td>
+    </tr>
     <tr>
       <td>sink.properties.*</td>
       <td>optional</td>
@@ -297,6 +304,11 @@ pipeline:
       <td>DATE</td>
       <td></td>
     </tr>
+    <tr>
+      <td>TIME</td>
+      <td>VARCHAR</td>
+      <td>StarRocks 不支持 TIME 类型,因此映射为 VARCHAR。TIME(p) 值以字符串形式存储:当 p = 0 时格式为 "HH:mm:ss",当 p > 0 时格式为 "HH:mm:ss.&lt;p 位小数&gt;"(例如 p = 3 时为 "HH:mm:ss.SSS")。</td>
+    </tr>
     <tr>
       <td>TIMESTAMP</td>
       <td>DATETIME</td>
diff --git a/docs/content/docs/connectors/pipeline-connectors/starrocks.md 
b/docs/content/docs/connectors/pipeline-connectors/starrocks.md
index 7a979d019..67fbbf046 100644
--- a/docs/content/docs/connectors/pipeline-connectors/starrocks.md
+++ b/docs/content/docs/connectors/pipeline-connectors/starrocks.md
@@ -128,14 +128,14 @@ pipeline:
       <td>sink.connect.timeout-ms</td>
       <td>optional</td>
       <td style="word-wrap: break-word;">30000</td>
-      <td>String</td>
+      <td>Integer</td>
       <td>The timeout for establishing HTTP connection. Valid values: 100 to 60000.</td>
     </tr>
     <tr>
       <td>sink.wait-for-continue.timeout-ms</td>
       <td>optional</td>
       <td style="word-wrap: break-word;">30000</td>
-      <td>String</td>
+      <td>Integer</td>
       <td>Timeout in millisecond to wait for 100-continue response from FE http server.
             Valid values: 3000 to 600000.</td>
     </tr>
@@ -177,6 +177,13 @@ pipeline:
       <td>Boolean</td>
       <td>Whether to use transaction stream load for at-least-once when it's available.</td>
     </tr>
+    <tr>
+      <td>sink.metric.histogram-window-size</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">100</td>
+      <td>Integer</td>
+      <td>Window size of histogram metrics.</td>
+    </tr>
     <tr>
       <td>sink.properties.*</td>
       <td>optional</td>
@@ -306,6 +313,11 @@ pipeline:
       <td>DATE</td>
       <td></td>
     </tr>
+    <tr>
+      <td>TIME</td>
+      <td>VARCHAR</td>
+      <td>StarRocks does not support the TIME type, so it is mapped to VARCHAR. TIME(p) values are stored as strings in the format "HH:mm:ss" when the precision p = 0, or "HH:mm:ss.&lt;p digits&gt;" when p &gt; 0 (for example, p = 3 uses "HH:mm:ss.SSS").</td>
+    </tr>
     <tr>
       <td>TIMESTAMP</td>
       <td>DATETIME</td>
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/FlussPipelineITCase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/FlussPipelineITCase.java
index a625f0fd9..ec2d2b4a2 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/FlussPipelineITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/FlussPipelineITCase.java
@@ -45,6 +45,8 @@ import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 
+import com.alibaba.fluss.client.Connection;
+import com.alibaba.fluss.client.ConnectionFactory;
 import com.alibaba.fluss.config.ConfigOptions;
 import com.alibaba.fluss.config.MemorySize;
 import com.alibaba.fluss.metadata.DataLakeFormat;
@@ -117,7 +119,8 @@ public class FlussPipelineITCase {
     protected TableEnvironment tBatchEnv;
 
     @BeforeEach
-    void before() {
+    void before() throws Exception {
+        waitForFlussClusterReady();
         // open a catalog so that we can get table from the catalog
         String bootstrapServers = 
FLUSS_CLUSTER_EXTENSION.getBootstrapServers();
 
@@ -137,6 +140,27 @@ public class FlussPipelineITCase {
         tBatchEnv.useDatabase(DEFAULT_DB);
     }
 
+    private void waitForFlussClusterReady() throws Exception {
+        int maxRetries = 30;
+        int retryIntervalMs = 1000;
+        Exception lastException = null;
+
+        for (int i = 0; i < maxRetries; i++) {
+            try (Connection connection =
+                    
ConnectionFactory.createConnection(FLUSS_CLUSTER_EXTENSION.getClientConfig())) {
+                // Connection successful, cluster is ready
+                return;
+            } catch (Exception e) {
+                lastException = e;
+                Thread.sleep(retryIntervalMs);
+            }
+        }
+
+        throw new IllegalStateException(
+                "Failed to connect to Fluss cluster after " + maxRetries + " 
attempts",
+                lastException);
+    }
+
     @AfterEach
     void after() {
         tBatchEnv.useDatabase(BUILTIN_DATABASE);
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
index d302f297e..22237a5f3 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
@@ -33,6 +33,7 @@ import org.apache.flink.cdc.common.types.FloatType;
 import org.apache.flink.cdc.common.types.IntType;
 import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
 import org.apache.flink.cdc.common.types.SmallIntType;
+import org.apache.flink.cdc.common.types.TimeType;
 import org.apache.flink.cdc.common.types.TimestampType;
 import org.apache.flink.cdc.common.types.TinyIntType;
 import org.apache.flink.cdc.common.types.VarCharType;
@@ -43,6 +44,8 @@ import com.starrocks.connector.flink.catalog.StarRocksTable;
 import java.time.ZoneId;
 import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.temporal.ChronoField;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -132,6 +135,35 @@ public class StarRocksUtils {
     private static final DateTimeFormatter DATETIME_FORMATTER =
             DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
 
+    /** Format TIME type data. */
+    private static final DateTimeFormatter TIME_FORMATTER =
+            new 
DateTimeFormatterBuilder().appendPattern("HH:mm:ss").toFormatter();
+
+    private static final DateTimeFormatter[] TIME_FORMATTERS = new 
DateTimeFormatter[10];
+
+    private static DateTimeFormatter timeFormatter(int precision) {
+        if (precision <= 0) {
+            return TIME_FORMATTER;
+        }
+        if (precision < TIME_FORMATTERS.length) {
+            DateTimeFormatter formatter = TIME_FORMATTERS[precision];
+            if (formatter == null) {
+                formatter =
+                        new DateTimeFormatterBuilder()
+                                .appendPattern("HH:mm:ss")
+                                .appendFraction(
+                                        ChronoField.NANO_OF_SECOND, precision, 
precision, true)
+                                .toFormatter();
+                TIME_FORMATTERS[precision] = formatter;
+            }
+            return formatter;
+        }
+        return new DateTimeFormatterBuilder()
+                .appendPattern("HH:mm:ss")
+                .appendFraction(ChronoField.NANO_OF_SECOND, precision, 
precision, true)
+                .toFormatter();
+    }
+
     /**
      * Creates an accessor for getting elements in an internal RecordData structure at the given
      * position.
@@ -183,6 +215,13 @@ public class StarRocksUtils {
                 fieldGetter =
                         record -> 
record.getDate(fieldPos).toLocalDate().format(DATE_FORMATTER);
                 break;
+            case TIME_WITHOUT_TIME_ZONE:
+                fieldGetter =
+                        record ->
+                                record.getTime(fieldPos)
+                                        .toLocalTime()
+                                        
.format(timeFormatter(getPrecision(fieldType)));
+                break;
             case TIMESTAMP_WITHOUT_TIME_ZONE:
                 fieldGetter =
                         record ->
@@ -374,6 +413,21 @@ public class StarRocksUtils {
             return builder;
         }
 
+        @Override
+        public StarRocksColumn.Builder visit(TimeType timeType) {
+            // StarRocks does not support TIME type, so map it to VARCHAR.
+            // Format: HH:mm:ss for precision 0, HH:mm:ss.<p digits> for precision > 0
+            // Maximum length: 8 (HH:mm:ss) + 1 (.) + precision digits = 9 + precision
+            // For precision 0: "HH:mm:ss" = 8 characters
+            // For precision > 0: "HH:mm:ss." + precision digits
+            builder.setDataType(VARCHAR);
+            builder.setNullable(timeType.isNullable());
+            int precision = timeType.getPrecision();
+            int length = precision > 0 ? 8 + 1 + precision : 8;
+            builder.setColumnSize(length);
+            return builder;
+        }
+
         @Override
         public StarRocksColumn.Builder visit(TimestampType timestampType) {
             builder.setDataType(DATETIME);
@@ -404,7 +458,8 @@ public class StarRocksUtils {
                 || dataType instanceof 
org.apache.flink.cdc.common.types.TimestampType
                 || dataType instanceof 
org.apache.flink.cdc.common.types.ZonedTimestampType) {
 
-            if (INVALID_OR_MISSING_DATATIME.equals(defaultValue)) {
+            if (INVALID_OR_MISSING_DATATIME.equals(defaultValue)
+                    || defaultValue.startsWith(INVALID_OR_MISSING_DATATIME)) {
                 return DEFAULT_DATETIME;
             }
         }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/EventRecordSerializationSchemaTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/EventRecordSerializationSchemaTest.java
index 5d830b935..a9c6f240e 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/EventRecordSerializationSchemaTest.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/EventRecordSerializationSchemaTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.cdc.common.data.DateData;
 import org.apache.flink.cdc.common.data.DecimalData;
 import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.TimeData;
 import org.apache.flink.cdc.common.data.TimestampData;
 import org.apache.flink.cdc.common.data.binary.BinaryStringData;
 import org.apache.flink.cdc.common.event.AddColumnEvent;
@@ -45,6 +46,7 @@ import org.apache.flink.cdc.common.types.FloatType;
 import org.apache.flink.cdc.common.types.IntType;
 import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
 import org.apache.flink.cdc.common.types.SmallIntType;
+import org.apache.flink.cdc.common.types.TimeType;
 import org.apache.flink.cdc.common.types.TimestampType;
 import org.apache.flink.cdc.common.types.VarCharType;
 import org.apache.flink.cdc.common.utils.SchemaUtils;
@@ -71,6 +73,7 @@ import java.math.BigDecimal;
 import java.sql.Timestamp;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.time.ZoneId;
 import java.time.ZoneOffset;
 import java.util.Arrays;
@@ -260,6 +263,227 @@ class EventRecordSerializationSchemaTest {
                 Objects.requireNonNull(serializer.serialize(insertEvent3)));
     }
 
+    @Test
+    void testTimeTypeSerialization() throws Exception {
+        TableId tableId = TableId.parse("test.time_table");
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", new IntType())
+                        .physicalColumn("start_time", new TimeType())
+                        .physicalColumn(
+                                "end_time", new TimeType(3)) // TIME with 
millisecond precision
+                        .primaryKey("id")
+                        .build();
+
+        // Create table
+        CreateTableEvent createTableEvent = new CreateTableEvent(tableId, 
schema);
+        Assertions.assertThat(serializer.serialize(createTableEvent)).isNull();
+
+        BinaryRecordDataGenerator generator =
+                new 
BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
+
+        // Test insert with TIME values
+        DataChangeEvent insertEvent =
+                DataChangeEvent.insertEvent(
+                        tableId,
+                        generator.generate(
+                                new Object[] {
+                                    1,
+                                    TimeData.fromLocalTime(LocalTime.of(9, 30, 
15)), // 09:30:15
+                                    TimeData.fromLocalTime(
+                                            LocalTime.of(17, 45, 30, 
123000000)) // 17:45:30.123
+                                }));
+
+        StarRocksRowData result = serializer.serialize(insertEvent);
+        Assertions.assertThat(result).isNotNull();
+
+        verifySerializeResult(
+                tableId,
+                
"{\"id\":1,\"start_time\":\"09:30:15\",\"end_time\":\"17:45:30.123\",\"__op\":0}",
+                result);
+    }
+
+    @Test
+    void testTimeTypeZeroSecondsFormat() throws Exception {
+        TableId tableId = TableId.parse("test.time_zero_seconds_table");
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", new IntType())
+                        .physicalColumn("zero_time", new TimeType())
+                        .primaryKey("id")
+                        .build();
+
+        CreateTableEvent createTableEvent = new CreateTableEvent(tableId, 
schema);
+        Assertions.assertThat(serializer.serialize(createTableEvent)).isNull();
+
+        BinaryRecordDataGenerator generator =
+                new 
BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
+
+        DataChangeEvent insertEvent =
+                DataChangeEvent.insertEvent(
+                        tableId,
+                        generator.generate(
+                                new Object[] {1, 
TimeData.fromLocalTime(LocalTime.of(16, 0, 0))}));
+
+        StarRocksRowData result = serializer.serialize(insertEvent);
+        Assertions.assertThat(result).isNotNull();
+
+        verifySerializeResult(tableId, 
"{\"id\":1,\"zero_time\":\"16:00:00\",\"__op\":0}", result);
+    }
+
+    @Test
+    void testTimeTypeWithSchemaEvolution() throws Exception {
+        TableId tableId = TableId.parse("test.time_evolution_table");
+        Schema initialSchema =
+                Schema.newBuilder()
+                        .physicalColumn("id", new IntType())
+                        .physicalColumn("name", new VarCharType(20))
+                        .primaryKey("id")
+                        .build();
+
+        // Create initial table
+        CreateTableEvent createTableEvent = new CreateTableEvent(tableId, 
initialSchema);
+        Assertions.assertThat(serializer.serialize(createTableEvent)).isNull();
+
+        BinaryRecordDataGenerator initialGenerator =
+                new BinaryRecordDataGenerator(
+                        initialSchema.getColumnDataTypes().toArray(new 
DataType[0]));
+
+        // Insert initial data
+        DataChangeEvent initialInsert =
+                DataChangeEvent.insertEvent(
+                        tableId,
+                        initialGenerator.generate(
+                                new Object[] {1, 
BinaryStringData.fromString("Initial Record")}));
+
+        StarRocksRowData initialResult = serializer.serialize(initialInsert);
+        Assertions.assertThat(initialResult).isNotNull();
+
+        verifySerializeResult(
+                tableId, "{\"id\":1,\"name\":\"Initial Record\",\"__op\":0}", 
initialResult);
+
+        // Simulate schema evolution: add TIME column
+        Schema evolvedSchema =
+                Schema.newBuilder()
+                        .physicalColumn("id", new IntType())
+                        .physicalColumn("name", new VarCharType(20))
+                        .physicalColumn("created_time", new TimeType())
+                        .primaryKey("id")
+                        .build();
+
+        // Create AddColumnEvent to simulate schema evolution
+        AddColumnEvent addColumnEvent =
+                new AddColumnEvent(
+                        tableId,
+                        Arrays.asList(
+                                new AddColumnEvent.ColumnWithPosition(
+                                        Column.physicalColumn("created_time", 
new TimeType()),
+                                        AddColumnEvent.ColumnPosition.LAST,
+                                        null)));
+        serializer.serialize(addColumnEvent);
+
+        // Insert data with TIME column after schema evolution
+        BinaryRecordDataGenerator evolvedGenerator =
+                new BinaryRecordDataGenerator(
+                        evolvedSchema.getColumnDataTypes().toArray(new 
DataType[0]));
+
+        DataChangeEvent evolvedInsert =
+                DataChangeEvent.insertEvent(
+                        tableId,
+                        evolvedGenerator.generate(
+                                new Object[] {
+                                    2,
+                                    BinaryStringData.fromString("Evolved 
Record"),
+                                    TimeData.fromLocalTime(LocalTime.of(14, 
30, 0)) // 14:30:00
+                                }));
+
+        StarRocksRowData evolvedResult = serializer.serialize(evolvedInsert);
+        Assertions.assertThat(evolvedResult).isNotNull();
+
+        verifySerializeResult(
+                tableId,
+                "{\"id\":2,\"name\":\"Evolved 
Record\",\"created_time\":\"14:30:00\",\"__op\":0}",
+                evolvedResult);
+    }
+
+    @Test
+    void testTimeTypeBoundaryValues() throws Exception {
+        TableId tableId = TableId.parse("test.time_boundary_table");
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", new IntType())
+                        .physicalColumn("min_time", new TimeType())
+                        .physicalColumn("max_time", new TimeType())
+                        .physicalColumn("midnight", new TimeType())
+                        .primaryKey("id")
+                        .build();
+
+        CreateTableEvent createTableEvent = new CreateTableEvent(tableId, 
schema);
+        Assertions.assertThat(serializer.serialize(createTableEvent)).isNull();
+
+        BinaryRecordDataGenerator generator =
+                new 
BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
+
+        // Test boundary TIME values
+        DataChangeEvent insertEvent =
+                DataChangeEvent.insertEvent(
+                        tableId,
+                        generator.generate(
+                                new Object[] {
+                                    1,
+                                    TimeData.fromLocalTime(LocalTime.MIN), // 
00:00:00
+                                    TimeData.fromLocalTime(
+                                            LocalTime
+                                                    .MAX), // 23:59:59.999 
(truncated to millisecond
+                                    // precision)
+                                    TimeData.fromLocalTime(LocalTime.MIDNIGHT) 
// 00:00:00
+                                }));
+
+        StarRocksRowData result = serializer.serialize(insertEvent);
+        Assertions.assertThat(result).isNotNull();
+
+        verifySerializeResult(
+                tableId,
+                
"{\"id\":1,\"min_time\":\"00:00:00\",\"max_time\":\"23:59:59\",\"midnight\":\"00:00:00\",\"__op\":0}",
+                result);
+    }
+
+    @Test
+    void testTimeTypeWithNullValues() throws Exception {
+        TableId tableId = TableId.parse("test.time_null_table");
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", new IntType())
+                        .physicalColumn("nullable_time", new TimeType())
+                        .physicalColumn("not_null_time", new 
TimeType().notNull())
+                        .primaryKey("id")
+                        .build();
+
+        CreateTableEvent createTableEvent = new CreateTableEvent(tableId, 
schema);
+        Assertions.assertThat(serializer.serialize(createTableEvent)).isNull();
+
+        BinaryRecordDataGenerator generator =
+                new 
BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
+
+        // Test TIME values with null
+        DataChangeEvent insertEvent =
+                DataChangeEvent.insertEvent(
+                        tableId,
+                        generator.generate(
+                                new Object[] {
+                                    1,
+                                    null, // Null value for nullable column
+                                    TimeData.fromLocalTime(
+                                            LocalTime.of(12, 0, 0)) // Not 
null column
+                                }));
+
+        StarRocksRowData result = serializer.serialize(insertEvent);
+        Assertions.assertThat(result).isNotNull();
+
+        verifySerializeResult(
+                tableId, 
"{\"id\":1,\"not_null_time\":\"12:00:00\",\"__op\":0}", result);
+    }
+
     private void verifySerializeResult(
             TableId expectTable, String expectRow, StarRocksRowData 
actualRowData)
             throws Exception {
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
index e2dc50551..6501ba03c 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
@@ -213,10 +213,10 @@ class StarRocksMetadataApplierITCase extends 
StarRocksSinkTestBase {
                         .column(new PhysicalColumn("string", 
DataTypes.STRING(), "String"))
                         .column(new PhysicalColumn("decimal", 
DataTypes.DECIMAL(17, 7), "Decimal"))
                         .column(new PhysicalColumn("date", DataTypes.DATE(), 
"Date"))
-                        // StarRocks sink doesn't support TIME type yet.
-                        // .column(new PhysicalColumn("time", 
DataTypes.TIME(), "Time"))
-                        // .column(new PhysicalColumn("time_3", 
DataTypes.TIME(3), "Time With
-                        // Precision"))
+                        .column(new PhysicalColumn("time", DataTypes.TIME(), 
"Time"))
+                        .column(
+                                new PhysicalColumn(
+                                        "time_3", DataTypes.TIME(3), "Time 
With Precision"))
                         .column(new PhysicalColumn("timestamp", 
DataTypes.TIMESTAMP(), "Timestamp"))
                         .column(
                                 new PhysicalColumn(
@@ -256,6 +256,9 @@ class StarRocksMetadataApplierITCase extends 
StarRocksSinkTestBase {
                         "string | varchar(1048576) | YES | false | null",
                         "decimal | decimal(17,7) | YES | false | null",
                         "date | date | YES | false | null",
+                        // TIME type mapped to VARCHAR since StarRocks doesn't support TIME type
+                        "time | varchar(8) | YES | false | null",
+                        "time_3 | varchar(12) | YES | false | null",
                         "timestamp | datetime | YES | false | null",
                         "timestamp_3 | datetime | YES | false | null",
                         "timestampltz | datetime | YES | false | null",
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierTest.java
index 28d7e9404..2da34d6cf 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierTest.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.cdc.common.types.BooleanType;
 import org.apache.flink.cdc.common.types.DecimalType;
 import org.apache.flink.cdc.common.types.IntType;
 import org.apache.flink.cdc.common.types.SmallIntType;
+import org.apache.flink.cdc.common.types.TimeType;
 import org.apache.flink.cdc.common.types.TimestampType;
 
 import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
@@ -226,4 +227,150 @@ class StarRocksMetadataApplierTest {
                         .build();
         Assertions.assertThat(actualTable).isEqualTo(expectTable);
     }
+
+    /**
+     * Verifies that a create-table event carrying TIME columns produces VARCHAR columns:
+     * {@code TimeType()} is expected as varchar(8) and {@code TimeType(3)} as varchar(12).
+     */
+    @Test
+    void testCreateTableWithTimeType() throws Exception {
+        TableId timeTableId = TableId.parse("test.time_table");
+        Schema timeSchema =
+                Schema.newBuilder()
+                        .physicalColumn("id", new IntType())
+                        .physicalColumn("start_time", new TimeType())
+                        // TIME with millisecond precision
+                        .physicalColumn("end_time", new TimeType(3))
+                        .primaryKey("id")
+                        .build();
+        metadataApplier.applySchemaChange(new CreateTableEvent(timeTableId, timeSchema));
+
+        StarRocksTable createdTable =
+                catalog.getTable(timeTableId.getSchemaName(), timeTableId.getTableName())
+                        .orElse(null);
+        Assertions.assertThat(createdTable).isNotNull();
+
+        // Expected columns, in ordinal order; both TIME columns are expected as VARCHAR.
+        StarRocksColumn idColumn =
+                new StarRocksColumn.Builder()
+                        .setColumnName("id")
+                        .setOrdinalPosition(0)
+                        .setDataType("int")
+                        .setNullable(true)
+                        .build();
+        StarRocksColumn startTimeColumn =
+                new StarRocksColumn.Builder()
+                        .setColumnName("start_time")
+                        .setOrdinalPosition(1)
+                        .setDataType("varchar")
+                        .setNullable(true)
+                        .setColumnSize(8)
+                        .build();
+        StarRocksColumn endTimeColumn =
+                new StarRocksColumn.Builder()
+                        .setColumnName("end_time")
+                        .setOrdinalPosition(2)
+                        .setDataType("varchar")
+                        .setNullable(true)
+                        .setColumnSize(12)
+                        .build();
+        List<StarRocksColumn> expectedColumns =
+                new ArrayList<>(Arrays.asList(idColumn, startTimeColumn, endTimeColumn));
+
+        StarRocksTable expectedTable =
+                new StarRocksTable.Builder()
+                        .setDatabaseName(timeTableId.getSchemaName())
+                        .setTableName(timeTableId.getTableName())
+                        .setTableType(StarRocksTable.TableType.PRIMARY_KEY)
+                        .setColumns(expectedColumns)
+                        .setTableKeys(timeSchema.primaryKeys())
+                        .setDistributionKeys(timeSchema.primaryKeys())
+                        .setNumBuckets(10)
+                        .setTableProperties(Collections.singletonMap("replication_num", "5"))
+                        .build();
+        Assertions.assertThat(createdTable).isEqualTo(expectedTable);
+    }
+
+    /**
+     * Verifies that TIME columns added through an add-column event on an existing table are
+     * created as VARCHAR columns: varchar(8) for {@code TimeType()} and varchar(12) for
+     * {@code TimeType(3)}.
+     */
+    @Test
+    void testAddTimeTypeColumn() throws Exception {
+        TableId targetTableId = TableId.parse("test.add_time_column");
+        Schema baseSchema =
+                Schema.newBuilder().physicalColumn("id", new IntType()).primaryKey("id").build();
+        metadataApplier.applySchemaChange(new CreateTableEvent(targetTableId, baseSchema));
+
+        // Add TIME type columns through schema evolution
+        List<AddColumnEvent.ColumnWithPosition> addedColumns =
+                Arrays.asList(
+                        new AddColumnEvent.ColumnWithPosition(
+                                Column.physicalColumn("duration", new TimeType())),
+                        new AddColumnEvent.ColumnWithPosition(
+                                Column.physicalColumn("precision_time", new TimeType(3))));
+        metadataApplier.applySchemaChange(new AddColumnEvent(targetTableId, addedColumns));
+
+        StarRocksTable evolvedTable =
+                catalog.getTable(targetTableId.getSchemaName(), targetTableId.getTableName())
+                        .orElse(null);
+        Assertions.assertThat(evolvedTable).isNotNull();
+
+        // Expected columns, in ordinal order: the original key followed by the two added columns.
+        StarRocksColumn idColumn =
+                new StarRocksColumn.Builder()
+                        .setColumnName("id")
+                        .setOrdinalPosition(0)
+                        .setDataType("int")
+                        .setNullable(true)
+                        .build();
+        StarRocksColumn durationColumn =
+                new StarRocksColumn.Builder()
+                        .setColumnName("duration")
+                        .setOrdinalPosition(1)
+                        .setDataType("varchar")
+                        .setNullable(true)
+                        .setColumnSize(8)
+                        .build();
+        StarRocksColumn precisionTimeColumn =
+                new StarRocksColumn.Builder()
+                        .setColumnName("precision_time")
+                        .setOrdinalPosition(2)
+                        .setDataType("varchar")
+                        .setNullable(true)
+                        .setColumnSize(12)
+                        .build();
+        List<StarRocksColumn> expectedColumns =
+                new ArrayList<>(Arrays.asList(idColumn, durationColumn, precisionTimeColumn));
+
+        StarRocksTable expectedTable =
+                new StarRocksTable.Builder()
+                        .setDatabaseName(targetTableId.getSchemaName())
+                        .setTableName(targetTableId.getTableName())
+                        .setTableType(StarRocksTable.TableType.PRIMARY_KEY)
+                        .setColumns(expectedColumns)
+                        .setTableKeys(baseSchema.primaryKeys())
+                        .setDistributionKeys(baseSchema.primaryKeys())
+                        .setNumBuckets(10)
+                        .setTableProperties(Collections.singletonMap("replication_num", "5"))
+                        .build();
+        Assertions.assertThat(evolvedTable).isEqualTo(expectedTable);
+    }
+
+    /**
+     * Verifies that TIME columns declared with several precisions are all created as VARCHAR
+     * columns, and that every declared TIME column is actually present in the created table.
+     */
+    @Test
+    void testTimeTypeWithDifferentPrecisions() throws Exception {
+        TableId tableId = TableId.parse("test.time_precision_table");
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", new IntType())
+                        // Default precision
+                        .physicalColumn("time_default", new TimeType())
+                        // Second precision
+                        .physicalColumn("time_0", new TimeType(0))
+                        // Millisecond precision
+                        .physicalColumn("time_3", new TimeType(3))
+                        // Also precision 3: the highest precision exercised by this test
+                        .physicalColumn("time_max", new TimeType(3))
+                        .primaryKey("id")
+                        .build();
+        CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
+        metadataApplier.applySchemaChange(createTableEvent);
+
+        StarRocksTable actualTable =
+                catalog.getTable(tableId.getSchemaName(), tableId.getTableName()).orElse(null);
+        Assertions.assertThat(actualTable).isNotNull();
+
+        // Verify all TIME columns are correctly mapped to StarRocks VARCHAR type
+        // since StarRocks doesn't support TIME type
+        List<String> timeColumns = Arrays.asList("time_default", "time_0", "time_3", "time_max");
+        List<String> verifiedTimeColumns = new ArrayList<>();
+        for (StarRocksColumn column : actualTable.getColumns()) {
+            if (timeColumns.contains(column.getColumnName())) {
+                Assertions.assertThat(column.getDataType().toLowerCase()).isEqualTo("varchar");
+                verifiedTimeColumns.add(column.getColumnName());
+            }
+        }
+        // Guard against a vacuous pass: the loop above asserts nothing for a TIME column that is
+        // missing from the created table, so require that each one was seen exactly once.
+        Assertions.assertThat(verifiedTimeColumns)
+                .containsExactlyInAnyOrderElementsOf(timeColumns);
+    }
 }


Reply via email to