Copilot commented on code in PR #4253:
URL: https://github.com/apache/flink-cdc/pull/4253#discussion_r2780469447


##########
docs/content.zh/docs/connectors/pipeline-connectors/starrocks.md:
##########
@@ -297,6 +304,11 @@ pipeline:
       <td>DATE</td>
       <td></td>
     </tr>
+    <tr>
+      <td>TIME</td>
+      <td>VARCHAR</td>
+      <td>StarRocks 不支持 TIME 类型,因此映射为 VARCHAR。TIME 值以字符串形式存储,格式为 "HH:mm:ss"(精度为 0)或 "HH:mm:ss.SSS"(精度 > 0)。</td>

Review Comment:
   Similar to the English docs: this says that TIME with precision > 0 is formatted as "HH:mm:ss.SSS", but the connector actually supports TIME(p) with p up to 9, and the output should contain exactly p fractional digits. Consider a more general description (p = 0 -> "HH:mm:ss", p > 0 -> "HH:mm:ss.<p fractional digits>", with an example for p = 3).
   ```suggestion
      <td>StarRocks 不支持 TIME 类型,因此映射为 VARCHAR。TIME(p) 值以字符串形式存储:当 p = 0 时格式为 "HH:mm:ss",当 p > 0 时格式为 "HH:mm:ss.&lt;p 位小数&gt;"(例如 p = 3 时为 "HH:mm:ss.SSS")。</td>
   ```



##########
docs/content/docs/connectors/pipeline-connectors/starrocks.md:
##########
@@ -306,6 +313,11 @@ pipeline:
       <td>DATE</td>
       <td></td>
     </tr>
+    <tr>
+      <td>TIME</td>
+      <td>VARCHAR</td>
+      <td>StarRocks does not support TIME type, so it is mapped to VARCHAR. TIME values are stored as strings in format "HH:mm:ss" (precision 0) or "HH:mm:ss.SSS" (precision > 0).</td>

Review Comment:
   The docs state TIME values are stored as "HH:mm:ss" (precision 0) or "HH:mm:ss.SSS" (precision > 0). However, the connector supports TIME(p) with p up to 9, and formatting uses exactly p fractional digits. Please update the text to describe the general format (e.g., "HH:mm:ss" for p=0 and "HH:mm:ss.<p digits>" for p>0, with an example such as p=3 -> "HH:mm:ss.SSS").
   ```suggestion
      <td>StarRocks does not support TIME type, so it is mapped to VARCHAR. TIME values are stored as strings in format "HH:mm:ss" when the precision p = 0, or "HH:mm:ss.&lt;p digits&gt;" when p &gt; 0 (for example, p = 3 uses "HH:mm:ss.SSS").</td>
   ```
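
   For illustration only (not the connector's actual code; the helper name `timeFormatter` is made up), a minimal java.time sketch shows how a formatter that emits exactly p fractional digits produces the strings described in the suggested wording:
   ```java
   import java.time.LocalTime;
   import java.time.format.DateTimeFormatter;
   import java.time.format.DateTimeFormatterBuilder;
   import java.time.temporal.ChronoField;

   public class TimeFormatSketch {
       // "HH:mm:ss" for p = 0; "HH:mm:ss." plus exactly p zero-padded digits for p > 0.
       static DateTimeFormatter timeFormatter(int precision) {
           DateTimeFormatterBuilder builder =
                   new DateTimeFormatterBuilder().appendPattern("HH:mm:ss");
           if (precision > 0) {
               // min width == max width == precision, so exactly p digits are emitted.
               builder.appendFraction(ChronoField.NANO_OF_SECOND, precision, precision, true);
           }
           return builder.toFormatter();
       }

       public static void main(String[] args) {
           LocalTime t = LocalTime.of(17, 45, 30, 123_000_000);
           System.out.println(timeFormatter(0).format(t)); // 17:45:30
           System.out.println(timeFormatter(3).format(t)); // 17:45:30.123
           System.out.println(timeFormatter(9).format(t)); // 17:45:30.123000000
       }
   }
   ```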



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/FlussPipelineITCase.java:
##########
@@ -137,6 +140,27 @@ void before() {
         tBatchEnv.useDatabase(DEFAULT_DB);
     }
 
+    private void waitForFlussClusterReady() throws Exception {
+        int maxRetries = 30;
+        int retryIntervalMs = 1000;
+        Exception lastException = null;
+
+        for (int i = 0; i < maxRetries; i++) {
+            try (Connection connection =
+                    ConnectionFactory.createConnection(FLUSS_CLUSTER_EXTENSION.getClientConfig())) {
+                // Connection successful, cluster is ready
+                return;
+            } catch (Exception e) {
+                lastException = e;
+                Thread.sleep(retryIntervalMs);

Review Comment:
   waitForFlussClusterReady() catches all Exceptions and then calls Thread.sleep(). If the thread is interrupted, the InterruptedException will be swallowed and the interrupted flag cleared, which can lead to hung/slow test teardown. Please catch InterruptedException separately, restore the interrupt status (Thread.currentThread().interrupt()), and fail fast (or rethrow) instead of continuing retries.
   ```suggestion
            } catch (InterruptedException ie) {
                // Preserve interrupt status and fail fast instead of continuing retries
                Thread.currentThread().interrupt();
                throw new IllegalStateException(
                        "Thread was interrupted while waiting for Fluss cluster to become ready",
                        ie);
            } catch (Exception e) {
                lastException = e;
                try {
                    Thread.sleep(retryIntervalMs);
                } catch (InterruptedException ie) {
                    // Preserve interrupt status and fail fast instead of continuing retries
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(
                            "Thread was interrupted while waiting for Fluss cluster to become ready",
                            ie);
                }
   ```
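
   Applied to the full method, an interrupt-safe variant might look like the sketch below. It is only a sketch: it assumes the existing FLUSS_CLUSTER_EXTENSION field, keeps the 30 x 1s retry budget, tests for interruption inside a single catch block so it compiles regardless of which checked exceptions createConnection()/close() declare, and assumes a trailing failure throw (the quoted hunk ends before the loop exit):
   ```java
   private void waitForFlussClusterReady() throws Exception {
       int maxRetries = 30;
       int retryIntervalMs = 1000;
       Exception lastException = null;

       for (int i = 0; i < maxRetries; i++) {
           try (Connection ignored =
                   ConnectionFactory.createConnection(FLUSS_CLUSTER_EXTENSION.getClientConfig())) {
               // Connection successful, cluster is ready.
               return;
           } catch (Exception e) {
               if (e instanceof InterruptedException) {
                   // Preserve interrupt status and fail fast instead of continuing retries.
                   Thread.currentThread().interrupt();
                   throw new IllegalStateException(
                           "Interrupted while waiting for Fluss cluster to become ready", e);
               }
               lastException = e;
               try {
                   Thread.sleep(retryIntervalMs);
               } catch (InterruptedException ie) {
                   Thread.currentThread().interrupt();
                   throw new IllegalStateException(
                           "Interrupted while waiting for Fluss cluster to become ready", ie);
               }
           }
       }
       throw new IllegalStateException("Fluss cluster did not become ready in time", lastException);
   }
   ```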



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java:
##########
@@ -374,6 +413,21 @@ public StarRocksColumn.Builder visit(DateType dateType) {
             return builder;
         }
 
+        @Override
+        public StarRocksColumn.Builder visit(TimeType timeType) {
+            // StarRocks does not support TIME type, so map it to VARCHAR.
+            // Format: HH:mm:ss for precision 0, HH:mm:ss.SSS for precision > 0
+            // Maximum length: 8 (HH:mm:ss) + 1 (.) + precision = 8 + 1 + precision
+            // For precision 0: "HH:mm:ss" = 8 characters
+            // For precision > 0: "HH:mm:ss." + precision digits
+            builder.setDataType(VARCHAR);

Review Comment:
   The comment describes the TIME string format as "HH:mm:ss.SSS" for any precision > 0, but the implementation actually formats with an arbitrary number of fractional digits equal to the declared precision (0–9) via appendFraction(..., precision, precision, ...). Please update the comment to reflect the actual behavior (e.g., "HH:mm:ss" or "HH:mm:ss.<fraction with p digits>") to avoid misleading future maintainers.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierTest.java:
##########
@@ -226,4 +227,150 @@ void testDropColumn() throws Exception {
                         .build();
         Assertions.assertThat(actualTable).isEqualTo(expectTable);
     }
+
+    @Test
+    void testCreateTableWithTimeType() throws Exception {
+        TableId tableId = TableId.parse("test.time_table");
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", new IntType())
+                        .physicalColumn("start_time", new TimeType())
+                        .physicalColumn(
+                                "end_time", new TimeType(3)) // TIME with millisecond precision
+                        .primaryKey("id")
+                        .build();
+        CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
+        metadataApplier.applySchemaChange(createTableEvent);
+
+        StarRocksTable actualTable =
+                catalog.getTable(tableId.getSchemaName(), tableId.getTableName()).orElse(null);
+        Assertions.assertThat(actualTable).isNotNull();
+
+        List<StarRocksColumn> columns = new ArrayList<>();
+        columns.add(
+                new StarRocksColumn.Builder()
+                        .setColumnName("id")
+                        .setOrdinalPosition(0)
+                        .setDataType("int")
+                        .setNullable(true)
+                        .build());
+        columns.add(
+                new StarRocksColumn.Builder()
+                        .setColumnName("start_time")
+                        .setOrdinalPosition(1)
+                        .setDataType("varchar")
+                        .setNullable(true)
+                        .setColumnSize(8)
+                        .build());
+        columns.add(
+                new StarRocksColumn.Builder()
+                        .setColumnName("end_time")
+                        .setOrdinalPosition(2)
+                        .setDataType("varchar")
+                        .setNullable(true)
+                        .setColumnSize(12)
+                        .build());
+        StarRocksTable expectTable =
+                new StarRocksTable.Builder()
+                        .setDatabaseName(tableId.getSchemaName())
+                        .setTableName(tableId.getTableName())
+                        .setTableType(StarRocksTable.TableType.PRIMARY_KEY)
+                        .setColumns(columns)
+                        .setTableKeys(schema.primaryKeys())
+                        .setDistributionKeys(schema.primaryKeys())
+                        .setNumBuckets(10)
+                        .setTableProperties(Collections.singletonMap("replication_num", "5"))
+                        .build();
+        Assertions.assertThat(actualTable).isEqualTo(expectTable);
+    }
+
+    @Test
+    void testAddTimeTypeColumn() throws Exception {
+        TableId tableId = TableId.parse("test.add_time_column");
+        Schema schema =
+                Schema.newBuilder().physicalColumn("id", new IntType()).primaryKey("id").build();
+        CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
+        metadataApplier.applySchemaChange(createTableEvent);
+
+        // Add TIME type column through schema evolution
+        AddColumnEvent addColumnEvent =
+                new AddColumnEvent(
+                        tableId,
+                        Arrays.asList(
+                                new AddColumnEvent.ColumnWithPosition(
+                                        Column.physicalColumn("duration", new TimeType())),
+                                new AddColumnEvent.ColumnWithPosition(
+                                        Column.physicalColumn("precision_time", new TimeType(3)))));
+        metadataApplier.applySchemaChange(addColumnEvent);
+
+        StarRocksTable actualTable =
+                catalog.getTable(tableId.getSchemaName(), tableId.getTableName()).orElse(null);
+        Assertions.assertThat(actualTable).isNotNull();
+
+        List<StarRocksColumn> columns = new ArrayList<>();
+        columns.add(
+                new StarRocksColumn.Builder()
+                        .setColumnName("id")
+                        .setOrdinalPosition(0)
+                        .setDataType("int")
+                        .setNullable(true)
+                        .build());
+        columns.add(
+                new StarRocksColumn.Builder()
+                        .setColumnName("duration")
+                        .setOrdinalPosition(1)
+                        .setDataType("varchar")
+                        .setNullable(true)
+                        .setColumnSize(8)
+                        .build());
+        columns.add(
+                new StarRocksColumn.Builder()
+                        .setColumnName("precision_time")
+                        .setOrdinalPosition(2)
+                        .setDataType("varchar")
+                        .setNullable(true)
+                        .setColumnSize(12)
+                        .build());
+        StarRocksTable expectTable =
+                new StarRocksTable.Builder()
+                        .setDatabaseName(tableId.getSchemaName())
+                        .setTableName(tableId.getTableName())
+                        .setTableType(StarRocksTable.TableType.PRIMARY_KEY)
+                        .setColumns(columns)
+                        .setTableKeys(schema.primaryKeys())
+                        .setDistributionKeys(schema.primaryKeys())
+                        .setNumBuckets(10)
+                        .setTableProperties(Collections.singletonMap("replication_num", "5"))
+                        .build();
+        Assertions.assertThat(actualTable).isEqualTo(expectTable);
+    }
+
+    @Test
+    void testTimeTypeWithDifferentPrecisions() throws Exception {
+        TableId tableId = TableId.parse("test.time_precision_table");
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", new IntType())
+                        .physicalColumn("time_default", new TimeType()) // Default precision
+                        .physicalColumn("time_0", new TimeType(0)) // Second precision
+                        .physicalColumn("time_3", new TimeType(3)) // Millisecond precision
+                        .physicalColumn("time_max", new TimeType(3)) // Maximum supported precision

Review Comment:
   The inline comment says "Maximum supported precision" but the code uses new TimeType(3). In this codebase, TimeType.MAX_PRECISION is 9, so this comment is incorrect. Either update the comment (e.g., "example precision 3") or change the test to actually use precision 9 if that's what you want to validate.
   ```suggestion
                           .physicalColumn("time_max", new TimeType(3)) // 
Example precision 3
   ```



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/EventRecordSerializationSchemaTest.java:
##########
@@ -260,6 +263,224 @@ void testMixedSchemaAndDataChanges() throws Exception {
                 Objects.requireNonNull(serializer.serialize(insertEvent3)));
     }
 
+    @Test
+    void testTimeTypeSerialization() throws Exception {
+        TableId tableId = TableId.parse("test.time_table");
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", new IntType())
+                        .physicalColumn("start_time", new TimeType())
+                        .physicalColumn(
+                                "end_time", new TimeType(3)) // TIME with millisecond precision
+                        .primaryKey("id")
+                        .build();
+
+        // Create table
+        CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
+        Assertions.assertThat(serializer.serialize(createTableEvent)).isNull();
+
+        BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
+
+        // Test insert with TIME values
+        DataChangeEvent insertEvent =
+                DataChangeEvent.insertEvent(
+                        tableId,
+                        generator.generate(
+                                new Object[] {
+                                    1,
+                                    TimeData.fromLocalTime(LocalTime.of(9, 30, 15)), // 09:30:15
+                                    TimeData.fromLocalTime(
+                                            LocalTime.of(17, 45, 30, 123000000)) // 17:45:30.123
+                                }));
+
+        StarRocksRowData result = serializer.serialize(insertEvent);
+        Assertions.assertThat(result).isNotNull();
+
+        verifySerializeResult(
+                tableId,
+                "{\"id\":1,\"start_time\":\"09:30:15\",\"end_time\":\"17:45:30.123\",\"__op\":0}",
+                result);
+    }
+
+    @Test
+    void testTimeTypeZeroSecondsFormat() throws Exception {
+        TableId tableId = TableId.parse("test.time_zero_seconds_table");
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", new IntType())
+                        .physicalColumn("zero_time", new TimeType())
+                        .primaryKey("id")
+                        .build();
+
+        CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
+        Assertions.assertThat(serializer.serialize(createTableEvent)).isNull();
+
+        BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
+
+        DataChangeEvent insertEvent =
+                DataChangeEvent.insertEvent(
+                        tableId,
+                        generator.generate(
+                                new Object[] {1, TimeData.fromLocalTime(LocalTime.of(16, 0, 0))}));
+
+        StarRocksRowData result = serializer.serialize(insertEvent);
+        Assertions.assertThat(result).isNotNull();
+
+        verifySerializeResult(tableId, "{\"id\":1,\"zero_time\":\"16:00:00\",\"__op\":0}", result);
+    }
+
+    @Test
+    void testTimeTypeWithSchemaEvolution() throws Exception {
+        TableId tableId = TableId.parse("test.time_evolution_table");
+        Schema initialSchema =
+                Schema.newBuilder()
+                        .physicalColumn("id", new IntType())
+                        .physicalColumn("name", new VarCharType(20))
+                        .primaryKey("id")
+                        .build();
+
+        // Create initial table
+        CreateTableEvent createTableEvent = new CreateTableEvent(tableId, initialSchema);
+        Assertions.assertThat(serializer.serialize(createTableEvent)).isNull();
+
+        BinaryRecordDataGenerator initialGenerator =
+                new BinaryRecordDataGenerator(
+                        initialSchema.getColumnDataTypes().toArray(new DataType[0]));
+
+        // Insert initial data
+        DataChangeEvent initialInsert =
+                DataChangeEvent.insertEvent(
+                        tableId,
+                        initialGenerator.generate(
+                                new Object[] {1, BinaryStringData.fromString("Initial Record")}));
+
+        StarRocksRowData initialResult = serializer.serialize(initialInsert);
+        Assertions.assertThat(initialResult).isNotNull();
+
+        verifySerializeResult(
+                tableId, "{\"id\":1,\"name\":\"Initial Record\",\"__op\":0}", initialResult);
+
+        // Simulate schema evolution: add TIME column
+        Schema evolvedSchema =
+                Schema.newBuilder()
+                        .physicalColumn("id", new IntType())
+                        .physicalColumn("name", new VarCharType(20))
+                        .physicalColumn("created_time", new TimeType())
+                        .primaryKey("id")
+                        .build();
+
+        // Create AddColumnEvent to simulate schema evolution
+        AddColumnEvent addColumnEvent =
+                new AddColumnEvent(
+                        tableId,
+                        Arrays.asList(
+                                new AddColumnEvent.ColumnWithPosition(
+                                        Column.physicalColumn("created_time", new TimeType()),
+                                        AddColumnEvent.ColumnPosition.LAST,
+                                        null)));
+        serializer.serialize(addColumnEvent);
+
+        // Insert data with TIME column after schema evolution
+        BinaryRecordDataGenerator evolvedGenerator =
+                new BinaryRecordDataGenerator(
+                        evolvedSchema.getColumnDataTypes().toArray(new DataType[0]));
+
+        DataChangeEvent evolvedInsert =
+                DataChangeEvent.insertEvent(
+                        tableId,
+                        evolvedGenerator.generate(
+                                new Object[] {
+                                    2,
+                                    BinaryStringData.fromString("Evolved Record"),
+                                    TimeData.fromLocalTime(LocalTime.of(14, 30, 0)) // 14:30:00
+                                }));
+
+        StarRocksRowData evolvedResult = serializer.serialize(evolvedInsert);
+        Assertions.assertThat(evolvedResult).isNotNull();
+
+        verifySerializeResult(
+                tableId,
+                "{\"id\":2,\"name\":\"Evolved Record\",\"created_time\":\"14:30:00\",\"__op\":0}",
+                evolvedResult);
+    }
+
+    @Test
+    void testTimeTypeBoundaryValues() throws Exception {
+        TableId tableId = TableId.parse("test.time_boundary_table");
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", new IntType())
+                        .physicalColumn("min_time", new TimeType())
+                        .physicalColumn("max_time", new TimeType())
+                        .physicalColumn("midnight", new TimeType())
+                        .primaryKey("id")
+                        .build();
+
+        CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
+        Assertions.assertThat(serializer.serialize(createTableEvent)).isNull();
+
+        BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
+
+        // Test boundary TIME values
+        DataChangeEvent insertEvent =
+                DataChangeEvent.insertEvent(
+                        tableId,
+                        generator.generate(
+                                new Object[] {
+                                    1,
+                                    TimeData.fromLocalTime(LocalTime.MIN), // 00:00:00
+                                    TimeData.fromLocalTime(LocalTime.MAX), // 23:59:59.999999999

Review Comment:
   The comment claims LocalTime.MAX corresponds to "23:59:59.999999999", but TimeData.fromLocalTime() truncates to millisecond precision (it stores millisOfDay). This makes the comment inaccurate and can confuse readers of the test; please adjust it to reflect the actual precision retained (e.g., "23:59:59.999" before formatting).
   ```suggestion
                                    TimeData.fromLocalTime(LocalTime.MAX), // 23:59:59.999 (truncated to millisecond precision)
   ```
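
   The truncation is easy to see in isolation. Assuming TimeData keeps milliseconds of the day, as stated above, this standalone snippet (illustrative only, using plain java.time rather than TimeData) shows what survives of LocalTime.MAX:
   ```java
   import java.time.LocalTime;

   public class TimeDataTruncationDemo {
       public static void main(String[] args) {
           // LocalTime.MAX is 23:59:59.999999999 at nanosecond resolution.
           long millisOfDay = LocalTime.MAX.toNanoOfDay() / 1_000_000; // 86_399_999
           // Reconstructing from millisOfDay drops everything below the millisecond.
           System.out.println(LocalTime.ofNanoOfDay(millisOfDay * 1_000_000)); // 23:59:59.999
       }
   }
   ```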



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
