Copilot commented on code in PR #4253:
URL: https://github.com/apache/flink-cdc/pull/4253#discussion_r2780469447
##########
docs/content.zh/docs/connectors/pipeline-connectors/starrocks.md:
##########
@@ -297,6 +304,11 @@ pipeline:
<td>DATE</td>
<td></td>
</tr>
+ <tr>
+ <td>TIME</td>
+ <td>VARCHAR</td>
+ <td>StarRocks does not support the TIME type, so it is mapped to VARCHAR. TIME values are stored as strings in the format "HH:mm:ss" (precision 0) or "HH:mm:ss.SSS" (precision > 0).</td>
Review Comment:
As with the English docs: this states that for TIME precision > 0 the format is "HH:mm:ss.SSS", but the connector actually supports TIME(p) with p up to 9, and the output carries exactly p fractional digits. Please generalize the description (p = 0 -> "HH:mm:ss", p > 0 -> "HH:mm:ss.<p digits>", with an example for p = 3).
```suggestion
    <td>StarRocks does not support the TIME type, so it is mapped to VARCHAR. TIME(p) values are stored as strings: "HH:mm:ss" when p = 0, and "HH:mm:ss.<p digits>" when p > 0 (for example, p = 3 yields "HH:mm:ss.SSS").</td>
```
##########
docs/content/docs/connectors/pipeline-connectors/starrocks.md:
##########
@@ -306,6 +313,11 @@ pipeline:
<td>DATE</td>
<td></td>
</tr>
+ <tr>
+ <td>TIME</td>
+ <td>VARCHAR</td>
+ <td>StarRocks does not support TIME type, so it is mapped to VARCHAR. TIME values are stored as strings in format "HH:mm:ss" (precision 0) or "HH:mm:ss.SSS" (precision > 0).</td>
Review Comment:
The docs state TIME values are stored as "HH:mm:ss" (precision 0) or
"HH:mm:ss.SSS" (precision > 0). However, the connector supports TIME(p) with p
up to 9, and formatting uses exactly p fractional digits. Please update the
text to describe the general format (e.g., "HH:mm:ss" for p=0 and
"HH:mm:ss.<p digits>" for p>0, with an example such as p=3 -> "HH:mm:ss.SSS").
```suggestion
    <td>StarRocks does not support TIME type, so it is mapped to VARCHAR. TIME values are stored as strings in format "HH:mm:ss" when the precision p = 0, or "HH:mm:ss.<p digits>" when p > 0 (for example, p = 3 uses "HH:mm:ss.SSS").</td>
```
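For reference, here is a minimal, self-contained sketch of the general format the suggestion describes. This is not the connector's actual code; the formatter construction is an assumption based on the review's description of `appendFraction(..., precision, precision, ...)`:

```java
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;

public class TimeFormatSketch {
    // Builds a formatter emitting exactly p fractional digits, matching the
    // documented "HH:mm:ss" (p = 0) / "HH:mm:ss.<p digits>" (p > 0) shape.
    static DateTimeFormatter formatterFor(int precision) {
        DateTimeFormatterBuilder builder =
                new DateTimeFormatterBuilder().appendPattern("HH:mm:ss");
        if (precision > 0) {
            // min == max == precision forces exactly p digits after the dot.
            builder.appendFraction(ChronoField.NANO_OF_SECOND, precision, precision, true);
        }
        return builder.toFormatter();
    }

    public static void main(String[] args) {
        LocalTime t = LocalTime.of(17, 45, 30, 123_456_789);
        System.out.println(formatterFor(0).format(t)); // 17:45:30
        System.out.println(formatterFor(3).format(t)); // 17:45:30.123
        System.out.println(formatterFor(9).format(t)); // 17:45:30.123456789
    }
}
```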
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/FlussPipelineITCase.java:
##########
@@ -137,6 +140,27 @@ void before() {
tBatchEnv.useDatabase(DEFAULT_DB);
}
+ private void waitForFlussClusterReady() throws Exception {
+ int maxRetries = 30;
+ int retryIntervalMs = 1000;
+ Exception lastException = null;
+
+ for (int i = 0; i < maxRetries; i++) {
+         try (Connection connection =
+                 ConnectionFactory.createConnection(FLUSS_CLUSTER_EXTENSION.getClientConfig())) {
+ // Connection successful, cluster is ready
+ return;
+ } catch (Exception e) {
+ lastException = e;
+ Thread.sleep(retryIntervalMs);
Review Comment:
waitForFlussClusterReady() catches all Exceptions and then calls
Thread.sleep(). If the thread is interrupted, the InterruptedException will be
swallowed and the interrupted flag cleared, which can lead to hung/slow test
teardown. Please catch InterruptedException separately, restore the interrupt
status (Thread.currentThread().interrupt()), and fail fast (or rethrow) instead
of continuing retries.
```suggestion
            } catch (InterruptedException ie) {
                // Preserve interrupt status and fail fast instead of continuing retries
                Thread.currentThread().interrupt();
                throw new IllegalStateException(
                        "Thread was interrupted while waiting for Fluss cluster to become ready",
                        ie);
            } catch (Exception e) {
                lastException = e;
                try {
                    Thread.sleep(retryIntervalMs);
                } catch (InterruptedException ie) {
                    // Preserve interrupt status and fail fast instead of continuing retries
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(
                            "Thread was interrupted while waiting for Fluss cluster to become ready",
                            ie);
                }
```
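For context, here is a minimal, self-contained sketch of the interrupt-safe retry pattern the comment asks for. The `ReadinessWaiter` name and `Callable`-based probe are illustrative stand-ins, not the actual Fluss test API:

```java
import java.util.concurrent.Callable;

/** Sketch of an interrupt-safe readiness poll; names are illustrative. */
final class ReadinessWaiter {
    static void await(Callable<Boolean> probe, int maxRetries, long retryIntervalMs) {
        Exception lastException = null;
        for (int i = 0; i < maxRetries; i++) {
            try {
                if (probe.call()) {
                    return; // probe succeeded, the resource is ready
                }
            } catch (InterruptedException ie) {
                // Restore the interrupt flag and fail fast instead of retrying.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for readiness", ie);
            } catch (Exception e) {
                lastException = e; // transient failure, retry after a pause
            }
            try {
                Thread.sleep(retryIntervalMs);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for readiness", ie);
            }
        }
        throw new IllegalStateException(
                "Resource not ready after " + maxRetries + " attempts", lastException);
    }
}
```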
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java:
##########
@@ -374,6 +413,21 @@ public StarRocksColumn.Builder visit(DateType dateType) {
return builder;
}
+ @Override
+ public StarRocksColumn.Builder visit(TimeType timeType) {
+ // StarRocks does not support TIME type, so map it to VARCHAR.
+ // Format: HH:mm:ss for precision 0, HH:mm:ss.SSS for precision > 0
+ // Maximum length: 8 (HH:mm:ss) + 1 (.) + precision = 8 + 1 + precision
+ // For precision 0: "HH:mm:ss" = 8 characters
+ // For precision > 0: "HH:mm:ss." + precision digits
+ builder.setDataType(VARCHAR);
Review Comment:
The comment describes the TIME string format as "HH:mm:ss.SSS" for any
precision > 0, but the implementation actually formats with an arbitrary number
of fractional digits equal to the declared precision (0–9) via
appendFraction(..., precision, precision, ...). Please update the comment to
reflect the actual behavior (e.g., "HH:mm:ss" or
"HH:mm:ss.<fraction with p digits>") to avoid misleading future maintainers.
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierTest.java:
##########
@@ -226,4 +227,150 @@ void testDropColumn() throws Exception {
.build();
Assertions.assertThat(actualTable).isEqualTo(expectTable);
}
+
+ @Test
+ void testCreateTableWithTimeType() throws Exception {
+ TableId tableId = TableId.parse("test.time_table");
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("id", new IntType())
+ .physicalColumn("start_time", new TimeType())
+ .physicalColumn("end_time", new TimeType(3)) // TIME with millisecond precision
+ .primaryKey("id")
+ .build();
+ CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
+ metadataApplier.applySchemaChange(createTableEvent);
+
+ StarRocksTable actualTable =
+         catalog.getTable(tableId.getSchemaName(), tableId.getTableName()).orElse(null);
+ Assertions.assertThat(actualTable).isNotNull();
+
+ List<StarRocksColumn> columns = new ArrayList<>();
+ columns.add(
+ new StarRocksColumn.Builder()
+ .setColumnName("id")
+ .setOrdinalPosition(0)
+ .setDataType("int")
+ .setNullable(true)
+ .build());
+ columns.add(
+ new StarRocksColumn.Builder()
+ .setColumnName("start_time")
+ .setOrdinalPosition(1)
+ .setDataType("varchar")
+ .setNullable(true)
+ .setColumnSize(8)
+ .build());
+ columns.add(
+ new StarRocksColumn.Builder()
+ .setColumnName("end_time")
+ .setOrdinalPosition(2)
+ .setDataType("varchar")
+ .setNullable(true)
+ .setColumnSize(12)
+ .build());
+ StarRocksTable expectTable =
+ new StarRocksTable.Builder()
+ .setDatabaseName(tableId.getSchemaName())
+ .setTableName(tableId.getTableName())
+ .setTableType(StarRocksTable.TableType.PRIMARY_KEY)
+ .setColumns(columns)
+ .setTableKeys(schema.primaryKeys())
+ .setDistributionKeys(schema.primaryKeys())
+ .setNumBuckets(10)
+ .setTableProperties(Collections.singletonMap("replication_num", "5"))
+ .build();
+ Assertions.assertThat(actualTable).isEqualTo(expectTable);
+ }
+
+ @Test
+ void testAddTimeTypeColumn() throws Exception {
+ TableId tableId = TableId.parse("test.add_time_column");
+ Schema schema =
+         Schema.newBuilder().physicalColumn("id", new IntType()).primaryKey("id").build();
+ CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
+ metadataApplier.applySchemaChange(createTableEvent);
+
+ // Add TIME type column through schema evolution
+ AddColumnEvent addColumnEvent =
+ new AddColumnEvent(
+ tableId,
+ Arrays.asList(
+ new AddColumnEvent.ColumnWithPosition(
+ Column.physicalColumn("duration", new
TimeType())),
+ new AddColumnEvent.ColumnWithPosition(
+ Column.physicalColumn("precision_time", new TimeType(3)))));
+ metadataApplier.applySchemaChange(addColumnEvent);
+
+ StarRocksTable actualTable =
+         catalog.getTable(tableId.getSchemaName(), tableId.getTableName()).orElse(null);
+ Assertions.assertThat(actualTable).isNotNull();
+
+ List<StarRocksColumn> columns = new ArrayList<>();
+ columns.add(
+ new StarRocksColumn.Builder()
+ .setColumnName("id")
+ .setOrdinalPosition(0)
+ .setDataType("int")
+ .setNullable(true)
+ .build());
+ columns.add(
+ new StarRocksColumn.Builder()
+ .setColumnName("duration")
+ .setOrdinalPosition(1)
+ .setDataType("varchar")
+ .setNullable(true)
+ .setColumnSize(8)
+ .build());
+ columns.add(
+ new StarRocksColumn.Builder()
+ .setColumnName("precision_time")
+ .setOrdinalPosition(2)
+ .setDataType("varchar")
+ .setNullable(true)
+ .setColumnSize(12)
+ .build());
+ StarRocksTable expectTable =
+ new StarRocksTable.Builder()
+ .setDatabaseName(tableId.getSchemaName())
+ .setTableName(tableId.getTableName())
+ .setTableType(StarRocksTable.TableType.PRIMARY_KEY)
+ .setColumns(columns)
+ .setTableKeys(schema.primaryKeys())
+ .setDistributionKeys(schema.primaryKeys())
+ .setNumBuckets(10)
+ .setTableProperties(Collections.singletonMap("replication_num", "5"))
+ .build();
+ Assertions.assertThat(actualTable).isEqualTo(expectTable);
+ }
+
+ @Test
+ void testTimeTypeWithDifferentPrecisions() throws Exception {
+ TableId tableId = TableId.parse("test.time_precision_table");
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("id", new IntType())
+ .physicalColumn("time_default", new TimeType()) //
Default precision
+ .physicalColumn("time_0", new TimeType(0)) // Second
precision
+ .physicalColumn("time_3", new TimeType(3)) //
Millisecond precision
+ .physicalColumn("time_max", new TimeType(3)) //
Maximum supported precision
Review Comment:
The inline comment says "Maximum supported precision" but the code uses new
TimeType(3). In this codebase, TimeType.MAX_PRECISION is 9, so this comment is
incorrect. Either update the comment (e.g., "example precision 3") or change
the test to actually use precision 9 if that’s what you want to validate.
```suggestion
.physicalColumn("time_max", new TimeType(3)) //
Example precision 3
```
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/EventRecordSerializationSchemaTest.java:
##########
@@ -260,6 +263,224 @@ void testMixedSchemaAndDataChanges() throws Exception {
Objects.requireNonNull(serializer.serialize(insertEvent3)));
}
+ @Test
+ void testTimeTypeSerialization() throws Exception {
+ TableId tableId = TableId.parse("test.time_table");
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("id", new IntType())
+ .physicalColumn("start_time", new TimeType())
+ .physicalColumn("end_time", new TimeType(3)) // TIME with millisecond precision
+ .primaryKey("id")
+ .build();
+
+ // Create table
+ CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
+ Assertions.assertThat(serializer.serialize(createTableEvent)).isNull();
+
+ BinaryRecordDataGenerator generator =
+         new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
+
+ // Test insert with TIME values
+ DataChangeEvent insertEvent =
+ DataChangeEvent.insertEvent(
+ tableId,
+ generator.generate(
+ new Object[] {
+ 1,
+ TimeData.fromLocalTime(LocalTime.of(9, 30, 15)), // 09:30:15
+ TimeData.fromLocalTime(LocalTime.of(17, 45, 30, 123000000)) // 17:45:30.123
+ }));
+
+ StarRocksRowData result = serializer.serialize(insertEvent);
+ Assertions.assertThat(result).isNotNull();
+
+ verifySerializeResult(
+ tableId,
+ "{\"id\":1,\"start_time\":\"09:30:15\",\"end_time\":\"17:45:30.123\",\"__op\":0}",
+ result);
+ }
+
+ @Test
+ void testTimeTypeZeroSecondsFormat() throws Exception {
+ TableId tableId = TableId.parse("test.time_zero_seconds_table");
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("id", new IntType())
+ .physicalColumn("zero_time", new TimeType())
+ .primaryKey("id")
+ .build();
+
+ CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
+ Assertions.assertThat(serializer.serialize(createTableEvent)).isNull();
+
+ BinaryRecordDataGenerator generator =
+         new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
+
+ DataChangeEvent insertEvent =
+ DataChangeEvent.insertEvent(
+ tableId,
+ generator.generate(
+ new Object[] {1, TimeData.fromLocalTime(LocalTime.of(16, 0, 0))}));
+
+ StarRocksRowData result = serializer.serialize(insertEvent);
+ Assertions.assertThat(result).isNotNull();
+
+ verifySerializeResult(tableId, "{\"id\":1,\"zero_time\":\"16:00:00\",\"__op\":0}", result);
+ }
+
+ @Test
+ void testTimeTypeWithSchemaEvolution() throws Exception {
+ TableId tableId = TableId.parse("test.time_evolution_table");
+ Schema initialSchema =
+ Schema.newBuilder()
+ .physicalColumn("id", new IntType())
+ .physicalColumn("name", new VarCharType(20))
+ .primaryKey("id")
+ .build();
+
+ // Create initial table
+ CreateTableEvent createTableEvent = new CreateTableEvent(tableId, initialSchema);
+ Assertions.assertThat(serializer.serialize(createTableEvent)).isNull();
+
+ BinaryRecordDataGenerator initialGenerator =
+ new BinaryRecordDataGenerator(
+         initialSchema.getColumnDataTypes().toArray(new DataType[0]));
+
+ // Insert initial data
+ DataChangeEvent initialInsert =
+ DataChangeEvent.insertEvent(
+ tableId,
+ initialGenerator.generate(
+ new Object[] {1, BinaryStringData.fromString("Initial Record")}));
+
+ StarRocksRowData initialResult = serializer.serialize(initialInsert);
+ Assertions.assertThat(initialResult).isNotNull();
+
+ verifySerializeResult(
+ tableId, "{\"id\":1,\"name\":\"Initial Record\",\"__op\":0}",
initialResult);
+
+ // Simulate schema evolution: add TIME column
+ Schema evolvedSchema =
+ Schema.newBuilder()
+ .physicalColumn("id", new IntType())
+ .physicalColumn("name", new VarCharType(20))
+ .physicalColumn("created_time", new TimeType())
+ .primaryKey("id")
+ .build();
+
+ // Create AddColumnEvent to simulate schema evolution
+ AddColumnEvent addColumnEvent =
+ new AddColumnEvent(
+ tableId,
+ Arrays.asList(
+ new AddColumnEvent.ColumnWithPosition(
+ Column.physicalColumn("created_time",
new TimeType()),
+ AddColumnEvent.ColumnPosition.LAST,
+ null)));
+ serializer.serialize(addColumnEvent);
+
+ // Insert data with TIME column after schema evolution
+ BinaryRecordDataGenerator evolvedGenerator =
+ new BinaryRecordDataGenerator(
+         evolvedSchema.getColumnDataTypes().toArray(new DataType[0]));
+
+ DataChangeEvent evolvedInsert =
+ DataChangeEvent.insertEvent(
+ tableId,
+ evolvedGenerator.generate(
+ new Object[] {
+ 2,
+ BinaryStringData.fromString("Evolved
Record"),
+ TimeData.fromLocalTime(LocalTime.of(14,
30, 0)) // 14:30:00
+ }));
+
+ StarRocksRowData evolvedResult = serializer.serialize(evolvedInsert);
+ Assertions.assertThat(evolvedResult).isNotNull();
+
+ verifySerializeResult(
+ tableId,
+ "{\"id\":2,\"name\":\"Evolved
Record\",\"created_time\":\"14:30:00\",\"__op\":0}",
+ evolvedResult);
+ }
+
+ @Test
+ void testTimeTypeBoundaryValues() throws Exception {
+ TableId tableId = TableId.parse("test.time_boundary_table");
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("id", new IntType())
+ .physicalColumn("min_time", new TimeType())
+ .physicalColumn("max_time", new TimeType())
+ .physicalColumn("midnight", new TimeType())
+ .primaryKey("id")
+ .build();
+
+ CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
+ Assertions.assertThat(serializer.serialize(createTableEvent)).isNull();
+
+ BinaryRecordDataGenerator generator =
+         new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
+
+ // Test boundary TIME values
+ DataChangeEvent insertEvent =
+ DataChangeEvent.insertEvent(
+ tableId,
+ generator.generate(
+ new Object[] {
+ 1,
+ TimeData.fromLocalTime(LocalTime.MIN), // 00:00:00
+ TimeData.fromLocalTime(LocalTime.MAX), // 23:59:59.999999999
Review Comment:
The comment claims LocalTime.MAX corresponds to "23:59:59.999999999", but
TimeData.fromLocalTime() truncates to millisecond precision (it stores
millisOfDay). This makes the comment inaccurate and can confuse readers of the
test; please adjust it to reflect the actual precision retained (e.g.,
"23:59:59.999" before formatting).
```suggestion
TimeData.fromLocalTime(LocalTime.MAX), // 23:59:59.999 (truncated to millisecond precision)
```
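A quick, self-contained illustration of the truncation in question. The arithmetic below is an illustrative stand-in for TimeData's millis-of-day storage, not its actual code:

```java
import java.time.LocalTime;

public class TimeTruncationDemo {
    public static void main(String[] args) {
        // Keeping only millis-of-day drops the nanosecond detail of LocalTime.MAX.
        long millisOfDay = LocalTime.MAX.toNanoOfDay() / 1_000_000L;
        LocalTime retained = LocalTime.ofNanoOfDay(millisOfDay * 1_000_000L);
        System.out.println(retained); // 23:59:59.999, not 23:59:59.999999999
    }
}
```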
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]