This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 69dae39e7 [FLINK-37485][starrocks] Add support for TIME type (#4253)
69dae39e7 is described below
commit 69dae39e7b2eebc53ea23b0c54026b8ec7fe8142
Author: Jia Fan <[email protected]>
AuthorDate: Tue Mar 3 19:06:01 2026 +0800
[FLINK-37485][starrocks] Add support for TIME type (#4253)
---
.../connectors/pipeline-connectors/starrocks.md | 18 +-
.../connectors/pipeline-connectors/starrocks.md | 16 +-
.../cdc/connectors/fluss/FlussPipelineITCase.java | 26 ++-
.../connectors/starrocks/sink/StarRocksUtils.java | 57 +++++-
.../sink/EventRecordSerializationSchemaTest.java | 224 +++++++++++++++++++++
.../sink/StarRocksMetadataApplierITCase.java | 11 +-
.../sink/StarRocksMetadataApplierTest.java | 147 ++++++++++++++
7 files changed, 488 insertions(+), 11 deletions(-)
diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/starrocks.md b/docs/content.zh/docs/connectors/pipeline-connectors/starrocks.md
index 33b7d96ae..98f79f86a 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/starrocks.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/starrocks.md
@@ -128,15 +128,15 @@ pipeline:
<td>sink.connect.timeout-ms</td>
<td>optional</td>
<td style="word-wrap: break-word;">30000</td>
- <td>String</td>
+ <td>Integer</td>
<td>与 FE 建立 HTTP 连接的超时时间。取值范围:[100, 60000]。</td>
</tr>
<tr>
<td>sink.wait-for-continue.timeout-ms</td>
<td>optional</td>
<td style="word-wrap: break-word;">30000</td>
- <td>String</td>
- <td>等待 FE HTTP 100-continue 应答的超时时间。取值范围:[3000, 60000]。</td>
+ <td>Integer</td>
+ <td>等待 FE HTTP 100-continue 应答的超时时间。取值范围:[3000, 600000]。</td>
</tr>
<tr>
<td>sink.buffer-flush.max-bytes</td>
@@ -174,6 +174,13 @@ pipeline:
<td>Boolean</td>
<td>at-least-once 下是否使用 transaction stream load。</td>
</tr>
+ <tr>
+ <td>sink.metric.histogram-window-size</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">100</td>
+ <td>Integer</td>
+ <td>直方图指标的窗口大小。</td>
+ </tr>
<tr>
<td>sink.properties.*</td>
<td>optional</td>
@@ -297,6 +304,11 @@ pipeline:
<td>DATE</td>
<td></td>
</tr>
+ <tr>
+ <td>TIME</td>
+ <td>VARCHAR</td>
+ <td>StarRocks 不支持 TIME 类型,因此映射为 VARCHAR。TIME(p) 值以字符串形式存储:当 p = 0 时格式为 "HH:mm:ss",当 p > 0 时格式为 "HH:mm:ss.<p 位小数>"(例如 p = 3 时为 "HH:mm:ss.SSS")。</td>
+ </tr>
<tr>
<td>TIMESTAMP</td>
<td>DATETIME</td>
diff --git a/docs/content/docs/connectors/pipeline-connectors/starrocks.md b/docs/content/docs/connectors/pipeline-connectors/starrocks.md
index 7a979d019..67fbbf046 100644
--- a/docs/content/docs/connectors/pipeline-connectors/starrocks.md
+++ b/docs/content/docs/connectors/pipeline-connectors/starrocks.md
@@ -128,14 +128,14 @@ pipeline:
<td>sink.connect.timeout-ms</td>
<td>optional</td>
<td style="word-wrap: break-word;">30000</td>
- <td>String</td>
+ <td>Integer</td>
<td>The timeout for establishing an HTTP connection. Valid values: 100 to 60000.</td>
</tr>
<tr>
<td>sink.wait-for-continue.timeout-ms</td>
<td>optional</td>
<td style="word-wrap: break-word;">30000</td>
- <td>String</td>
+ <td>Integer</td>
<td>Timeout in milliseconds to wait for the 100-continue response from the FE HTTP server. Valid values: 3000 to 600000.</td>
</tr>
@@ -177,6 +177,13 @@ pipeline:
<td>Boolean</td>
<td>Whether to use transaction stream load for at-least-once when it's available.</td>
</tr>
+ <tr>
+ <td>sink.metric.histogram-window-size</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">100</td>
+ <td>Integer</td>
+ <td>Window size of histogram metrics.</td>
+ </tr>
<tr>
<td>sink.properties.*</td>
<td>optional</td>
@@ -306,6 +313,11 @@ pipeline:
<td>DATE</td>
<td></td>
</tr>
+ <tr>
+ <td>TIME</td>
+ <td>VARCHAR</td>
+ <td>StarRocks does not support the TIME type, so it is mapped to VARCHAR. TIME values are stored as strings in the format "HH:mm:ss" when the precision p = 0, or "HH:mm:ss.<p digits>" when p > 0 (for example, p = 3 uses "HH:mm:ss.SSS").</td>
+ </tr>
<tr>
<td>TIMESTAMP</td>
<td>DATETIME</td>
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/FlussPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/FlussPipelineITCase.java
index a625f0fd9..ec2d2b4a2 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/FlussPipelineITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/FlussPipelineITCase.java
@@ -45,6 +45,8 @@ import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
+import com.alibaba.fluss.client.Connection;
+import com.alibaba.fluss.client.ConnectionFactory;
import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.MemorySize;
import com.alibaba.fluss.metadata.DataLakeFormat;
@@ -117,7 +119,8 @@ public class FlussPipelineITCase {
protected TableEnvironment tBatchEnv;
@BeforeEach
- void before() {
+ void before() throws Exception {
+ waitForFlussClusterReady();
// open a catalog so that we can get table from the catalog
String bootstrapServers = FLUSS_CLUSTER_EXTENSION.getBootstrapServers();
@@ -137,6 +140,27 @@ public class FlussPipelineITCase {
tBatchEnv.useDatabase(DEFAULT_DB);
}
+ private void waitForFlussClusterReady() throws Exception {
+ int maxRetries = 30;
+ int retryIntervalMs = 1000;
+ Exception lastException = null;
+
+ for (int i = 0; i < maxRetries; i++) {
+ try (Connection connection =
+ ConnectionFactory.createConnection(FLUSS_CLUSTER_EXTENSION.getClientConfig())) {
+ // Connection successful, cluster is ready
+ return;
+ } catch (Exception e) {
+ lastException = e;
+ Thread.sleep(retryIntervalMs);
+ }
+ }
+
+ throw new IllegalStateException(
+ "Failed to connect to Fluss cluster after " + maxRetries + "
attempts",
+ lastException);
+ }
+
@AfterEach
void after() {
tBatchEnv.useDatabase(BUILTIN_DATABASE);
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
index d302f297e..22237a5f3 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
@@ -33,6 +33,7 @@ import org.apache.flink.cdc.common.types.FloatType;
import org.apache.flink.cdc.common.types.IntType;
import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
import org.apache.flink.cdc.common.types.SmallIntType;
+import org.apache.flink.cdc.common.types.TimeType;
import org.apache.flink.cdc.common.types.TimestampType;
import org.apache.flink.cdc.common.types.TinyIntType;
import org.apache.flink.cdc.common.types.VarCharType;
@@ -43,6 +44,8 @@ import com.starrocks.connector.flink.catalog.StarRocksTable;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.temporal.ChronoField;
import java.util.ArrayList;
import java.util.List;
@@ -132,6 +135,35 @@ public class StarRocksUtils {
private static final DateTimeFormatter DATETIME_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+ /** Format TIME type data. */
+ private static final DateTimeFormatter TIME_FORMATTER =
+ new DateTimeFormatterBuilder().appendPattern("HH:mm:ss").toFormatter();
+
+ private static final DateTimeFormatter[] TIME_FORMATTERS = new DateTimeFormatter[10];
+
+ private static DateTimeFormatter timeFormatter(int precision) {
+ if (precision <= 0) {
+ return TIME_FORMATTER;
+ }
+ if (precision < TIME_FORMATTERS.length) {
+ DateTimeFormatter formatter = TIME_FORMATTERS[precision];
+ if (formatter == null) {
+ formatter =
+ new DateTimeFormatterBuilder()
+ .appendPattern("HH:mm:ss")
+ .appendFraction(
+ ChronoField.NANO_OF_SECOND, precision, precision, true)
+ .toFormatter();
+ TIME_FORMATTERS[precision] = formatter;
+ }
+ return formatter;
+ }
+ return new DateTimeFormatterBuilder()
+ .appendPattern("HH:mm:ss")
.appendFraction(ChronoField.NANO_OF_SECOND, precision, precision, true)
+ .toFormatter();
+ }
+
/**
* Creates an accessor for getting elements in an internal RecordData structure at the given
* position.
@@ -183,6 +215,13 @@ public class StarRocksUtils {
fieldGetter =
record ->
record.getDate(fieldPos).toLocalDate().format(DATE_FORMATTER);
break;
+ case TIME_WITHOUT_TIME_ZONE:
+ fieldGetter =
+ record ->
+ record.getTime(fieldPos)
+ .toLocalTime()
+ .format(timeFormatter(getPrecision(fieldType)));
+ break;
case TIMESTAMP_WITHOUT_TIME_ZONE:
fieldGetter =
record ->
@@ -374,6 +413,21 @@ public class StarRocksUtils {
return builder;
}
+ @Override
+ public StarRocksColumn.Builder visit(TimeType timeType) {
+ // StarRocks does not support TIME type, so map it to VARCHAR.
+ // Format: HH:mm:ss for precision 0, HH:mm:ss.<p digits> for precision > 0
+ // Maximum length: 8 (HH:mm:ss) + 1 (.) + precision = 8 + 1 + precision
+ // For precision 0: "HH:mm:ss" = 8 characters
+ // For precision > 0: "HH:mm:ss." + precision digits
+ builder.setDataType(VARCHAR);
+ builder.setNullable(timeType.isNullable());
+ int precision = timeType.getPrecision();
+ int length = precision > 0 ? 8 + 1 + precision : 8;
+ builder.setColumnSize(length);
+ return builder;
+ }
+
@Override
public StarRocksColumn.Builder visit(TimestampType timestampType) {
builder.setDataType(DATETIME);
@@ -404,7 +458,8 @@ public class StarRocksUtils {
|| dataType instanceof org.apache.flink.cdc.common.types.TimestampType
|| dataType instanceof org.apache.flink.cdc.common.types.ZonedTimestampType) {
- if (INVALID_OR_MISSING_DATATIME.equals(defaultValue)) {
+ if (INVALID_OR_MISSING_DATATIME.equals(defaultValue)
+ || defaultValue.startsWith(INVALID_OR_MISSING_DATATIME)) {
return DEFAULT_DATETIME;
}
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/EventRecordSerializationSchemaTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/EventRecordSerializationSchemaTest.java
index 5d830b935..a9c6f240e 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/EventRecordSerializationSchemaTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/EventRecordSerializationSchemaTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.cdc.common.data.DateData;
import org.apache.flink.cdc.common.data.DecimalData;
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.TimeData;
import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.AddColumnEvent;
@@ -45,6 +46,7 @@ import org.apache.flink.cdc.common.types.FloatType;
import org.apache.flink.cdc.common.types.IntType;
import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
import org.apache.flink.cdc.common.types.SmallIntType;
+import org.apache.flink.cdc.common.types.TimeType;
import org.apache.flink.cdc.common.types.TimestampType;
import org.apache.flink.cdc.common.types.VarCharType;
import org.apache.flink.cdc.common.utils.SchemaUtils;
@@ -71,6 +73,7 @@ import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
+import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Arrays;
@@ -260,6 +263,227 @@ class EventRecordSerializationSchemaTest {
Objects.requireNonNull(serializer.serialize(insertEvent3)));
}
+ @Test
+ void testTimeTypeSerialization() throws Exception {
+ TableId tableId = TableId.parse("test.time_table");
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("id", new IntType())
+ .physicalColumn("start_time", new TimeType())
+ .physicalColumn(
+ "end_time", new TimeType(3)) // TIME with
millisecond precision
+ .primaryKey("id")
+ .build();
+
+ // Create table
+ CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
+ Assertions.assertThat(serializer.serialize(createTableEvent)).isNull();
+
+ BinaryRecordDataGenerator generator =
+ new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
+
+ // Test insert with TIME values
+ DataChangeEvent insertEvent =
+ DataChangeEvent.insertEvent(
+ tableId,
+ generator.generate(
+ new Object[] {
+ 1,
+ TimeData.fromLocalTime(LocalTime.of(9, 30, 15)), // 09:30:15
+ TimeData.fromLocalTime(
+ LocalTime.of(17, 45, 30, 123000000)) // 17:45:30.123
+ }));
+
+ StarRocksRowData result = serializer.serialize(insertEvent);
+ Assertions.assertThat(result).isNotNull();
+
+ verifySerializeResult(
+ tableId,
+ "{\"id\":1,\"start_time\":\"09:30:15\",\"end_time\":\"17:45:30.123\",\"__op\":0}",
+ result);
+ }
+
+ @Test
+ void testTimeTypeZeroSecondsFormat() throws Exception {
+ TableId tableId = TableId.parse("test.time_zero_seconds_table");
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("id", new IntType())
+ .physicalColumn("zero_time", new TimeType())
+ .primaryKey("id")
+ .build();
+
+ CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
+ Assertions.assertThat(serializer.serialize(createTableEvent)).isNull();
+
+ BinaryRecordDataGenerator generator =
+ new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
+
+ DataChangeEvent insertEvent =
+ DataChangeEvent.insertEvent(
+ tableId,
+ generator.generate(
+ new Object[] {1, TimeData.fromLocalTime(LocalTime.of(16, 0, 0))}));
+
+ StarRocksRowData result = serializer.serialize(insertEvent);
+ Assertions.assertThat(result).isNotNull();
+
+ verifySerializeResult(tableId, "{\"id\":1,\"zero_time\":\"16:00:00\",\"__op\":0}", result);
+ }
+
+ @Test
+ void testTimeTypeWithSchemaEvolution() throws Exception {
+ TableId tableId = TableId.parse("test.time_evolution_table");
+ Schema initialSchema =
+ Schema.newBuilder()
+ .physicalColumn("id", new IntType())
+ .physicalColumn("name", new VarCharType(20))
+ .primaryKey("id")
+ .build();
+
+ // Create initial table
+ CreateTableEvent createTableEvent = new CreateTableEvent(tableId, initialSchema);
+ Assertions.assertThat(serializer.serialize(createTableEvent)).isNull();
+
+ BinaryRecordDataGenerator initialGenerator =
+ new BinaryRecordDataGenerator(
+ initialSchema.getColumnDataTypes().toArray(new DataType[0]));
+
+ // Insert initial data
+ DataChangeEvent initialInsert =
+ DataChangeEvent.insertEvent(
+ tableId,
+ initialGenerator.generate(
+ new Object[] {1, BinaryStringData.fromString("Initial Record")}));
+
+ StarRocksRowData initialResult = serializer.serialize(initialInsert);
+ Assertions.assertThat(initialResult).isNotNull();
+
+ verifySerializeResult(
+ tableId, "{\"id\":1,\"name\":\"Initial Record\",\"__op\":0}",
initialResult);
+
+ // Simulate schema evolution: add TIME column
+ Schema evolvedSchema =
+ Schema.newBuilder()
+ .physicalColumn("id", new IntType())
+ .physicalColumn("name", new VarCharType(20))
+ .physicalColumn("created_time", new TimeType())
+ .primaryKey("id")
+ .build();
+
+ // Create AddColumnEvent to simulate schema evolution
+ AddColumnEvent addColumnEvent =
+ new AddColumnEvent(
+ tableId,
+ Arrays.asList(
+ new AddColumnEvent.ColumnWithPosition(
+ Column.physicalColumn("created_time",
new TimeType()),
+ AddColumnEvent.ColumnPosition.LAST,
+ null)));
+ serializer.serialize(addColumnEvent);
+
+ // Insert data with TIME column after schema evolution
+ BinaryRecordDataGenerator evolvedGenerator =
+ new BinaryRecordDataGenerator(
+ evolvedSchema.getColumnDataTypes().toArray(new DataType[0]));
+
+ DataChangeEvent evolvedInsert =
+ DataChangeEvent.insertEvent(
+ tableId,
+ evolvedGenerator.generate(
+ new Object[] {
+ 2,
+ BinaryStringData.fromString("Evolved
Record"),
+ TimeData.fromLocalTime(LocalTime.of(14,
30, 0)) // 14:30:00
+ }));
+
+ StarRocksRowData evolvedResult = serializer.serialize(evolvedInsert);
+ Assertions.assertThat(evolvedResult).isNotNull();
+
+ verifySerializeResult(
+ tableId,
+ "{\"id\":2,\"name\":\"Evolved
Record\",\"created_time\":\"14:30:00\",\"__op\":0}",
+ evolvedResult);
+ }
+
+ @Test
+ void testTimeTypeBoundaryValues() throws Exception {
+ TableId tableId = TableId.parse("test.time_boundary_table");
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("id", new IntType())
+ .physicalColumn("min_time", new TimeType())
+ .physicalColumn("max_time", new TimeType())
+ .physicalColumn("midnight", new TimeType())
+ .primaryKey("id")
+ .build();
+
+ CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
+ Assertions.assertThat(serializer.serialize(createTableEvent)).isNull();
+
+ BinaryRecordDataGenerator generator =
+ new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
+
+ // Test boundary TIME values
+ DataChangeEvent insertEvent =
+ DataChangeEvent.insertEvent(
+ tableId,
+ generator.generate(
+ new Object[] {
+ 1,
+ TimeData.fromLocalTime(LocalTime.MIN), // 00:00:00
+ TimeData.fromLocalTime(
+ LocalTime.MAX), // 23:59:59.999 (truncated to millisecond precision)
+ TimeData.fromLocalTime(LocalTime.MIDNIGHT) // 00:00:00
+ }));
+
+ StarRocksRowData result = serializer.serialize(insertEvent);
+ Assertions.assertThat(result).isNotNull();
+
+ verifySerializeResult(
+ tableId,
+ "{\"id\":1,\"min_time\":\"00:00:00\",\"max_time\":\"23:59:59\",\"midnight\":\"00:00:00\",\"__op\":0}",
+ result);
+ }
+
+ @Test
+ void testTimeTypeWithNullValues() throws Exception {
+ TableId tableId = TableId.parse("test.time_null_table");
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("id", new IntType())
+ .physicalColumn("nullable_time", new TimeType())
+ .physicalColumn("not_null_time", new
TimeType().notNull())
+ .primaryKey("id")
+ .build();
+
+ CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
+ Assertions.assertThat(serializer.serialize(createTableEvent)).isNull();
+
+ BinaryRecordDataGenerator generator =
+ new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
+
+ // Test TIME values with null
+ DataChangeEvent insertEvent =
+ DataChangeEvent.insertEvent(
+ tableId,
+ generator.generate(
+ new Object[] {
+ 1,
+ null, // Null value for nullable column
+ TimeData.fromLocalTime(
+ LocalTime.of(12, 0, 0)) // Not null column
+ }));
+
+ StarRocksRowData result = serializer.serialize(insertEvent);
+ Assertions.assertThat(result).isNotNull();
+
+ verifySerializeResult(
+ tableId, "{\"id\":1,\"not_null_time\":\"12:00:00\",\"__op\":0}", result);
+ }
+
private void verifySerializeResult(
TableId expectTable, String expectRow, StarRocksRowData actualRowData)
throws Exception {
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
index e2dc50551..6501ba03c 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
@@ -213,10 +213,10 @@ class StarRocksMetadataApplierITCase extends StarRocksSinkTestBase {
.column(new PhysicalColumn("string",
DataTypes.STRING(), "String"))
.column(new PhysicalColumn("decimal",
DataTypes.DECIMAL(17, 7), "Decimal"))
.column(new PhysicalColumn("date", DataTypes.DATE(),
"Date"))
- // StarRocks sink doesn't support TIME type yet.
- // .column(new PhysicalColumn("time",
DataTypes.TIME(), "Time"))
- // .column(new PhysicalColumn("time_3",
DataTypes.TIME(3), "Time With
- // Precision"))
+ .column(new PhysicalColumn("time", DataTypes.TIME(),
"Time"))
+ .column(
+ new PhysicalColumn(
+ "time_3", DataTypes.TIME(3), "Time
With Precision"))
.column(new PhysicalColumn("timestamp",
DataTypes.TIMESTAMP(), "Timestamp"))
.column(
new PhysicalColumn(
@@ -256,6 +256,9 @@ class StarRocksMetadataApplierITCase extends StarRocksSinkTestBase {
"string | varchar(1048576) | YES | false | null",
"decimal | decimal(17,7) | YES | false | null",
"date | date | YES | false | null",
+ // TIME type mapped to VARCHAR since StarRocks doesn't support TIME type
+ "time | varchar(8) | YES | false | null",
+ "time_3 | varchar(12) | YES | false | null",
"timestamp | datetime | YES | false | null",
"timestamp_3 | datetime | YES | false | null",
"timestampltz | datetime | YES | false | null",
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierTest.java
index 28d7e9404..2da34d6cf 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.cdc.common.types.BooleanType;
import org.apache.flink.cdc.common.types.DecimalType;
import org.apache.flink.cdc.common.types.IntType;
import org.apache.flink.cdc.common.types.SmallIntType;
+import org.apache.flink.cdc.common.types.TimeType;
import org.apache.flink.cdc.common.types.TimestampType;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
@@ -226,4 +227,150 @@ class StarRocksMetadataApplierTest {
.build();
Assertions.assertThat(actualTable).isEqualTo(expectTable);
}
+
+ @Test
+ void testCreateTableWithTimeType() throws Exception {
+ TableId tableId = TableId.parse("test.time_table");
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("id", new IntType())
+ .physicalColumn("start_time", new TimeType())
+ .physicalColumn(
+ "end_time", new TimeType(3)) // TIME with
millisecond precision
+ .primaryKey("id")
+ .build();
+ CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
+ metadataApplier.applySchemaChange(createTableEvent);
+
+ StarRocksTable actualTable =
+ catalog.getTable(tableId.getSchemaName(), tableId.getTableName()).orElse(null);
+ Assertions.assertThat(actualTable).isNotNull();
+
+ List<StarRocksColumn> columns = new ArrayList<>();
+ columns.add(
+ new StarRocksColumn.Builder()
+ .setColumnName("id")
+ .setOrdinalPosition(0)
+ .setDataType("int")
+ .setNullable(true)
+ .build());
+ columns.add(
+ new StarRocksColumn.Builder()
+ .setColumnName("start_time")
+ .setOrdinalPosition(1)
+ .setDataType("varchar")
+ .setNullable(true)
+ .setColumnSize(8)
+ .build());
+ columns.add(
+ new StarRocksColumn.Builder()
+ .setColumnName("end_time")
+ .setOrdinalPosition(2)
+ .setDataType("varchar")
+ .setNullable(true)
+ .setColumnSize(12)
+ .build());
+ StarRocksTable expectTable =
+ new StarRocksTable.Builder()
+ .setDatabaseName(tableId.getSchemaName())
+ .setTableName(tableId.getTableName())
+ .setTableType(StarRocksTable.TableType.PRIMARY_KEY)
+ .setColumns(columns)
+ .setTableKeys(schema.primaryKeys())
+ .setDistributionKeys(schema.primaryKeys())
+ .setNumBuckets(10)
+ .setTableProperties(Collections.singletonMap("replication_num", "5"))
+ .build();
+ Assertions.assertThat(actualTable).isEqualTo(expectTable);
+ }
+
+ @Test
+ void testAddTimeTypeColumn() throws Exception {
+ TableId tableId = TableId.parse("test.add_time_column");
+ Schema schema =
+ Schema.newBuilder().physicalColumn("id", new
IntType()).primaryKey("id").build();
+ CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
+ metadataApplier.applySchemaChange(createTableEvent);
+
+ // Add TIME type column through schema evolution
+ AddColumnEvent addColumnEvent =
+ new AddColumnEvent(
+ tableId,
+ Arrays.asList(
+ new AddColumnEvent.ColumnWithPosition(
+ Column.physicalColumn("duration", new
TimeType())),
+ new AddColumnEvent.ColumnWithPosition(
+ Column.physicalColumn("precision_time", new TimeType(3)))));
+ metadataApplier.applySchemaChange(addColumnEvent);
+
+ StarRocksTable actualTable =
+ catalog.getTable(tableId.getSchemaName(), tableId.getTableName()).orElse(null);
+ Assertions.assertThat(actualTable).isNotNull();
+
+ List<StarRocksColumn> columns = new ArrayList<>();
+ columns.add(
+ new StarRocksColumn.Builder()
+ .setColumnName("id")
+ .setOrdinalPosition(0)
+ .setDataType("int")
+ .setNullable(true)
+ .build());
+ columns.add(
+ new StarRocksColumn.Builder()
+ .setColumnName("duration")
+ .setOrdinalPosition(1)
+ .setDataType("varchar")
+ .setNullable(true)
+ .setColumnSize(8)
+ .build());
+ columns.add(
+ new StarRocksColumn.Builder()
+ .setColumnName("precision_time")
+ .setOrdinalPosition(2)
+ .setDataType("varchar")
+ .setNullable(true)
+ .setColumnSize(12)
+ .build());
+ StarRocksTable expectTable =
+ new StarRocksTable.Builder()
+ .setDatabaseName(tableId.getSchemaName())
+ .setTableName(tableId.getTableName())
+ .setTableType(StarRocksTable.TableType.PRIMARY_KEY)
+ .setColumns(columns)
+ .setTableKeys(schema.primaryKeys())
+ .setDistributionKeys(schema.primaryKeys())
+ .setNumBuckets(10)
+ .setTableProperties(Collections.singletonMap("replication_num", "5"))
+ .build();
+ Assertions.assertThat(actualTable).isEqualTo(expectTable);
+ }
+
+ @Test
+ void testTimeTypeWithDifferentPrecisions() throws Exception {
+ TableId tableId = TableId.parse("test.time_precision_table");
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("id", new IntType())
+ .physicalColumn("time_default", new TimeType()) //
Default precision
+ .physicalColumn("time_0", new TimeType(0)) // Second
precision
+ .physicalColumn("time_3", new TimeType(3)) //
Millisecond precision
+ .physicalColumn("time_max", new TimeType(3)) //
Example precision 3
+ .primaryKey("id")
+ .build();
+ CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
+ metadataApplier.applySchemaChange(createTableEvent);
+
+ StarRocksTable actualTable =
+ catalog.getTable(tableId.getSchemaName(), tableId.getTableName()).orElse(null);
+ Assertions.assertThat(actualTable).isNotNull();
+
+ // Verify all TIME columns are correctly mapped to StarRocks VARCHAR type
+ // since StarRocks doesn't support TIME type
+ List<String> timeColumns = Arrays.asList("time_default", "time_0",
"time_3", "time_max");
+ for (StarRocksColumn column : actualTable.getColumns()) {
+ if (timeColumns.contains(column.getColumnName())) {
+ Assertions.assertThat(column.getDataType().toLowerCase()).isEqualTo("varchar");
+ }
+ }
+ }
}