This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 1969f83b2 [test] Improve pipeline connector test coverage i.e. 
doris/postgres/fluss (#4266)
1969f83b2 is described below

commit 1969f83b29b18270892d35bd6fc8d2197b700946
Author: Leonard Xu <[email protected]>
AuthorDate: Tue Feb 24 18:05:00 2026 +0800

    [test] Improve pipeline connector test coverage i.e. doris/postgres/fluss 
(#4266)
    
    * [test][pipeline-connector/starrocks] Add unit tests for StarRocksUtils
    
    * [test][pipeline-connector/postgres] Add unit tests for PostgresTypeUtils 
and PostgresSchemaUtils
    
    * [test][pipeline-connector/fluss] Add unit tests for FlussConversions
---
 .../fluss/utils/FlussConversionsTest.java          | 400 +++++++++++++++++++
 .../postgres/utils/PostgresSchemaUtilsTest.java    | 134 +++++++
 .../postgres/utils/PostgresTypeUtilsTest.java      | 302 +++++++++++++++
 .../starrocks/sink/StarRocksUtilsTest.java         | 429 +++++++++++++++++++++
 4 files changed, 1265 insertions(+)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversionsTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversionsTest.java
new file mode 100644
index 000000000..2f3487528
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversionsTest.java
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.fluss.utils;
+
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataTypes;
+
+import com.alibaba.fluss.metadata.TableDescriptor;
+import com.alibaba.fluss.types.RowType;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for {@link FlussConversions}.
+ *
+ * <p>Covers {@code toFlussSchema}, {@code toFlussTable} and
+ * {@code sameCdcColumnsIgnoreCommentAndDefaultValue}. Where a Fluss class
+ * shares its simple name with a CDC class that is already imported
+ * ({@code Schema}, {@code DataTypes}), the Fluss one is written with its
+ * fully qualified name.
+ */
+class FlussConversionsTest {
+
+    // 
--------------------------------------------------------------------------------------------
+    // Tests for toFlussSchema
+    // 
--------------------------------------------------------------------------------------------
+
+    /**
+     * A two-column CDC schema keyed on {@code id} keeps its column order
+     * and its primary key after conversion.
+     */
+    @Test
+    void testToFlussSchemaBasic() {
+        Schema cdcSchema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT().notNull())
+                        .physicalColumn("name", DataTypes.STRING())
+                        .primaryKey("id")
+                        .build();
+
+        com.alibaba.fluss.metadata.Schema flussSchema = 
FlussConversions.toFlussSchema(cdcSchema);
+
+        assertThat(flussSchema.getColumnNames()).containsExactly("id", "name");
+        
assertThat(flussSchema.getPrimaryKeyColumnNames()).containsExactly("id");
+    }
+
+    /**
+     * A CDC schema declared without a primary key converts to a Fluss
+     * schema with two columns and an empty primary key column list.
+     */
+    @Test
+    void testToFlussSchemaWithoutPrimaryKey() {
+        Schema cdcSchema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT())
+                        .physicalColumn("name", DataTypes.STRING())
+                        .build();
+
+        com.alibaba.fluss.metadata.Schema flussSchema = 
FlussConversions.toFlussSchema(cdcSchema);
+
+        assertThat(flussSchema.getColumnNames()).hasSize(2);
+        assertThat(flussSchema.getPrimaryKeyColumnNames()).isEmpty();
+    }
+
+    /**
+     * Declares one column per CDC type (16 in total) and checks, position
+     * by position, the Fluss type each one is converted to.
+     */
+    @Test
+    void testToFlussSchemaTypeConversions() {
+        Schema cdcSchema =
+                Schema.newBuilder()
+                        .physicalColumn("bool_col", DataTypes.BOOLEAN())
+                        .physicalColumn("tinyint_col", DataTypes.TINYINT())
+                        .physicalColumn("smallint_col", DataTypes.SMALLINT())
+                        .physicalColumn("int_col", DataTypes.INT())
+                        .physicalColumn("bigint_col", DataTypes.BIGINT())
+                        .physicalColumn("float_col", DataTypes.FLOAT())
+                        .physicalColumn("double_col", DataTypes.DOUBLE())
+                        .physicalColumn("decimal_col", DataTypes.DECIMAL(10, 
2))
+                        .physicalColumn("char_col", DataTypes.CHAR(10))
+                        .physicalColumn("varchar_col", DataTypes.VARCHAR(100))
+                        .physicalColumn("binary_col", DataTypes.BINARY(16))
+                        .physicalColumn("varbinary_col", 
DataTypes.VARBINARY(100))
+                        .physicalColumn("date_col", DataTypes.DATE())
+                        .physicalColumn("time_col", DataTypes.TIME())
+                        .physicalColumn("timestamp_col", 
DataTypes.TIMESTAMP(3))
+                        .physicalColumn("ltz_col", DataTypes.TIMESTAMP_LTZ(6))
+                        .build();
+
+        com.alibaba.fluss.metadata.Schema flussSchema = 
FlussConversions.toFlussSchema(cdcSchema);
+
+        RowType rowType = flussSchema.getRowType();
+        assertThat(rowType.getFieldCount()).isEqualTo(16);
+
+        // Indexes below follow the declaration order of the columns above.
+        
assertThat(rowType.getTypeAt(0)).isEqualTo(com.alibaba.fluss.types.DataTypes.BOOLEAN());
+        
assertThat(rowType.getTypeAt(1)).isEqualTo(com.alibaba.fluss.types.DataTypes.TINYINT());
+        
assertThat(rowType.getTypeAt(2)).isEqualTo(com.alibaba.fluss.types.DataTypes.SMALLINT());
+        
assertThat(rowType.getTypeAt(3)).isEqualTo(com.alibaba.fluss.types.DataTypes.INT());
+        
assertThat(rowType.getTypeAt(4)).isEqualTo(com.alibaba.fluss.types.DataTypes.BIGINT());
+        
assertThat(rowType.getTypeAt(5)).isEqualTo(com.alibaba.fluss.types.DataTypes.FLOAT());
+        
assertThat(rowType.getTypeAt(6)).isEqualTo(com.alibaba.fluss.types.DataTypes.DOUBLE());
+        assertThat(rowType.getTypeAt(7))
+                .isEqualTo(com.alibaba.fluss.types.DataTypes.DECIMAL(10, 2));
+        
assertThat(rowType.getTypeAt(8)).isEqualTo(com.alibaba.fluss.types.DataTypes.CHAR(10));
+        // VarChar maps to StringType in Fluss
+        
assertThat(rowType.getTypeAt(9)).isEqualTo(com.alibaba.fluss.types.DataTypes.STRING());
+        
assertThat(rowType.getTypeAt(10)).isEqualTo(com.alibaba.fluss.types.DataTypes.BINARY(16));
+        // VarBinary maps to BytesType in Fluss
+        
assertThat(rowType.getTypeAt(11)).isEqualTo(com.alibaba.fluss.types.DataTypes.BYTES());
+        
assertThat(rowType.getTypeAt(12)).isEqualTo(com.alibaba.fluss.types.DataTypes.DATE());
+        
assertThat(rowType.getTypeAt(13)).isEqualTo(com.alibaba.fluss.types.DataTypes.TIME());
+        
assertThat(rowType.getTypeAt(14)).isEqualTo(com.alibaba.fluss.types.DataTypes.TIMESTAMP(3));
+        assertThat(rowType.getTypeAt(15))
+                .isEqualTo(com.alibaba.fluss.types.DataTypes.TIMESTAMP_LTZ(6));
+    }
+
+    /**
+     * A plain {@code INT} column converts to a nullable Fluss type and an
+     * {@code INT().notNull()} column converts to a non-nullable one.
+     */
+    @Test
+    void testToFlussSchemaTypeNullability() {
+        // Verify that nullable flag is correctly propagated from CDC type to 
Fluss type
+        Schema cdcSchema =
+                Schema.newBuilder()
+                        .physicalColumn("nullable_col", DataTypes.INT())
+                        .physicalColumn("not_null_col", 
DataTypes.INT().notNull())
+                        .build();
+
+        com.alibaba.fluss.metadata.Schema flussSchema = 
FlussConversions.toFlussSchema(cdcSchema);
+
+        RowType rowType = flussSchema.getRowType();
+        assertThat(rowType.getTypeAt(0).isNullable()).isTrue();
+        assertThat(rowType.getTypeAt(1).isNullable()).isFalse();
+    }
+
+    /**
+     * Converting a schema with a {@code ZonedTimestampType} column throws
+     * {@link UnsupportedOperationException} whose message contains
+     * "Unsupported data type in fluss".
+     */
+    @Test
+    void testToFlussSchemaUnsupportedZonedTimestamp() {
+        Schema cdcSchema =
+                Schema.newBuilder()
+                        .physicalColumn(
+                                "ts", new 
org.apache.flink.cdc.common.types.ZonedTimestampType())
+                        .build();
+
+        assertThatThrownBy(() -> FlussConversions.toFlussSchema(cdcSchema))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining("Unsupported data type in fluss");
+    }
+
+    /**
+     * Converting a schema with an {@code ARRAY(INT)} column throws
+     * {@link UnsupportedOperationException} whose message contains
+     * "Unsupported data type in fluss".
+     */
+    @Test
+    void testToFlussSchemaUnsupportedArrayType() {
+        Schema cdcSchema =
+                Schema.newBuilder().physicalColumn("arr", 
DataTypes.ARRAY(DataTypes.INT())).build();
+
+        assertThatThrownBy(() -> FlussConversions.toFlussSchema(cdcSchema))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining("Unsupported data type in fluss");
+    }
+
+    /**
+     * Converting a schema with a {@code MAP(STRING, INT)} column throws
+     * {@link UnsupportedOperationException} whose message contains
+     * "Unsupported data type in fluss".
+     */
+    @Test
+    void testToFlussSchemaUnsupportedMapType() {
+        Schema cdcSchema =
+                Schema.newBuilder()
+                        .physicalColumn("map", 
DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))
+                        .build();
+
+        assertThatThrownBy(() -> FlussConversions.toFlussSchema(cdcSchema))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining("Unsupported data type in fluss");
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Tests for toFlussTable
+    // 
--------------------------------------------------------------------------------------------
+
+    /**
+     * Calls {@code toFlussTable} with {@code null} bucket keys, {@code null}
+     * as third argument and an empty property map; the descriptor's schema
+     * must have two columns and {@code id} as primary key.
+     *
+     * <p>NOTE(review): the third argument is presumably the bucket count;
+     * confirm against the {@code toFlussTable} signature.
+     */
+    @Test
+    void testToFlussTableBasic() {
+        Schema cdcSchema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT().notNull())
+                        .physicalColumn("name", DataTypes.STRING())
+                        .primaryKey("id")
+                        .build();
+
+        TableDescriptor descriptor =
+                FlussConversions.toFlussTable(cdcSchema, null, null, 
Collections.emptyMap());
+
+        assertThat(descriptor.getSchema().getColumnNames()).hasSize(2);
+        
assertThat(descriptor.getSchema().getPrimaryKeyColumnNames()).containsExactly("id");
+    }
+
+    /**
+     * Explicitly supplied bucket keys ({@code id}) are the bucket keys of
+     * the resulting descriptor.
+     *
+     * <p>NOTE(review): the value {@code 8} passed as third argument is not
+     * asserted anywhere in this test.
+     */
+    @Test
+    void testToFlussTableWithBucketKeys() {
+        Schema cdcSchema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT().notNull())
+                        .physicalColumn("name", DataTypes.STRING())
+                        .primaryKey("id")
+                        .build();
+
+        TableDescriptor descriptor =
+                FlussConversions.toFlussTable(
+                        cdcSchema, Arrays.asList("id"), 8, 
Collections.emptyMap());
+
+        assertThat(descriptor.getBucketKeys()).containsExactly("id");
+    }
+
+    /**
+     * An entry of the property map handed to {@code toFlussTable} is
+     * present in the descriptor's properties.
+     */
+    @Test
+    void testToFlussTableWithProperties() {
+        Schema cdcSchema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT().notNull())
+                        .primaryKey("id")
+                        .build();
+
+        Map<String, String> properties = new HashMap<>();
+        properties.put("table.replication.factor", "3");
+
+        TableDescriptor descriptor =
+                FlussConversions.toFlussTable(cdcSchema, null, null, 
properties);
+
+        
assertThat(descriptor.getProperties()).containsEntry("table.replication.factor",
 "3");
+    }
+
+    /**
+     * The comment set on the CDC schema is exposed by the descriptor's
+     * {@code getComment()}.
+     */
+    @Test
+    void testToFlussTableWithComment() {
+        Schema cdcSchema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT().notNull())
+                        .primaryKey("id")
+                        .comment("my test table")
+                        .build();
+
+        TableDescriptor descriptor =
+                FlussConversions.toFlussTable(cdcSchema, null, null, 
Collections.emptyMap());
+
+        assertThat(descriptor.getComment()).hasValue("my test table");
+    }
+
+    /**
+     * The CDC partition key {@code dt} becomes the descriptor's only
+     * partition key.
+     *
+     * <p>NOTE(review): {@code dt} is declared nullable here although it is
+     * part of the primary key; the next test declares it
+     * {@code notNull()}. Confirm whether the difference is intentional.
+     */
+    @Test
+    void testToFlussTableWithPartitionKeys() {
+        // Partition keys should be propagated to Fluss TableDescriptor
+        Schema cdcSchema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT().notNull())
+                        .physicalColumn("dt", DataTypes.STRING())
+                        .primaryKey("id", "dt")
+                        .partitionKey("dt")
+                        .build();
+
+        TableDescriptor descriptor =
+                FlussConversions.toFlussTable(cdcSchema, null, null, 
Collections.emptyMap());
+
+        assertThat(descriptor.getPartitionKeys()).containsExactly("dt");
+    }
+
+    /**
+     * With {@code null} bucket keys, primary key {@code (id, dt)} and
+     * partition key {@code dt}, the descriptor's bucket keys are exactly
+     * {@code id}.
+     */
+    @Test
+    void testToFlussTableDefaultBucketKeysExcludePartitionKeys() {
+        // When bucket keys are null, default = (primary keys - partition keys)
+        Schema cdcSchema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT().notNull())
+                        .physicalColumn("dt", DataTypes.STRING().notNull())
+                        .primaryKey("id", "dt")
+                        .partitionKey("dt")
+                        .build();
+
+        TableDescriptor descriptor =
+                FlussConversions.toFlussTable(cdcSchema, null, null, 
Collections.emptyMap());
+
+        // "dt" is partition key and should be removed from bucket keys
+        assertThat(descriptor.getBucketKeys()).containsExactly("id");
+    }
+
+    /**
+     * With an empty (non-null) bucket key list, the descriptor's bucket
+     * keys are the primary key column {@code id}.
+     */
+    @Test
+    void testToFlussTableDefaultBucketKeysFromPrimaryKeys() {
+        Schema cdcSchema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT().notNull())
+                        .physicalColumn("name", DataTypes.STRING())
+                        .primaryKey("id")
+                        .build();
+
+        // Pass empty bucket keys so it defaults to primary keys
+        TableDescriptor descriptor =
+                FlussConversions.toFlussTable(
+                        cdcSchema, Collections.emptyList(), null, 
Collections.emptyMap());
+
+        assertThat(descriptor.getBucketKeys()).containsExactly("id");
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Tests for sameCdcColumnsIgnoreCommentAndDefaultValue
+    // 
--------------------------------------------------------------------------------------------
+
+    /**
+     * Two schemas built from identical column declarations compare as
+     * the same.
+     */
+    @Test
+    void testSameCdcColumnsWithSameColumns() {
+        Schema schema1 =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT())
+                        .physicalColumn("name", DataTypes.STRING())
+                        .build();
+
+        Schema schema2 =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT())
+                        .physicalColumn("name", DataTypes.STRING())
+                        .build();
+
+        
assertThat(FlussConversions.sameCdcColumnsIgnoreCommentAndDefaultValue(schema1, 
schema2))
+                .isTrue();
+    }
+
+    /**
+     * Schemas whose second column differs ({@code name STRING} versus
+     * {@code age INT}) do not compare as the same.
+     */
+    @Test
+    void testSameCdcColumnsWithDifferentColumnNames() {
+        Schema schema1 =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT())
+                        .physicalColumn("name", DataTypes.STRING())
+                        .build();
+
+        Schema schema2 =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT())
+                        .physicalColumn("age", DataTypes.INT())
+                        .build();
+
+        
assertThat(FlussConversions.sameCdcColumnsIgnoreCommentAndDefaultValue(schema1, 
schema2))
+                .isFalse();
+    }
+
+    /**
+     * Schemas with equal column names but a different type for
+     * {@code id} ({@code INT} versus {@code BIGINT}) do not compare as
+     * the same.
+     */
+    @Test
+    void testSameCdcColumnsWithDifferentTypes() {
+        Schema schema1 =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT())
+                        .physicalColumn("name", DataTypes.STRING())
+                        .build();
+
+        Schema schema2 =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.BIGINT())
+                        .physicalColumn("name", DataTypes.STRING())
+                        .build();
+
+        
assertThat(FlussConversions.sameCdcColumnsIgnoreCommentAndDefaultValue(schema1, 
schema2))
+                .isFalse();
+    }
+
+    /**
+     * A two-column schema and a one-column schema do not compare as the
+     * same, even though the shared column is identical.
+     */
+    @Test
+    void testSameCdcColumnsWithDifferentColumnCount() {
+        Schema schema1 =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT())
+                        .physicalColumn("name", DataTypes.STRING())
+                        .build();
+
+        Schema schema2 = Schema.newBuilder().physicalColumn("id", 
DataTypes.INT()).build();
+
+        
assertThat(FlussConversions.sameCdcColumnsIgnoreCommentAndDefaultValue(schema1, 
schema2))
+                .isFalse();
+    }
+
+    /**
+     * Schemas that differ only in the comment attached to {@code id}
+     * compare as the same.
+     */
+    @Test
+    void testSameCdcColumnsIgnoresComment() {
+        Schema schema1 =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT(), "comment1")
+                        .physicalColumn("name", DataTypes.STRING())
+                        .build();
+
+        Schema schema2 =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT(), "different 
comment")
+                        .physicalColumn("name", DataTypes.STRING())
+                        .build();
+
+        // Should be true because comments are ignored
+        
assertThat(FlussConversions.sameCdcColumnsIgnoreCommentAndDefaultValue(schema1, 
schema2))
+                .isTrue();
+    }
+
+    /**
+     * Schemas that differ only in the fourth {@code physicalColumn}
+     * argument of {@code id} ("0" versus "100", with a {@code null}
+     * comment) compare as the same.
+     */
+    @Test
+    void testSameCdcColumnsIgnoresDefaultValue() {
+        // Default values should be ignored when comparing schemas
+        Schema schema1 =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT(), null, "0")
+                        .physicalColumn("name", DataTypes.STRING())
+                        .build();
+
+        Schema schema2 =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT(), null, "100")
+                        .physicalColumn("name", DataTypes.STRING())
+                        .build();
+
+        
assertThat(FlussConversions.sameCdcColumnsIgnoreCommentAndDefaultValue(schema1, 
schema2))
+                .isTrue();
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtilsTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtilsTest.java
new file mode 100644
index 000000000..f0437061e
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtilsTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.utils;
+
+import org.apache.flink.cdc.common.event.TableId;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link PostgresSchemaUtils}.
+ *
+ * <p>Covers {@code quote}, {@code toDbzTableId}, both {@code toCdcTableId}
+ * overloads and a CDC -> Debezium -> CDC round trip. The Debezium
+ * {@code TableId} is written with its fully qualified name because the
+ * CDC {@code TableId} is the one imported.
+ */
+class PostgresSchemaUtilsTest {
+
+    // 
--------------------------------------------------------------------------------------------
+    // Tests for quote
+    // 
--------------------------------------------------------------------------------------------
+
+    /** A plain identifier comes back wrapped in double quotes. */
+    @Test
+    void testQuote() {
+        
assertThat(PostgresSchemaUtils.quote("my_table")).isEqualTo("\"my_table\"");
+    }
+
+    /**
+     * An identifier containing a hyphen and a dot is wrapped in double
+     * quotes with its content left unchanged.
+     */
+    @Test
+    void testQuoteWithSpecialCharacters() {
+        
assertThat(PostgresSchemaUtils.quote("my-table.name")).isEqualTo("\"my-table.name\"");
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Tests for toDbzTableId
+    // 
--------------------------------------------------------------------------------------------
+
+    /**
+     * A three-part CDC id maps namespace to Debezium catalog, schema to
+     * schema and table to table.
+     */
+    @Test
+    void testToDbzTableIdWithNamespaceAndSchema() {
+        TableId cdcTableId = TableId.tableId("my_namespace", "my_schema", 
"my_table");
+        io.debezium.relational.TableId dbzTableId = 
PostgresSchemaUtils.toDbzTableId(cdcTableId);
+
+        assertThat(dbzTableId.catalog()).isEqualTo("my_namespace");
+        assertThat(dbzTableId.schema()).isEqualTo("my_schema");
+        assertThat(dbzTableId.table()).isEqualTo("my_table");
+    }
+
+    /**
+     * A two-part CDC id maps schema and table onto the Debezium id.
+     *
+     * <p>NOTE(review): the resulting {@code catalog()} is not asserted.
+     */
+    @Test
+    void testToDbzTableIdWithSchemaOnly() {
+        TableId cdcTableId = TableId.tableId("my_schema", "my_table");
+        io.debezium.relational.TableId dbzTableId = 
PostgresSchemaUtils.toDbzTableId(cdcTableId);
+
+        assertThat(dbzTableId.schema()).isEqualTo("my_schema");
+        assertThat(dbzTableId.table()).isEqualTo("my_table");
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Tests for toCdcTableId
+    // 
--------------------------------------------------------------------------------------------
+
+    /**
+     * A Debezium id with a {@code null} catalog converts to a CDC id that
+     * carries the same schema and table names.
+     */
+    @Test
+    void testToCdcTableIdSimple() {
+        io.debezium.relational.TableId dbzTableId =
+                new io.debezium.relational.TableId(null, "public", "users");
+        TableId cdcTableId = PostgresSchemaUtils.toCdcTableId(dbzTableId);
+
+        assertThat(cdcTableId.getSchemaName()).isEqualTo("public");
+        assertThat(cdcTableId.getTableName()).isEqualTo("users");
+    }
+
+    /**
+     * A Debezium id with {@code null} catalog and schema still converts,
+     * and the table name is preserved.
+     *
+     * <p>NOTE(review): only the table name is asserted; the schema name
+     * of the result is not checked.
+     */
+    @Test
+    void testToCdcTableIdWithNullSchema() {
+        io.debezium.relational.TableId dbzTableId =
+                new io.debezium.relational.TableId(null, null, "users");
+        TableId cdcTableId = PostgresSchemaUtils.toCdcTableId(dbzTableId);
+
+        assertThat(cdcTableId.getTableName()).isEqualTo("users");
+    }
+
+    /**
+     * With database name {@code my_db} and the boolean flag {@code true},
+     * the database name becomes the namespace of the CDC id.
+     */
+    @Test
+    void testToCdcTableIdWithDatabaseIncluded() {
+        io.debezium.relational.TableId dbzTableId =
+                new io.debezium.relational.TableId(null, "public", "users");
+        TableId cdcTableId = PostgresSchemaUtils.toCdcTableId(dbzTableId, 
"my_db", true);
+
+        assertThat(cdcTableId.getNamespace()).isEqualTo("my_db");
+        assertThat(cdcTableId.getSchemaName()).isEqualTo("public");
+        assertThat(cdcTableId.getTableName()).isEqualTo("users");
+    }
+
+    /**
+     * With the boolean flag {@code false}, schema and table names are
+     * carried over.
+     *
+     * <p>NOTE(review): the namespace is not asserted, so this test would
+     * still pass if {@code my_db} ended up in the namespace.
+     */
+    @Test
+    void testToCdcTableIdWithDatabaseNotIncluded() {
+        io.debezium.relational.TableId dbzTableId =
+                new io.debezium.relational.TableId(null, "public", "users");
+        TableId cdcTableId = PostgresSchemaUtils.toCdcTableId(dbzTableId, 
"my_db", false);
+
+        // Database not included, should use schema and table only
+        assertThat(cdcTableId.getSchemaName()).isEqualTo("public");
+        assertThat(cdcTableId.getTableName()).isEqualTo("users");
+    }
+
+    /**
+     * With a {@code null} database name and the boolean flag {@code true},
+     * the call succeeds and schema and table names are carried over.
+     *
+     * <p>NOTE(review): the namespace is not asserted here either.
+     */
+    @Test
+    void testToCdcTableIdWithNullDatabaseName() {
+        io.debezium.relational.TableId dbzTableId =
+                new io.debezium.relational.TableId(null, "public", "users");
+        TableId cdcTableId = PostgresSchemaUtils.toCdcTableId(dbzTableId, 
null, true);
+
+        // Null database name, should fall back to schema + table
+        assertThat(cdcTableId.getSchemaName()).isEqualTo("public");
+        assertThat(cdcTableId.getTableName()).isEqualTo("users");
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Tests for round-trip conversion
+    // 
--------------------------------------------------------------------------------------------
+
+    /**
+     * Converting a three-part CDC id to Debezium and back, passing the
+     * original namespace as database name with the flag {@code true},
+     * yields an id equal to the original.
+     */
+    @Test
+    void testRoundTripConversion() {
+        TableId original = TableId.tableId("my_namespace", "my_schema", 
"my_table");
+        io.debezium.relational.TableId dbzId = 
PostgresSchemaUtils.toDbzTableId(original);
+        TableId roundTripped = PostgresSchemaUtils.toCdcTableId(dbzId, 
"my_namespace", true);
+
+        assertThat(roundTripped).isEqualTo(original);
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtilsTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtilsTest.java
new file mode 100644
index 000000000..fda181169
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtilsTest.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.utils;
+
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.table.types.logical.DecimalType;
+
+import io.debezium.config.CommonConnectorConfig;
+import io.debezium.connector.postgresql.PostgresConnectorConfig;
+import io.debezium.jdbc.JdbcValueConverters;
+import io.debezium.jdbc.TemporalPrecisionMode;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link PostgresTypeUtils}.
+ *
+ * <p>The tests are grouped by the {@code handleXxx} method under test. Each test passes one
+ * handling mode (plus precision/scale where applicable) and pins the expected {@link DataType}.
+ */
+class PostgresTypeUtilsTest {
+
+    /** Decimal type the PRECISE-mode tests expect when the given precision is not used as-is. */
+    private static final DataType FALLBACK_DECIMAL =
+            DataTypes.DECIMAL(DecimalType.MAX_PRECISION, DecimalType.DEFAULT_SCALE);
+
+    // --------------------------------------------------------------------------------------------
+    // Tests for handleNumericWithDecimalMode
+    // --------------------------------------------------------------------------------------------
+
+    @Test
+    void testHandleNumericPreciseWithinRange() {
+        // precision=20 > DEFAULT_SCALE and <= MAX_PRECISION, should produce DECIMAL(20, 5)
+        DataType result =
+                PostgresTypeUtils.handleNumericWithDecimalMode(
+                        20, 5, JdbcValueConverters.DecimalMode.PRECISE);
+        assertThat(result).isEqualTo(DataTypes.DECIMAL(20, 5));
+    }
+
+    @Test
+    void testHandleNumericPreciseWithZeroPrecision() {
+        // precision=0 (no explicit precision), should fall back to (MAX_PRECISION, DEFAULT_SCALE)
+        DataType result =
+                PostgresTypeUtils.handleNumericWithDecimalMode(
+                        0, 0, JdbcValueConverters.DecimalMode.PRECISE);
+        assertThat(result).isEqualTo(FALLBACK_DECIMAL);
+    }
+
+    @Test
+    void testHandleNumericPreciseBoundaryAtDefaultScale() {
+        // precision == DEFAULT_SCALE is NOT > DEFAULT_SCALE, so falls back to max
+        DataType result =
+                PostgresTypeUtils.handleNumericWithDecimalMode(
+                        DecimalType.DEFAULT_SCALE, 2, JdbcValueConverters.DecimalMode.PRECISE);
+        assertThat(result).isEqualTo(FALLBACK_DECIMAL);
+    }
+
+    @Test
+    void testHandleNumericPreciseJustAboveDefaultScale() {
+        // precision = DEFAULT_SCALE + 1 is the smallest value where the condition
+        // (precision > DEFAULT_SCALE) becomes true, should use exact precision and scale
+        int precision = DecimalType.DEFAULT_SCALE + 1;
+        int scale = 0;
+        DataType result =
+                PostgresTypeUtils.handleNumericWithDecimalMode(
+                        precision, scale, JdbcValueConverters.DecimalMode.PRECISE);
+        assertThat(result).isEqualTo(DataTypes.DECIMAL(precision, scale));
+    }
+
+    @Test
+    void testHandleNumericPreciseExceedsMaxPrecision() {
+        // precision > MAX_PRECISION, should still fall back to max
+        DataType result =
+                PostgresTypeUtils.handleNumericWithDecimalMode(
+                        DecimalType.MAX_PRECISION + 1, 2, JdbcValueConverters.DecimalMode.PRECISE);
+        assertThat(result).isEqualTo(FALLBACK_DECIMAL);
+    }
+
+    @Test
+    void testHandleNumericDoubleMode() {
+        DataType result =
+                PostgresTypeUtils.handleNumericWithDecimalMode(
+                        10, 5, JdbcValueConverters.DecimalMode.DOUBLE);
+        assertThat(result).isEqualTo(DataTypes.DOUBLE());
+    }
+
+    @Test
+    void testHandleNumericStringMode() {
+        DataType result =
+                PostgresTypeUtils.handleNumericWithDecimalMode(
+                        10, 5, JdbcValueConverters.DecimalMode.STRING);
+        assertThat(result).isEqualTo(DataTypes.STRING());
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Tests for handleBinaryWithBinaryMode
+    // --------------------------------------------------------------------------------------------
+
+    @Test
+    void testHandleBinaryBytesMode() {
+        DataType result =
+                PostgresTypeUtils.handleBinaryWithBinaryMode(
+                        CommonConnectorConfig.BinaryHandlingMode.BYTES);
+        assertThat(result).isEqualTo(DataTypes.BYTES());
+    }
+
+    @Test
+    void testHandleBinaryBase64Mode() {
+        DataType result =
+                PostgresTypeUtils.handleBinaryWithBinaryMode(
+                        CommonConnectorConfig.BinaryHandlingMode.BASE64);
+        assertThat(result).isEqualTo(DataTypes.STRING());
+    }
+
+    @Test
+    void testHandleBinaryHexMode() {
+        DataType result =
+                PostgresTypeUtils.handleBinaryWithBinaryMode(
+                        CommonConnectorConfig.BinaryHandlingMode.HEX);
+        assertThat(result).isEqualTo(DataTypes.STRING());
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Tests for handleMoneyWithDecimalMode
+    // --------------------------------------------------------------------------------------------
+
+    @Test
+    void testHandleMoneyPreciseMode() {
+        DataType result =
+                PostgresTypeUtils.handleMoneyWithDecimalMode(
+                        2, JdbcValueConverters.DecimalMode.PRECISE);
+        assertThat(result).isEqualTo(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 2));
+    }
+
+    @Test
+    void testHandleMoneyDoubleMode() {
+        DataType result =
+                PostgresTypeUtils.handleMoneyWithDecimalMode(
+                        2, JdbcValueConverters.DecimalMode.DOUBLE);
+        assertThat(result).isEqualTo(DataTypes.DOUBLE());
+    }
+
+    @Test
+    void testHandleMoneyStringMode() {
+        DataType result =
+                PostgresTypeUtils.handleMoneyWithDecimalMode(
+                        2, JdbcValueConverters.DecimalMode.STRING);
+        assertThat(result).isEqualTo(DataTypes.STRING());
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Tests for handleIntervalWithIntervalHandlingMode
+    // --------------------------------------------------------------------------------------------
+
+    @Test
+    void testHandleIntervalNumericMode() {
+        DataType result =
+                PostgresTypeUtils.handleIntervalWithIntervalHandlingMode(
+                        PostgresConnectorConfig.IntervalHandlingMode.NUMERIC);
+        assertThat(result).isEqualTo(DataTypes.BIGINT());
+    }
+
+    @Test
+    void testHandleIntervalStringMode() {
+        DataType result =
+                PostgresTypeUtils.handleIntervalWithIntervalHandlingMode(
+                        PostgresConnectorConfig.IntervalHandlingMode.STRING);
+        assertThat(result).isEqualTo(DataTypes.STRING());
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Tests for handleDateWithTemporalMode
+    // --------------------------------------------------------------------------------------------
+
+    @Test
+    void testHandleDateAdaptiveMode() {
+        DataType result =
+                PostgresTypeUtils.handleDateWithTemporalMode(TemporalPrecisionMode.ADAPTIVE);
+        assertThat(result).isEqualTo(DataTypes.DATE());
+    }
+
+    @Test
+    void testHandleDateAdaptiveTimeMicrosecondsMode() {
+        DataType result =
+                PostgresTypeUtils.handleDateWithTemporalMode(
+                        TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS);
+        assertThat(result).isEqualTo(DataTypes.DATE());
+    }
+
+    @Test
+    void testHandleDateConnectMode() {
+        DataType result =
+                PostgresTypeUtils.handleDateWithTemporalMode(TemporalPrecisionMode.CONNECT);
+        assertThat(result).isEqualTo(DataTypes.DATE());
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Tests for handleTimeWithTemporalMode
+    // --------------------------------------------------------------------------------------------
+
+    @Test
+    void testHandleTimeAdaptiveMode() {
+        DataType result =
+                PostgresTypeUtils.handleTimeWithTemporalMode(TemporalPrecisionMode.ADAPTIVE, 6);
+        assertThat(result).isEqualTo(DataTypes.TIME(6));
+    }
+
+    @Test
+    void testHandleTimeAdaptiveTimeMicrosecondsMode() {
+        DataType result =
+                PostgresTypeUtils.handleTimeWithTemporalMode(
+                        TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS, 3);
+        assertThat(result).isEqualTo(DataTypes.TIME(3));
+    }
+
+    @Test
+    void testHandleTimeConnectMode() {
+        DataType result =
+                PostgresTypeUtils.handleTimeWithTemporalMode(TemporalPrecisionMode.CONNECT, 0);
+        assertThat(result).isEqualTo(DataTypes.TIME(0));
+    }
+
+    @Test
+    void testHandleTimeScalePreserved() {
+        // Verify that scale (fractional seconds precision) is correctly passed through
+        for (int scale = 0; scale <= 6; scale++) {
+            DataType result =
+                    PostgresTypeUtils.handleTimeWithTemporalMode(
+                            TemporalPrecisionMode.ADAPTIVE, scale);
+            // Describe the assertion so a failure names the scale that broke.
+            assertThat(result)
+                    .as("TIME type for scale %d", scale)
+                    .isEqualTo(DataTypes.TIME(scale));
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Tests for handleTimestampWithTemporalMode
+    // --------------------------------------------------------------------------------------------
+
+    @Test
+    void testHandleTimestampAdaptiveMode() {
+        DataType result =
+                PostgresTypeUtils.handleTimestampWithTemporalMode(
+                        TemporalPrecisionMode.ADAPTIVE, 6);
+        assertThat(result).isEqualTo(DataTypes.TIMESTAMP(6));
+    }
+
+    @Test
+    void testHandleTimestampAdaptiveTimeMicrosecondsMode() {
+        DataType result =
+                PostgresTypeUtils.handleTimestampWithTemporalMode(
+                        TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS, 3);
+        assertThat(result).isEqualTo(DataTypes.TIMESTAMP(3));
+    }
+
+    @Test
+    void testHandleTimestampConnectMode() {
+        DataType result =
+                PostgresTypeUtils.handleTimestampWithTemporalMode(TemporalPrecisionMode.CONNECT, 0);
+        assertThat(result).isEqualTo(DataTypes.TIMESTAMP(0));
+    }
+
+    @Test
+    void testHandleTimestampScalePreserved() {
+        // Verify that scale (fractional seconds precision) is correctly passed through
+        for (int scale = 0; scale <= 6; scale++) {
+            DataType result =
+                    PostgresTypeUtils.handleTimestampWithTemporalMode(
+                            TemporalPrecisionMode.ADAPTIVE, scale);
+            // Describe the assertion so a failure names the scale that broke.
+            assertThat(result)
+                    .as("TIMESTAMP type for scale %d", scale)
+                    .isEqualTo(DataTypes.TIMESTAMP(scale));
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Tests for handleHstoreWithHstoreMode
+    // --------------------------------------------------------------------------------------------
+
+    @Test
+    void testHandleHstoreJsonMode() {
+        DataType result =
+                PostgresTypeUtils.handleHstoreWithHstoreMode(
+                        PostgresConnectorConfig.HStoreHandlingMode.JSON);
+        assertThat(result).isEqualTo(DataTypes.STRING());
+    }
+
+    @Test
+    void testHandleHstoreMapMode() {
+        DataType result =
+                PostgresTypeUtils.handleHstoreWithHstoreMode(
+                        PostgresConnectorConfig.HStoreHandlingMode.MAP);
+        assertThat(result).isEqualTo(DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()));
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtilsTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtilsTest.java
new file mode 100644
index 000000000..3e93c9fd5
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtilsTest.java
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.starrocks.sink;
+
+import org.apache.flink.cdc.common.data.DateData;
+import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.data.TimestampData;
+import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.BigIntType;
+import org.apache.flink.cdc.common.types.BooleanType;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.DateType;
+import org.apache.flink.cdc.common.types.DoubleType;
+import org.apache.flink.cdc.common.types.FloatType;
+import org.apache.flink.cdc.common.types.IntType;
+import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
+import org.apache.flink.cdc.common.types.SmallIntType;
+import org.apache.flink.cdc.common.types.TimestampType;
+import org.apache.flink.cdc.common.types.TinyIntType;
+import org.apache.flink.cdc.common.types.VarCharType;
+import org.apache.flink.cdc.common.types.ZonedTimestampType;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+
+import com.starrocks.connector.flink.catalog.StarRocksColumn;
+import com.starrocks.connector.flink.catalog.StarRocksTable;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for {@link StarRocksUtils}.
+ *
+ * <p>Covers table conversion ({@code toStarRocksTable}), default-value sanitising ({@code
+ * convertInvalidTimestampDefaultValue}), type mapping ({@code toStarRocksDataType}) and field
+ * extraction ({@code createFieldGetter}). Shared fixtures live in the helper section at the end.
+ */
+class StarRocksUtilsTest {
+
+    /** Zone passed to {@code createFieldGetter} by tests that do not exercise zone conversion. */
+    private static final ZoneId UTC_ZONE = ZoneId.of("UTC");
+
+    /** Datetime default value that the conversion tests treat as invalid. */
+    private static final String INVALID_DATETIME = "0000-00-00 00:00:00";
+
+    // --------------------------------------------------------------------------------------------
+    // Tests for toStarRocksTable
+    // --------------------------------------------------------------------------------------------
+
+    @Test
+    void testToStarRocksTableBasic() {
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.BIGINT().notNull())
+                        .physicalColumn("name", DataTypes.VARCHAR(100))
+                        .physicalColumn("age", DataTypes.INT())
+                        .primaryKey("id")
+                        .build();
+
+        StarRocksTable table =
+                StarRocksUtils.toStarRocksTable(
+                        TableId.tableId("my_db", "my_table"), schema, emptyTableCreateConfig());
+
+        assertThat(table.getDatabaseName()).isEqualTo("my_db");
+        assertThat(table.getTableName()).isEqualTo("my_table");
+        assertThat(table.getTableType()).isEqualTo(StarRocksTable.TableType.PRIMARY_KEY);
+        assertThat(table.getTableKeys()).hasValue(Collections.singletonList("id"));
+        assertThat(table.getDistributionKeys()).hasValue(Collections.singletonList("id"));
+
+        // Verify columns: primary key column should be first
+        assertColumnNames(table, "id", "name", "age");
+    }
+
+    @Test
+    void testToStarRocksTablePrimaryKeyReordering() {
+        // Define schema where primary key columns are NOT at the front
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("name", DataTypes.VARCHAR(100))
+                        .physicalColumn("age", DataTypes.INT())
+                        .physicalColumn("id", DataTypes.BIGINT().notNull())
+                        .primaryKey("id")
+                        .build();
+
+        StarRocksTable table =
+                StarRocksUtils.toStarRocksTable(
+                        TableId.tableId("db", "table"), schema, emptyTableCreateConfig());
+
+        // Primary key "id" should be reordered to the front
+        assertColumnNames(table, "id", "name", "age");
+    }
+
+    @Test
+    void testToStarRocksTableCompositePrimaryKey() {
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("data", DataTypes.VARCHAR(200))
+                        .physicalColumn("id1", DataTypes.INT().notNull())
+                        .physicalColumn("id2", DataTypes.BIGINT().notNull())
+                        .primaryKey("id1", "id2")
+                        .build();
+
+        StarRocksTable table =
+                StarRocksUtils.toStarRocksTable(
+                        TableId.tableId("db", "table"), schema, emptyTableCreateConfig());
+
+        // Both primary key columns should be at the front, in the order of primaryKeys definition
+        assertColumnNames(table, "id1", "id2", "data");
+
+        assertThat(table.getTableKeys()).hasValue(Arrays.asList("id1", "id2"));
+        assertThat(table.getDistributionKeys()).hasValue(Arrays.asList("id1", "id2"));
+    }
+
+    @Test
+    void testToStarRocksTableNoPrimaryKeyThrowsException() {
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.BIGINT())
+                        .physicalColumn("name", DataTypes.VARCHAR(100))
+                        .build();
+
+        TableId tableId = TableId.tableId("db", "table");
+        TableCreateConfig config = emptyTableCreateConfig();
+
+        assertThatThrownBy(() -> StarRocksUtils.toStarRocksTable(tableId, schema, config))
+                .isInstanceOf(RuntimeException.class)
+                .hasMessageContaining("has no primary keys");
+    }
+
+    @Test
+    void testToStarRocksTableWithNumBuckets() {
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.BIGINT().notNull())
+                        .primaryKey("id")
+                        .build();
+
+        TableCreateConfig config = new TableCreateConfig(10, Collections.emptyMap());
+
+        StarRocksTable table =
+                StarRocksUtils.toStarRocksTable(TableId.tableId("db", "table"), schema, config);
+        assertThat(table.getNumBuckets()).hasValue(10);
+    }
+
+    @Test
+    void testToStarRocksTableWithProperties() {
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.BIGINT().notNull())
+                        .primaryKey("id")
+                        .build();
+
+        Map<String, String> properties = new HashMap<>();
+        properties.put("replication_num", "3");
+        TableCreateConfig config = new TableCreateConfig(null, properties);
+
+        StarRocksTable table =
+                StarRocksUtils.toStarRocksTable(TableId.tableId("db", "table"), schema, config);
+        assertThat(table.getProperties()).containsEntry("replication_num", "3");
+    }
+
+    @Test
+    void testToStarRocksTableWithComment() {
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.BIGINT().notNull())
+                        .primaryKey("id")
+                        .comment("test table comment")
+                        .build();
+
+        StarRocksTable table =
+                StarRocksUtils.toStarRocksTable(
+                        TableId.tableId("db", "table"), schema, emptyTableCreateConfig());
+        assertThat(table.getComment()).hasValue("test table comment");
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Tests for convertInvalidTimestampDefaultValue
+    // --------------------------------------------------------------------------------------------
+
+    @Test
+    void testConvertInvalidTimestampDefaultValueWithNull() {
+        assertThat(StarRocksUtils.convertInvalidTimestampDefaultValue(null, new TimestampType()))
+                .isNull();
+    }
+
+    @Test
+    void testConvertInvalidTimestampDefaultValueWithValidValue() {
+        String validDefault = "2024-01-01 12:00:00";
+        assertThat(
+                        StarRocksUtils.convertInvalidTimestampDefaultValue(
+                                validDefault, new TimestampType()))
+                .isEqualTo(validDefault);
+    }
+
+    @Test
+    void testConvertInvalidTimestampDefaultValueWithInvalidTimestamp() {
+        assertThat(
+                        StarRocksUtils.convertInvalidTimestampDefaultValue(
+                                INVALID_DATETIME, new TimestampType()))
+                .isEqualTo(StarRocksUtils.DEFAULT_DATETIME);
+    }
+
+    @Test
+    void testConvertInvalidTimestampDefaultValueWithLocalZonedTimestamp() {
+        assertThat(
+                        StarRocksUtils.convertInvalidTimestampDefaultValue(
+                                INVALID_DATETIME, new LocalZonedTimestampType()))
+                .isEqualTo(StarRocksUtils.DEFAULT_DATETIME);
+    }
+
+    @Test
+    void testConvertInvalidTimestampDefaultValueWithZonedTimestamp() {
+        // ZonedTimestampType is also a timestamp type, should convert invalid values
+        assertThat(
+                        StarRocksUtils.convertInvalidTimestampDefaultValue(
+                                INVALID_DATETIME, new ZonedTimestampType()))
+                .isEqualTo(StarRocksUtils.DEFAULT_DATETIME);
+    }
+
+    @Test
+    void testConvertInvalidTimestampDefaultValueWithNonTimestampType() {
+        // For non-timestamp types, the invalid datetime string should be returned as-is
+        assertThat(
+                        StarRocksUtils.convertInvalidTimestampDefaultValue(
+                                INVALID_DATETIME, DataTypes.VARCHAR(100)))
+                .isEqualTo(INVALID_DATETIME);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Tests for toStarRocksDataType
+    // --------------------------------------------------------------------------------------------
+
+    @Test
+    void testToStarRocksDataTypeNullableFlag() {
+        // nullable type should set isNullable=true
+        StarRocksColumn.Builder nullableBuilder = newColumnBuilder();
+        StarRocksUtils.toStarRocksDataType(new IntType(), false, nullableBuilder);
+        StarRocksColumn nullableCol = nullableBuilder.build();
+        assertThat(nullableCol.getDataType()).isEqualTo(StarRocksUtils.INT);
+        assertThat(nullableCol.isNullable()).isTrue();
+
+        // not-null type should set isNullable=false
+        StarRocksColumn.Builder notNullBuilder = newColumnBuilder();
+        StarRocksUtils.toStarRocksDataType(new IntType(false), false, notNullBuilder);
+        StarRocksColumn notNullCol = notNullBuilder.build();
+        assertThat(notNullCol.getDataType()).isEqualTo(StarRocksUtils.INT);
+        assertThat(notNullCol.isNullable()).isFalse();
+    }
+
+    @Test
+    void testToStarRocksDataTypeAllBasicTypes() {
+        // Verify data type mapping for all basic CDC types
+        assertStarRocksDataType(new BooleanType(), StarRocksUtils.BOOLEAN);
+        assertStarRocksDataType(new TinyIntType(), StarRocksUtils.TINYINT);
+        assertStarRocksDataType(new SmallIntType(), StarRocksUtils.SMALLINT);
+        assertStarRocksDataType(new IntType(), StarRocksUtils.INT);
+        assertStarRocksDataType(new BigIntType(), StarRocksUtils.BIGINT);
+        assertStarRocksDataType(new FloatType(), StarRocksUtils.FLOAT);
+        assertStarRocksDataType(new DoubleType(), StarRocksUtils.DOUBLE);
+        assertStarRocksDataType(new DateType(), StarRocksUtils.DATE);
+        assertStarRocksDataType(new TimestampType(), StarRocksUtils.DATETIME);
+        assertStarRocksDataType(new LocalZonedTimestampType(), StarRocksUtils.DATETIME);
+    }
+
+    @Test
+    void testToStarRocksDataTypeUnsupported() {
+        StarRocksColumn.Builder builder = newColumnBuilder();
+        assertThatThrownBy(
+                        () -> StarRocksUtils.toStarRocksDataType(DataTypes.BYTES(), false, builder))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining("Unsupported CDC data type");
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Tests for createFieldGetter
+    // --------------------------------------------------------------------------------------------
+
+    @Test
+    void testCreateFieldGetterBoolean() {
+        RecordData.FieldGetter getter =
+                StarRocksUtils.createFieldGetter(new BooleanType(), 0, UTC_ZONE);
+
+        BinaryRecordData record = singleFieldRecord(new BooleanType(), true);
+        assertThat(getter.getFieldOrNull(record)).isEqualTo(true);
+    }
+
+    @Test
+    void testCreateFieldGetterInt() {
+        RecordData.FieldGetter getter =
+                StarRocksUtils.createFieldGetter(new IntType(), 0, UTC_ZONE);
+
+        BinaryRecordData record = singleFieldRecord(new IntType(), 42);
+        assertThat(getter.getFieldOrNull(record)).isEqualTo(42);
+    }
+
+    @Test
+    void testCreateFieldGetterVarChar() {
+        RecordData.FieldGetter getter =
+                StarRocksUtils.createFieldGetter(new VarCharType(100), 0, UTC_ZONE);
+
+        BinaryRecordData record =
+                singleFieldRecord(new VarCharType(100), BinaryStringData.fromString("hello"));
+        assertThat(getter.getFieldOrNull(record)).isEqualTo("hello");
+    }
+
+    @Test
+    void testCreateFieldGetterDate() {
+        // Date field getter should format output as "yyyy-MM-dd"
+        RecordData.FieldGetter getter =
+                StarRocksUtils.createFieldGetter(new DateType(), 0, UTC_ZONE);
+
+        BinaryRecordData record =
+                singleFieldRecord(
+                        new DateType(), DateData.fromLocalDate(LocalDate.of(2024, 1, 15)));
+        assertThat(getter.getFieldOrNull(record)).isEqualTo("2024-01-15");
+    }
+
+    @Test
+    void testCreateFieldGetterTimestamp() {
+        // Timestamp field getter should format output as "yyyy-MM-dd HH:mm:ss"
+        RecordData.FieldGetter getter =
+                StarRocksUtils.createFieldGetter(new TimestampType(3), 0, UTC_ZONE);
+
+        BinaryRecordData record =
+                singleFieldRecord(
+                        new TimestampType(3),
+                        TimestampData.fromLocalDateTime(LocalDateTime.of(2024, 1, 15, 10, 30, 0)));
+        assertThat(getter.getFieldOrNull(record)).isEqualTo("2024-01-15 10:30:00");
+    }
+
+    @Test
+    void testCreateFieldGetterLocalZonedTimestamp() {
+        // LocalZonedTimestamp field getter should convert to the specified zone
+        ZoneId zoneId = ZoneId.of("Asia/Shanghai");
+        RecordData.FieldGetter getter =
+                StarRocksUtils.createFieldGetter(new LocalZonedTimestampType(6), 0, zoneId);
+
+        BinaryRecordData record =
+                singleFieldRecord(
+                        new LocalZonedTimestampType(6),
+                        LocalZonedTimestampData.fromInstant(
+                                LocalDateTime.of(2024, 1, 15, 10, 30, 0)
+                                        .toInstant(ZoneOffset.UTC)));
+        // UTC 10:30 -> Asia/Shanghai 18:30
+        assertThat(getter.getFieldOrNull(record)).isEqualTo("2024-01-15 18:30:00");
+    }
+
+    @Test
+    void testCreateFieldGetterNullable() {
+        // For nullable types, null values should return null
+        RecordData.FieldGetter getter =
+                StarRocksUtils.createFieldGetter(DataTypes.INT(), 0, UTC_ZONE);
+
+        BinaryRecordData record = singleFieldRecord(DataTypes.INT(), null);
+        assertThat(getter.getFieldOrNull(record)).isNull();
+    }
+
+    @Test
+    void testCreateFieldGetterUnsupportedType() {
+        assertThatThrownBy(() -> StarRocksUtils.createFieldGetter(DataTypes.BYTES(), 0, UTC_ZONE))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining("Don't support data type");
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Helper methods
+    // --------------------------------------------------------------------------------------------
+
+    /** Returns a table-create config with no bucket count and no extra properties. */
+    private static TableCreateConfig emptyTableCreateConfig() {
+        return new TableCreateConfig(null, Collections.emptyMap());
+    }
+
+    /** Returns a fresh column builder pre-filled with a placeholder name and ordinal position. */
+    private static StarRocksColumn.Builder newColumnBuilder() {
+        return new StarRocksColumn.Builder().setColumnName("col").setOrdinalPosition(0);
+    }
+
+    /** Builds a one-field binary record of the given type holding the given value. */
+    private static BinaryRecordData singleFieldRecord(DataType type, Object value) {
+        BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(new DataType[] {type});
+        return generator.generate(new Object[] {value});
+    }
+
+    /**
+     * Asserts that the table's columns carry exactly the given names, in the given order.
+     *
+     * <p>A single {@code containsExactly} check covers both size and order and prints the full
+     * actual list on failure.
+     */
+    private static void assertColumnNames(StarRocksTable table, String... expectedNames) {
+        List<StarRocksColumn> columns = table.getColumns();
+        assertThat(columns)
+                .extracting(StarRocksColumn::getColumnName)
+                .containsExactly(expectedNames);
+    }
+
+    /** Converts {@code cdcType} for a non-primary-key column and checks the StarRocks type name. */
+    private void assertStarRocksDataType(DataType cdcType, String expectedStarRocksType) {
+        StarRocksColumn.Builder builder = newColumnBuilder();
+        StarRocksUtils.toStarRocksDataType(cdcType, false, builder);
+        assertThat(builder.build().getDataType()).isEqualTo(expectedStarRocksType);
+    }
+}

Reply via email to