This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 1969f83b2 [test] Improve pipeline connector test coverage i.e.
doris/postgres/fluss (#4266)
1969f83b2 is described below
commit 1969f83b29b18270892d35bd6fc8d2197b700946
Author: Leonard Xu <[email protected]>
AuthorDate: Tue Feb 24 18:05:00 2026 +0800
[test] Improve pipeline connector test coverage i.e. doris/postgres/fluss
(#4266)
* [test][pipeline-connector/starrocks] Add unit tests for StarRocksUtils
* [test][pipeline-connector/postgres] Add unit tests for PostgresTypeUtils
and PostgresSchemaUtils
* [test][pipeline-connector/fluss] Add unit tests for FlussConversions
---
.../fluss/utils/FlussConversionsTest.java | 400 +++++++++++++++++++
.../postgres/utils/PostgresSchemaUtilsTest.java | 134 +++++++
.../postgres/utils/PostgresTypeUtilsTest.java | 302 +++++++++++++++
.../starrocks/sink/StarRocksUtilsTest.java | 429 +++++++++++++++++++++
4 files changed, 1265 insertions(+)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversionsTest.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversionsTest.java
new file mode 100644
index 000000000..2f3487528
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/utils/FlussConversionsTest.java
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.fluss.utils;
+
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataTypes;
+
+import com.alibaba.fluss.metadata.TableDescriptor;
+import com.alibaba.fluss.types.RowType;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link FlussConversions}. */
+class FlussConversionsTest {
+
+ //
--------------------------------------------------------------------------------------------
+ // Tests for toFlussSchema
+ //
--------------------------------------------------------------------------------------------
+
+ @Test
+ void testToFlussSchemaBasic() {
+ Schema cdcSchema =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT().notNull())
+ .physicalColumn("name", DataTypes.STRING())
+ .primaryKey("id")
+ .build();
+
+ com.alibaba.fluss.metadata.Schema flussSchema =
FlussConversions.toFlussSchema(cdcSchema);
+
+ assertThat(flussSchema.getColumnNames()).containsExactly("id", "name");
+
assertThat(flussSchema.getPrimaryKeyColumnNames()).containsExactly("id");
+ }
+
+ @Test
+ void testToFlussSchemaWithoutPrimaryKey() {
+ Schema cdcSchema =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .build();
+
+ com.alibaba.fluss.metadata.Schema flussSchema =
FlussConversions.toFlussSchema(cdcSchema);
+
+ assertThat(flussSchema.getColumnNames()).hasSize(2);
+ assertThat(flussSchema.getPrimaryKeyColumnNames()).isEmpty();
+ }
+
+ @Test
+ void testToFlussSchemaTypeConversions() {
+ Schema cdcSchema =
+ Schema.newBuilder()
+ .physicalColumn("bool_col", DataTypes.BOOLEAN())
+ .physicalColumn("tinyint_col", DataTypes.TINYINT())
+ .physicalColumn("smallint_col", DataTypes.SMALLINT())
+ .physicalColumn("int_col", DataTypes.INT())
+ .physicalColumn("bigint_col", DataTypes.BIGINT())
+ .physicalColumn("float_col", DataTypes.FLOAT())
+ .physicalColumn("double_col", DataTypes.DOUBLE())
+ .physicalColumn("decimal_col", DataTypes.DECIMAL(10,
2))
+ .physicalColumn("char_col", DataTypes.CHAR(10))
+ .physicalColumn("varchar_col", DataTypes.VARCHAR(100))
+ .physicalColumn("binary_col", DataTypes.BINARY(16))
+ .physicalColumn("varbinary_col",
DataTypes.VARBINARY(100))
+ .physicalColumn("date_col", DataTypes.DATE())
+ .physicalColumn("time_col", DataTypes.TIME())
+ .physicalColumn("timestamp_col",
DataTypes.TIMESTAMP(3))
+ .physicalColumn("ltz_col", DataTypes.TIMESTAMP_LTZ(6))
+ .build();
+
+ com.alibaba.fluss.metadata.Schema flussSchema =
FlussConversions.toFlussSchema(cdcSchema);
+
+ RowType rowType = flussSchema.getRowType();
+ assertThat(rowType.getFieldCount()).isEqualTo(16);
+
+
assertThat(rowType.getTypeAt(0)).isEqualTo(com.alibaba.fluss.types.DataTypes.BOOLEAN());
+
assertThat(rowType.getTypeAt(1)).isEqualTo(com.alibaba.fluss.types.DataTypes.TINYINT());
+
assertThat(rowType.getTypeAt(2)).isEqualTo(com.alibaba.fluss.types.DataTypes.SMALLINT());
+
assertThat(rowType.getTypeAt(3)).isEqualTo(com.alibaba.fluss.types.DataTypes.INT());
+
assertThat(rowType.getTypeAt(4)).isEqualTo(com.alibaba.fluss.types.DataTypes.BIGINT());
+
assertThat(rowType.getTypeAt(5)).isEqualTo(com.alibaba.fluss.types.DataTypes.FLOAT());
+
assertThat(rowType.getTypeAt(6)).isEqualTo(com.alibaba.fluss.types.DataTypes.DOUBLE());
+ assertThat(rowType.getTypeAt(7))
+ .isEqualTo(com.alibaba.fluss.types.DataTypes.DECIMAL(10, 2));
+
assertThat(rowType.getTypeAt(8)).isEqualTo(com.alibaba.fluss.types.DataTypes.CHAR(10));
+ // VarChar maps to StringType in Fluss
+
assertThat(rowType.getTypeAt(9)).isEqualTo(com.alibaba.fluss.types.DataTypes.STRING());
+
assertThat(rowType.getTypeAt(10)).isEqualTo(com.alibaba.fluss.types.DataTypes.BINARY(16));
+ // VarBinary maps to BytesType in Fluss
+
assertThat(rowType.getTypeAt(11)).isEqualTo(com.alibaba.fluss.types.DataTypes.BYTES());
+
assertThat(rowType.getTypeAt(12)).isEqualTo(com.alibaba.fluss.types.DataTypes.DATE());
+
assertThat(rowType.getTypeAt(13)).isEqualTo(com.alibaba.fluss.types.DataTypes.TIME());
+
assertThat(rowType.getTypeAt(14)).isEqualTo(com.alibaba.fluss.types.DataTypes.TIMESTAMP(3));
+ assertThat(rowType.getTypeAt(15))
+ .isEqualTo(com.alibaba.fluss.types.DataTypes.TIMESTAMP_LTZ(6));
+ }
+
+ @Test
+ void testToFlussSchemaTypeNullability() {
+ // Verify that nullable flag is correctly propagated from CDC type to
Fluss type
+ Schema cdcSchema =
+ Schema.newBuilder()
+ .physicalColumn("nullable_col", DataTypes.INT())
+ .physicalColumn("not_null_col",
DataTypes.INT().notNull())
+ .build();
+
+ com.alibaba.fluss.metadata.Schema flussSchema =
FlussConversions.toFlussSchema(cdcSchema);
+
+ RowType rowType = flussSchema.getRowType();
+ assertThat(rowType.getTypeAt(0).isNullable()).isTrue();
+ assertThat(rowType.getTypeAt(1).isNullable()).isFalse();
+ }
+
+ @Test
+ void testToFlussSchemaUnsupportedZonedTimestamp() {
+ Schema cdcSchema =
+ Schema.newBuilder()
+ .physicalColumn(
+ "ts", new
org.apache.flink.cdc.common.types.ZonedTimestampType())
+ .build();
+
+ assertThatThrownBy(() -> FlussConversions.toFlussSchema(cdcSchema))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessageContaining("Unsupported data type in fluss");
+ }
+
+ @Test
+ void testToFlussSchemaUnsupportedArrayType() {
+ Schema cdcSchema =
+ Schema.newBuilder().physicalColumn("arr",
DataTypes.ARRAY(DataTypes.INT())).build();
+
+ assertThatThrownBy(() -> FlussConversions.toFlussSchema(cdcSchema))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessageContaining("Unsupported data type in fluss");
+ }
+
+ @Test
+ void testToFlussSchemaUnsupportedMapType() {
+ Schema cdcSchema =
+ Schema.newBuilder()
+ .physicalColumn("map",
DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))
+ .build();
+
+ assertThatThrownBy(() -> FlussConversions.toFlussSchema(cdcSchema))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessageContaining("Unsupported data type in fluss");
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Tests for toFlussTable
+ //
--------------------------------------------------------------------------------------------
+
+ @Test
+ void testToFlussTableBasic() {
+ Schema cdcSchema =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT().notNull())
+ .physicalColumn("name", DataTypes.STRING())
+ .primaryKey("id")
+ .build();
+
+ TableDescriptor descriptor =
+ FlussConversions.toFlussTable(cdcSchema, null, null,
Collections.emptyMap());
+
+ assertThat(descriptor.getSchema().getColumnNames()).hasSize(2);
+
assertThat(descriptor.getSchema().getPrimaryKeyColumnNames()).containsExactly("id");
+ }
+
+ @Test
+ void testToFlussTableWithBucketKeys() {
+ Schema cdcSchema =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT().notNull())
+ .physicalColumn("name", DataTypes.STRING())
+ .primaryKey("id")
+ .build();
+
+ TableDescriptor descriptor =
+ FlussConversions.toFlussTable(
+ cdcSchema, Arrays.asList("id"), 8,
Collections.emptyMap());
+
+ assertThat(descriptor.getBucketKeys()).containsExactly("id");
+ }
+
+ @Test
+ void testToFlussTableWithProperties() {
+ Schema cdcSchema =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT().notNull())
+ .primaryKey("id")
+ .build();
+
+ Map<String, String> properties = new HashMap<>();
+ properties.put("table.replication.factor", "3");
+
+ TableDescriptor descriptor =
+ FlussConversions.toFlussTable(cdcSchema, null, null,
properties);
+
+
assertThat(descriptor.getProperties()).containsEntry("table.replication.factor",
"3");
+ }
+
+ @Test
+ void testToFlussTableWithComment() {
+ Schema cdcSchema =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT().notNull())
+ .primaryKey("id")
+ .comment("my test table")
+ .build();
+
+ TableDescriptor descriptor =
+ FlussConversions.toFlussTable(cdcSchema, null, null,
Collections.emptyMap());
+
+ assertThat(descriptor.getComment()).hasValue("my test table");
+ }
+
+ @Test
+ void testToFlussTableWithPartitionKeys() {
+ // Partition keys should be propagated to Fluss TableDescriptor
+ Schema cdcSchema =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT().notNull())
+ .physicalColumn("dt", DataTypes.STRING())
+ .primaryKey("id", "dt")
+ .partitionKey("dt")
+ .build();
+
+ TableDescriptor descriptor =
+ FlussConversions.toFlussTable(cdcSchema, null, null,
Collections.emptyMap());
+
+ assertThat(descriptor.getPartitionKeys()).containsExactly("dt");
+ }
+
+ @Test
+ void testToFlussTableDefaultBucketKeysExcludePartitionKeys() {
+ // When bucket keys are null, default = (primary keys - partition keys)
+ Schema cdcSchema =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT().notNull())
+ .physicalColumn("dt", DataTypes.STRING().notNull())
+ .primaryKey("id", "dt")
+ .partitionKey("dt")
+ .build();
+
+ TableDescriptor descriptor =
+ FlussConversions.toFlussTable(cdcSchema, null, null,
Collections.emptyMap());
+
+ // "dt" is partition key and should be removed from bucket keys
+ assertThat(descriptor.getBucketKeys()).containsExactly("id");
+ }
+
+ @Test
+ void testToFlussTableDefaultBucketKeysFromPrimaryKeys() {
+ Schema cdcSchema =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT().notNull())
+ .physicalColumn("name", DataTypes.STRING())
+ .primaryKey("id")
+ .build();
+
+ // Pass empty bucket keys so it defaults to primary keys
+ TableDescriptor descriptor =
+ FlussConversions.toFlussTable(
+ cdcSchema, Collections.emptyList(), null,
Collections.emptyMap());
+
+ assertThat(descriptor.getBucketKeys()).containsExactly("id");
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Tests for sameCdcColumnsIgnoreCommentAndDefaultValue
+ //
--------------------------------------------------------------------------------------------
+
+ @Test
+ void testSameCdcColumnsWithSameColumns() {
+ Schema schema1 =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .build();
+
+ Schema schema2 =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .build();
+
+
assertThat(FlussConversions.sameCdcColumnsIgnoreCommentAndDefaultValue(schema1,
schema2))
+ .isTrue();
+ }
+
+ @Test
+ void testSameCdcColumnsWithDifferentColumnNames() {
+ Schema schema1 =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .build();
+
+ Schema schema2 =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("age", DataTypes.INT())
+ .build();
+
+
assertThat(FlussConversions.sameCdcColumnsIgnoreCommentAndDefaultValue(schema1,
schema2))
+ .isFalse();
+ }
+
+ @Test
+ void testSameCdcColumnsWithDifferentTypes() {
+ Schema schema1 =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .build();
+
+ Schema schema2 =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.BIGINT())
+ .physicalColumn("name", DataTypes.STRING())
+ .build();
+
+
assertThat(FlussConversions.sameCdcColumnsIgnoreCommentAndDefaultValue(schema1,
schema2))
+ .isFalse();
+ }
+
+ @Test
+ void testSameCdcColumnsWithDifferentColumnCount() {
+ Schema schema1 =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .build();
+
+ Schema schema2 = Schema.newBuilder().physicalColumn("id",
DataTypes.INT()).build();
+
+
assertThat(FlussConversions.sameCdcColumnsIgnoreCommentAndDefaultValue(schema1,
schema2))
+ .isFalse();
+ }
+
+ @Test
+ void testSameCdcColumnsIgnoresComment() {
+ Schema schema1 =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT(), "comment1")
+ .physicalColumn("name", DataTypes.STRING())
+ .build();
+
+ Schema schema2 =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT(), "different
comment")
+ .physicalColumn("name", DataTypes.STRING())
+ .build();
+
+ // Should be true because comments are ignored
+
assertThat(FlussConversions.sameCdcColumnsIgnoreCommentAndDefaultValue(schema1,
schema2))
+ .isTrue();
+ }
+
+ @Test
+ void testSameCdcColumnsIgnoresDefaultValue() {
+ // Default values should be ignored when comparing schemas
+ Schema schema1 =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT(), null, "0")
+ .physicalColumn("name", DataTypes.STRING())
+ .build();
+
+ Schema schema2 =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT(), null, "100")
+ .physicalColumn("name", DataTypes.STRING())
+ .build();
+
+
assertThat(FlussConversions.sameCdcColumnsIgnoreCommentAndDefaultValue(schema1,
schema2))
+ .isTrue();
+ }
+}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtilsTest.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtilsTest.java
new file mode 100644
index 000000000..f0437061e
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtilsTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.utils;
+
+import org.apache.flink.cdc.common.event.TableId;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link PostgresSchemaUtils}. */
+class PostgresSchemaUtilsTest {
+
+ //
--------------------------------------------------------------------------------------------
+ // Tests for quote
+ //
--------------------------------------------------------------------------------------------
+
+ @Test
+ void testQuote() {
+
assertThat(PostgresSchemaUtils.quote("my_table")).isEqualTo("\"my_table\"");
+ }
+
+ @Test
+ void testQuoteWithSpecialCharacters() {
+
assertThat(PostgresSchemaUtils.quote("my-table.name")).isEqualTo("\"my-table.name\"");
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Tests for toDbzTableId
+ //
--------------------------------------------------------------------------------------------
+
+ @Test
+ void testToDbzTableIdWithNamespaceAndSchema() {
+ TableId cdcTableId = TableId.tableId("my_namespace", "my_schema",
"my_table");
+ io.debezium.relational.TableId dbzTableId =
PostgresSchemaUtils.toDbzTableId(cdcTableId);
+
+ assertThat(dbzTableId.catalog()).isEqualTo("my_namespace");
+ assertThat(dbzTableId.schema()).isEqualTo("my_schema");
+ assertThat(dbzTableId.table()).isEqualTo("my_table");
+ }
+
+ @Test
+ void testToDbzTableIdWithSchemaOnly() {
+ TableId cdcTableId = TableId.tableId("my_schema", "my_table");
+ io.debezium.relational.TableId dbzTableId =
PostgresSchemaUtils.toDbzTableId(cdcTableId);
+
+ assertThat(dbzTableId.schema()).isEqualTo("my_schema");
+ assertThat(dbzTableId.table()).isEqualTo("my_table");
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Tests for toCdcTableId
+ //
--------------------------------------------------------------------------------------------
+
+ @Test
+ void testToCdcTableIdSimple() {
+ io.debezium.relational.TableId dbzTableId =
+ new io.debezium.relational.TableId(null, "public", "users");
+ TableId cdcTableId = PostgresSchemaUtils.toCdcTableId(dbzTableId);
+
+ assertThat(cdcTableId.getSchemaName()).isEqualTo("public");
+ assertThat(cdcTableId.getTableName()).isEqualTo("users");
+ }
+
+ @Test
+ void testToCdcTableIdWithNullSchema() {
+ io.debezium.relational.TableId dbzTableId =
+ new io.debezium.relational.TableId(null, null, "users");
+ TableId cdcTableId = PostgresSchemaUtils.toCdcTableId(dbzTableId);
+
+ assertThat(cdcTableId.getTableName()).isEqualTo("users");
+ }
+
+ @Test
+ void testToCdcTableIdWithDatabaseIncluded() {
+ io.debezium.relational.TableId dbzTableId =
+ new io.debezium.relational.TableId(null, "public", "users");
+ TableId cdcTableId = PostgresSchemaUtils.toCdcTableId(dbzTableId,
"my_db", true);
+
+ assertThat(cdcTableId.getNamespace()).isEqualTo("my_db");
+ assertThat(cdcTableId.getSchemaName()).isEqualTo("public");
+ assertThat(cdcTableId.getTableName()).isEqualTo("users");
+ }
+
+ @Test
+ void testToCdcTableIdWithDatabaseNotIncluded() {
+ io.debezium.relational.TableId dbzTableId =
+ new io.debezium.relational.TableId(null, "public", "users");
+ TableId cdcTableId = PostgresSchemaUtils.toCdcTableId(dbzTableId,
"my_db", false);
+
+ // Database not included, should use schema and table only
+ assertThat(cdcTableId.getSchemaName()).isEqualTo("public");
+ assertThat(cdcTableId.getTableName()).isEqualTo("users");
+ }
+
+ @Test
+ void testToCdcTableIdWithNullDatabaseName() {
+ io.debezium.relational.TableId dbzTableId =
+ new io.debezium.relational.TableId(null, "public", "users");
+ TableId cdcTableId = PostgresSchemaUtils.toCdcTableId(dbzTableId,
null, true);
+
+ // Null database name, should fall back to schema + table
+ assertThat(cdcTableId.getSchemaName()).isEqualTo("public");
+ assertThat(cdcTableId.getTableName()).isEqualTo("users");
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Tests for round-trip conversion
+ //
--------------------------------------------------------------------------------------------
+
+ @Test
+ void testRoundTripConversion() {
+ TableId original = TableId.tableId("my_namespace", "my_schema",
"my_table");
+ io.debezium.relational.TableId dbzId =
PostgresSchemaUtils.toDbzTableId(original);
+ TableId roundTripped = PostgresSchemaUtils.toCdcTableId(dbzId,
"my_namespace", true);
+
+ assertThat(roundTripped).isEqualTo(original);
+ }
+}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtilsTest.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtilsTest.java
new file mode 100644
index 000000000..fda181169
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtilsTest.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.utils;
+
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.table.types.logical.DecimalType;
+
+import io.debezium.config.CommonConnectorConfig;
+import io.debezium.connector.postgresql.PostgresConnectorConfig;
+import io.debezium.jdbc.JdbcValueConverters;
+import io.debezium.jdbc.TemporalPrecisionMode;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link PostgresTypeUtils}. */
+class PostgresTypeUtilsTest {
+
+ //
--------------------------------------------------------------------------------------------
+ // Tests for handleNumericWithDecimalMode
+ //
--------------------------------------------------------------------------------------------
+
+ @Test
+ void testHandleNumericPreciseWithinRange() {
+ // precision=20 > DEFAULT_SCALE and <= MAX_PRECISION, should produce
DECIMAL(20, 5)
+ DataType result =
+ PostgresTypeUtils.handleNumericWithDecimalMode(
+ 20, 5, JdbcValueConverters.DecimalMode.PRECISE);
+ assertThat(result).isEqualTo(DataTypes.DECIMAL(20, 5));
+ }
+
+ @Test
+ void testHandleNumericPreciseWithZeroPrecision() {
+ // precision=0 (no explicit precision), should fall back to
(MAX_PRECISION, DEFAULT_SCALE)
+ DataType result =
+ PostgresTypeUtils.handleNumericWithDecimalMode(
+ 0, 0, JdbcValueConverters.DecimalMode.PRECISE);
+ assertThat(result)
+ .isEqualTo(DataTypes.DECIMAL(DecimalType.MAX_PRECISION,
DecimalType.DEFAULT_SCALE));
+ }
+
+ @Test
+ void testHandleNumericPreciseBoundaryAtDefaultScale() {
+ // precision == DEFAULT_SCALE is NOT > DEFAULT_SCALE, so falls back to
max
+ DataType result =
+ PostgresTypeUtils.handleNumericWithDecimalMode(
+ DecimalType.DEFAULT_SCALE, 2,
JdbcValueConverters.DecimalMode.PRECISE);
+ assertThat(result)
+ .isEqualTo(DataTypes.DECIMAL(DecimalType.MAX_PRECISION,
DecimalType.DEFAULT_SCALE));
+ }
+
+ @Test
+ void testHandleNumericPreciseJustAboveDefaultScale() {
+ // precision = DEFAULT_SCALE + 1 is the smallest value where the
condition
+ // (precision > DEFAULT_SCALE) becomes true, should use exact
precision and scale
+ int precision = DecimalType.DEFAULT_SCALE + 1;
+ int scale = 0;
+ DataType result =
+ PostgresTypeUtils.handleNumericWithDecimalMode(
+ precision, scale,
JdbcValueConverters.DecimalMode.PRECISE);
+ assertThat(result).isEqualTo(DataTypes.DECIMAL(precision, scale));
+ }
+
+ @Test
+ void testHandleNumericPreciseExceedsMaxPrecision() {
+ // precision > MAX_PRECISION, should still fall back to max
+ DataType result =
+ PostgresTypeUtils.handleNumericWithDecimalMode(
+ DecimalType.MAX_PRECISION + 1, 2,
JdbcValueConverters.DecimalMode.PRECISE);
+ assertThat(result)
+ .isEqualTo(DataTypes.DECIMAL(DecimalType.MAX_PRECISION,
DecimalType.DEFAULT_SCALE));
+ }
+
+ @Test
+ void testHandleNumericDoubleMode() {
+ DataType result =
+ PostgresTypeUtils.handleNumericWithDecimalMode(
+ 10, 5, JdbcValueConverters.DecimalMode.DOUBLE);
+ assertThat(result).isEqualTo(DataTypes.DOUBLE());
+ }
+
+ @Test
+ void testHandleNumericStringMode() {
+ DataType result =
+ PostgresTypeUtils.handleNumericWithDecimalMode(
+ 10, 5, JdbcValueConverters.DecimalMode.STRING);
+ assertThat(result).isEqualTo(DataTypes.STRING());
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Tests for handleBinaryWithBinaryMode
+ //
--------------------------------------------------------------------------------------------
+
+ @Test
+ void testHandleBinaryBytesMode() {
+ DataType result =
+ PostgresTypeUtils.handleBinaryWithBinaryMode(
+ CommonConnectorConfig.BinaryHandlingMode.BYTES);
+ assertThat(result).isEqualTo(DataTypes.BYTES());
+ }
+
+ @Test
+ void testHandleBinaryBase64Mode() {
+ DataType result =
+ PostgresTypeUtils.handleBinaryWithBinaryMode(
+ CommonConnectorConfig.BinaryHandlingMode.BASE64);
+ assertThat(result).isEqualTo(DataTypes.STRING());
+ }
+
+ @Test
+ void testHandleBinaryHexMode() {
+ DataType result =
+ PostgresTypeUtils.handleBinaryWithBinaryMode(
+ CommonConnectorConfig.BinaryHandlingMode.HEX);
+ assertThat(result).isEqualTo(DataTypes.STRING());
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Tests for handleMoneyWithDecimalMode
+ //
--------------------------------------------------------------------------------------------
+
+ @Test
+ void testHandleMoneyPreciseMode() {
+ DataType result =
+ PostgresTypeUtils.handleMoneyWithDecimalMode(
+ 2, JdbcValueConverters.DecimalMode.PRECISE);
+
assertThat(result).isEqualTo(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 2));
+ }
+
+ @Test
+ void testHandleMoneyDoubleMode() {
+ DataType result =
+ PostgresTypeUtils.handleMoneyWithDecimalMode(
+ 2, JdbcValueConverters.DecimalMode.DOUBLE);
+ assertThat(result).isEqualTo(DataTypes.DOUBLE());
+ }
+
+ @Test
+ void testHandleMoneyStringMode() {
+ DataType result =
+ PostgresTypeUtils.handleMoneyWithDecimalMode(
+ 2, JdbcValueConverters.DecimalMode.STRING);
+ assertThat(result).isEqualTo(DataTypes.STRING());
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Tests for handleIntervalWithIntervalHandlingMode
+ //
--------------------------------------------------------------------------------------------
+
+ @Test
+ void testHandleIntervalNumericMode() {
+ DataType result =
+ PostgresTypeUtils.handleIntervalWithIntervalHandlingMode(
+ PostgresConnectorConfig.IntervalHandlingMode.NUMERIC);
+ assertThat(result).isEqualTo(DataTypes.BIGINT());
+ }
+
+ @Test
+ void testHandleIntervalStringMode() {
+ DataType result =
+ PostgresTypeUtils.handleIntervalWithIntervalHandlingMode(
+ PostgresConnectorConfig.IntervalHandlingMode.STRING);
+ assertThat(result).isEqualTo(DataTypes.STRING());
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Tests for handleDateWithTemporalMode
+ //
--------------------------------------------------------------------------------------------
+
+ @Test
+ void testHandleDateAdaptiveMode() {
+ DataType result =
+
PostgresTypeUtils.handleDateWithTemporalMode(TemporalPrecisionMode.ADAPTIVE);
+ assertThat(result).isEqualTo(DataTypes.DATE());
+ }
+
+ @Test
+ void testHandleDateAdaptiveTimeMicrosecondsMode() {
+ DataType result =
+ PostgresTypeUtils.handleDateWithTemporalMode(
+ TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS);
+ assertThat(result).isEqualTo(DataTypes.DATE());
+ }
+
+ @Test
+ void testHandleDateConnectMode() {
+ DataType result =
+
PostgresTypeUtils.handleDateWithTemporalMode(TemporalPrecisionMode.CONNECT);
+ assertThat(result).isEqualTo(DataTypes.DATE());
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Tests for handleTimeWithTemporalMode
+ //
--------------------------------------------------------------------------------------------
+
+ @Test
+ void testHandleTimeAdaptiveMode() {
+ DataType result =
+
PostgresTypeUtils.handleTimeWithTemporalMode(TemporalPrecisionMode.ADAPTIVE, 6);
+ assertThat(result).isEqualTo(DataTypes.TIME(6));
+ }
+
+ @Test
+ void testHandleTimeAdaptiveTimeMicrosecondsMode() {
+ DataType result =
+ PostgresTypeUtils.handleTimeWithTemporalMode(
+ TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS, 3);
+ assertThat(result).isEqualTo(DataTypes.TIME(3));
+ }
+
+ @Test
+ void testHandleTimeConnectMode() {
+ DataType result =
+
PostgresTypeUtils.handleTimeWithTemporalMode(TemporalPrecisionMode.CONNECT, 0);
+ assertThat(result).isEqualTo(DataTypes.TIME(0));
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Tests for handleTimestampWithTemporalMode
+ //
--------------------------------------------------------------------------------------------
+
+ @Test
+ void testHandleTimestampAdaptiveMode() {
+ DataType result =
+ PostgresTypeUtils.handleTimestampWithTemporalMode(
+ TemporalPrecisionMode.ADAPTIVE, 6);
+ assertThat(result).isEqualTo(DataTypes.TIMESTAMP(6));
+ }
+
+ @Test
+ void testHandleTimestampAdaptiveTimeMicrosecondsMode() {
+ DataType result =
+ PostgresTypeUtils.handleTimestampWithTemporalMode(
+ TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS, 3);
+ assertThat(result).isEqualTo(DataTypes.TIMESTAMP(3));
+ }
+
+ @Test
+ void testHandleTimestampConnectMode() {
+ DataType result =
+
PostgresTypeUtils.handleTimestampWithTemporalMode(TemporalPrecisionMode.CONNECT,
0);
+ assertThat(result).isEqualTo(DataTypes.TIMESTAMP(0));
+ }
+
+ @Test
+ void testHandleTimestampScalePreserved() {
+ // Verify that scale (fractional seconds precision) is correctly
passed through
+ for (int scale = 0; scale <= 6; scale++) {
+ DataType result =
+ PostgresTypeUtils.handleTimestampWithTemporalMode(
+ TemporalPrecisionMode.ADAPTIVE, scale);
+ assertThat(result).isEqualTo(DataTypes.TIMESTAMP(scale));
+ }
+ }
+
+ @Test
+ void testHandleTimeScalePreserved() {
+ // Verify that scale (fractional seconds precision) is correctly
passed through
+ for (int scale = 0; scale <= 6; scale++) {
+ DataType result =
+ PostgresTypeUtils.handleTimeWithTemporalMode(
+ TemporalPrecisionMode.ADAPTIVE, scale);
+ assertThat(result).isEqualTo(DataTypes.TIME(scale));
+ }
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Tests for handleHstoreWithHstoreMode
+ //
--------------------------------------------------------------------------------------------
+
+ @Test
+ void testHandleHstoreJsonMode() {
+ DataType result =
+ PostgresTypeUtils.handleHstoreWithHstoreMode(
+ PostgresConnectorConfig.HStoreHandlingMode.JSON);
+ assertThat(result).isEqualTo(DataTypes.STRING());
+ }
+
+ @Test
+ void testHandleHstoreMapMode() {
+ DataType result =
+ PostgresTypeUtils.handleHstoreWithHstoreMode(
+ PostgresConnectorConfig.HStoreHandlingMode.MAP);
+ assertThat(result).isEqualTo(DataTypes.MAP(DataTypes.STRING(),
DataTypes.STRING()));
+ }
+}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtilsTest.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtilsTest.java
new file mode 100644
index 000000000..3e93c9fd5
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtilsTest.java
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.starrocks.sink;
+
+import org.apache.flink.cdc.common.data.DateData;
+import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.data.TimestampData;
+import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.BigIntType;
+import org.apache.flink.cdc.common.types.BooleanType;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.DateType;
+import org.apache.flink.cdc.common.types.DoubleType;
+import org.apache.flink.cdc.common.types.FloatType;
+import org.apache.flink.cdc.common.types.IntType;
+import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
+import org.apache.flink.cdc.common.types.SmallIntType;
+import org.apache.flink.cdc.common.types.TimestampType;
+import org.apache.flink.cdc.common.types.TinyIntType;
+import org.apache.flink.cdc.common.types.VarCharType;
+import org.apache.flink.cdc.common.types.ZonedTimestampType;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+
+import com.starrocks.connector.flink.catalog.StarRocksColumn;
+import com.starrocks.connector.flink.catalog.StarRocksTable;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
/**
 * Tests for {@link StarRocksUtils}: CDC schema to StarRocks table conversion,
 * default-value sanitization for invalid timestamps, CDC-to-StarRocks data type
 * mapping, and per-field value extraction/formatting via {@code createFieldGetter}.
 */
class StarRocksUtilsTest {

    // --------------------------------------------------------------------------------------------
    // Tests for toStarRocksTable
    // --------------------------------------------------------------------------------------------

    @Test
    void testToStarRocksTableBasic() {
        Schema schema =
                Schema.newBuilder()
                        .physicalColumn("id", DataTypes.BIGINT().notNull())
                        .physicalColumn("name", DataTypes.VARCHAR(100))
                        .physicalColumn("age", DataTypes.INT())
                        .primaryKey("id")
                        .build();

        TableId tableId = TableId.tableId("my_db", "my_table");
        TableCreateConfig tableCreateConfig = new TableCreateConfig(null, Collections.emptyMap());

        StarRocksTable table = StarRocksUtils.toStarRocksTable(tableId, schema, tableCreateConfig);

        assertThat(table.getDatabaseName()).isEqualTo("my_db");
        assertThat(table.getTableName()).isEqualTo("my_table");
        assertThat(table.getTableType()).isEqualTo(StarRocksTable.TableType.PRIMARY_KEY);
        // Primary key columns double as the distribution keys.
        assertThat(table.getTableKeys()).hasValue(Arrays.asList("id"));
        assertThat(table.getDistributionKeys()).hasValue(Arrays.asList("id"));

        // Verify columns: primary key column should be first
        List<StarRocksColumn> columns = table.getColumns();
        assertThat(columns).hasSize(3);
        assertThat(columns.get(0).getColumnName()).isEqualTo("id");
        assertThat(columns.get(1).getColumnName()).isEqualTo("name");
        assertThat(columns.get(2).getColumnName()).isEqualTo("age");
    }

    @Test
    void testToStarRocksTablePrimaryKeyReordering() {
        // Define schema where primary key columns are NOT at the front
        Schema schema =
                Schema.newBuilder()
                        .physicalColumn("name", DataTypes.VARCHAR(100))
                        .physicalColumn("age", DataTypes.INT())
                        .physicalColumn("id", DataTypes.BIGINT().notNull())
                        .primaryKey("id")
                        .build();

        TableId tableId = TableId.tableId("db", "table");
        TableCreateConfig config = new TableCreateConfig(null, Collections.emptyMap());

        StarRocksTable table = StarRocksUtils.toStarRocksTable(tableId, schema, config);

        // Primary key "id" should be reordered to the front
        List<StarRocksColumn> columns = table.getColumns();
        assertThat(columns).hasSize(3);
        assertThat(columns.get(0).getColumnName()).isEqualTo("id");
        assertThat(columns.get(1).getColumnName()).isEqualTo("name");
        assertThat(columns.get(2).getColumnName()).isEqualTo("age");
    }

    @Test
    void testToStarRocksTableCompositePrimaryKey() {
        Schema schema =
                Schema.newBuilder()
                        .physicalColumn("data", DataTypes.VARCHAR(200))
                        .physicalColumn("id1", DataTypes.INT().notNull())
                        .physicalColumn("id2", DataTypes.BIGINT().notNull())
                        .primaryKey("id1", "id2")
                        .build();

        TableId tableId = TableId.tableId("db", "table");
        TableCreateConfig config = new TableCreateConfig(null, Collections.emptyMap());

        StarRocksTable table = StarRocksUtils.toStarRocksTable(tableId, schema, config);

        // Both primary key columns should be at the front, in the order of primaryKeys definition
        List<StarRocksColumn> columns = table.getColumns();
        assertThat(columns).hasSize(3);
        assertThat(columns.get(0).getColumnName()).isEqualTo("id1");
        assertThat(columns.get(1).getColumnName()).isEqualTo("id2");
        assertThat(columns.get(2).getColumnName()).isEqualTo("data");

        assertThat(table.getTableKeys()).hasValue(Arrays.asList("id1", "id2"));
        assertThat(table.getDistributionKeys()).hasValue(Arrays.asList("id1", "id2"));
    }

    @Test
    void testToStarRocksTableNoPrimaryKeyThrowsException() {
        // StarRocks primary-key tables require a primary key; conversion must fail without one.
        Schema schema =
                Schema.newBuilder()
                        .physicalColumn("id", DataTypes.BIGINT())
                        .physicalColumn("name", DataTypes.VARCHAR(100))
                        .build();

        TableId tableId = TableId.tableId("db", "table");
        TableCreateConfig config = new TableCreateConfig(null, Collections.emptyMap());

        assertThatThrownBy(() -> StarRocksUtils.toStarRocksTable(tableId, schema, config))
                .isInstanceOf(RuntimeException.class)
                .hasMessageContaining("has no primary keys");
    }

    @Test
    void testToStarRocksTableWithNumBuckets() {
        // A configured bucket count should be propagated onto the StarRocks table.
        Schema schema =
                Schema.newBuilder()
                        .physicalColumn("id", DataTypes.BIGINT().notNull())
                        .primaryKey("id")
                        .build();

        TableId tableId = TableId.tableId("db", "table");
        TableCreateConfig config = new TableCreateConfig(10, Collections.emptyMap());

        StarRocksTable table = StarRocksUtils.toStarRocksTable(tableId, schema, config);
        assertThat(table.getNumBuckets()).hasValue(10);
    }

    @Test
    void testToStarRocksTableWithProperties() {
        // Table properties from the create config should be propagated as-is.
        Schema schema =
                Schema.newBuilder()
                        .physicalColumn("id", DataTypes.BIGINT().notNull())
                        .primaryKey("id")
                        .build();

        TableId tableId = TableId.tableId("db", "table");
        Map<String, String> properties = new HashMap<>();
        properties.put("replication_num", "3");
        TableCreateConfig config = new TableCreateConfig(null, properties);

        StarRocksTable table = StarRocksUtils.toStarRocksTable(tableId, schema, config);
        assertThat(table.getProperties()).containsEntry("replication_num", "3");
    }

    @Test
    void testToStarRocksTableWithComment() {
        // The CDC schema comment should become the StarRocks table comment.
        Schema schema =
                Schema.newBuilder()
                        .physicalColumn("id", DataTypes.BIGINT().notNull())
                        .primaryKey("id")
                        .comment("test table comment")
                        .build();

        TableId tableId = TableId.tableId("db", "table");
        TableCreateConfig config = new TableCreateConfig(null, Collections.emptyMap());

        StarRocksTable table = StarRocksUtils.toStarRocksTable(tableId, schema, config);
        assertThat(table.getComment()).hasValue("test table comment");
    }

    // --------------------------------------------------------------------------------------------
    // Tests for convertInvalidTimestampDefaultValue
    // --------------------------------------------------------------------------------------------

    @Test
    void testConvertInvalidTimestampDefaultValueWithNull() {
        // A null default value must pass through untouched.
        assertThat(StarRocksUtils.convertInvalidTimestampDefaultValue(null, new TimestampType()))
                .isNull();
    }

    @Test
    void testConvertInvalidTimestampDefaultValueWithValidValue() {
        // A parseable datetime default must not be rewritten.
        String validDefault = "2024-01-01 12:00:00";
        assertThat(
                        StarRocksUtils.convertInvalidTimestampDefaultValue(
                                validDefault, new TimestampType()))
                .isEqualTo(validDefault);
    }

    @Test
    void testConvertInvalidTimestampDefaultValueWithInvalidTimestamp() {
        // The MySQL-style zero datetime is invalid in StarRocks and should be
        // replaced with the connector's fallback default.
        assertThat(
                        StarRocksUtils.convertInvalidTimestampDefaultValue(
                                "0000-00-00 00:00:00", new TimestampType()))
                .isEqualTo(StarRocksUtils.DEFAULT_DATETIME);
    }

    @Test
    void testConvertInvalidTimestampDefaultValueWithLocalZonedTimestamp() {
        assertThat(
                        StarRocksUtils.convertInvalidTimestampDefaultValue(
                                "0000-00-00 00:00:00", new LocalZonedTimestampType()))
                .isEqualTo(StarRocksUtils.DEFAULT_DATETIME);
    }

    @Test
    void testConvertInvalidTimestampDefaultValueWithZonedTimestamp() {
        // ZonedTimestampType is also a timestamp type, should convert invalid values
        assertThat(
                        StarRocksUtils.convertInvalidTimestampDefaultValue(
                                "0000-00-00 00:00:00", new ZonedTimestampType()))
                .isEqualTo(StarRocksUtils.DEFAULT_DATETIME);
    }

    @Test
    void testConvertInvalidTimestampDefaultValueWithNonTimestampType() {
        // For non-timestamp types, the invalid datetime string should be returned as-is
        assertThat(
                        StarRocksUtils.convertInvalidTimestampDefaultValue(
                                "0000-00-00 00:00:00", DataTypes.VARCHAR(100)))
                .isEqualTo("0000-00-00 00:00:00");
    }

    // --------------------------------------------------------------------------------------------
    // Tests for toStarRocksDataType
    // --------------------------------------------------------------------------------------------

    @Test
    void testToStarRocksDataTypeNullableFlag() {
        // nullable type should set isNullable=true
        StarRocksColumn.Builder nullableBuilder =
                new StarRocksColumn.Builder().setColumnName("col").setOrdinalPosition(0);
        StarRocksUtils.toStarRocksDataType(new IntType(), false, nullableBuilder);
        StarRocksColumn nullableCol = nullableBuilder.build();
        assertThat(nullableCol.getDataType()).isEqualTo(StarRocksUtils.INT);
        assertThat(nullableCol.isNullable()).isTrue();

        // not-null type should set isNullable=false
        StarRocksColumn.Builder notNullBuilder =
                new StarRocksColumn.Builder().setColumnName("col").setOrdinalPosition(0);
        StarRocksUtils.toStarRocksDataType(new IntType(false), false, notNullBuilder);
        StarRocksColumn notNullCol = notNullBuilder.build();
        assertThat(notNullCol.getDataType()).isEqualTo(StarRocksUtils.INT);
        assertThat(notNullCol.isNullable()).isFalse();
    }

    @Test
    void testToStarRocksDataTypeAllBasicTypes() {
        // Verify data type mapping for all basic CDC types
        assertStarRocksDataType(new BooleanType(), StarRocksUtils.BOOLEAN);
        assertStarRocksDataType(new TinyIntType(), StarRocksUtils.TINYINT);
        assertStarRocksDataType(new SmallIntType(), StarRocksUtils.SMALLINT);
        assertStarRocksDataType(new IntType(), StarRocksUtils.INT);
        assertStarRocksDataType(new BigIntType(), StarRocksUtils.BIGINT);
        assertStarRocksDataType(new FloatType(), StarRocksUtils.FLOAT);
        assertStarRocksDataType(new DoubleType(), StarRocksUtils.DOUBLE);
        assertStarRocksDataType(new DateType(), StarRocksUtils.DATE);
        assertStarRocksDataType(new TimestampType(), StarRocksUtils.DATETIME);
        assertStarRocksDataType(new LocalZonedTimestampType(), StarRocksUtils.DATETIME);
    }

    @Test
    void testToStarRocksDataTypeUnsupported() {
        // BYTES has no StarRocks mapping in this connector; conversion must fail loudly.
        StarRocksColumn.Builder builder =
                new StarRocksColumn.Builder().setColumnName("col").setOrdinalPosition(0);
        assertThatThrownBy(
                        () -> StarRocksUtils.toStarRocksDataType(DataTypes.BYTES(), false, builder))
                .isInstanceOf(UnsupportedOperationException.class)
                .hasMessageContaining("Unsupported CDC data type");
    }

    /** Asserts that converting {@code cdcType} yields the given StarRocks type name. */
    private void assertStarRocksDataType(DataType cdcType, String expectedStarRocksType) {
        StarRocksColumn.Builder builder =
                new StarRocksColumn.Builder().setColumnName("col").setOrdinalPosition(0);
        StarRocksUtils.toStarRocksDataType(cdcType, false, builder);
        assertThat(builder.build().getDataType()).isEqualTo(expectedStarRocksType);
    }

    // --------------------------------------------------------------------------------------------
    // Tests for createFieldGetter
    // --------------------------------------------------------------------------------------------

    @Test
    void testCreateFieldGetterBoolean() {
        RecordData.FieldGetter getter =
                StarRocksUtils.createFieldGetter(new BooleanType(), 0, ZoneId.of("UTC"));

        BinaryRecordDataGenerator generator =
                new BinaryRecordDataGenerator(new DataType[] {new BooleanType()});
        BinaryRecordData record = generator.generate(new Object[] {true});
        assertThat(getter.getFieldOrNull(record)).isEqualTo(true);
    }

    @Test
    void testCreateFieldGetterInt() {
        RecordData.FieldGetter getter =
                StarRocksUtils.createFieldGetter(new IntType(), 0, ZoneId.of("UTC"));

        BinaryRecordDataGenerator generator =
                new BinaryRecordDataGenerator(new DataType[] {new IntType()});
        BinaryRecordData record = generator.generate(new Object[] {42});
        assertThat(getter.getFieldOrNull(record)).isEqualTo(42);
    }

    @Test
    void testCreateFieldGetterVarChar() {
        // VARCHAR values come back as plain Java strings.
        RecordData.FieldGetter getter =
                StarRocksUtils.createFieldGetter(new VarCharType(100), 0, ZoneId.of("UTC"));

        BinaryRecordDataGenerator generator =
                new BinaryRecordDataGenerator(new DataType[] {new VarCharType(100)});
        BinaryRecordData record =
                generator.generate(new Object[] {BinaryStringData.fromString("hello")});
        assertThat(getter.getFieldOrNull(record)).isEqualTo("hello");
    }

    @Test
    void testCreateFieldGetterDate() {
        // Date field getter should format output as "yyyy-MM-dd"
        RecordData.FieldGetter getter =
                StarRocksUtils.createFieldGetter(new DateType(), 0, ZoneId.of("UTC"));

        BinaryRecordDataGenerator generator =
                new BinaryRecordDataGenerator(new DataType[] {new DateType()});
        BinaryRecordData record =
                generator.generate(
                        new Object[] {DateData.fromLocalDate(LocalDate.of(2024, 1, 15))});
        assertThat(getter.getFieldOrNull(record)).isEqualTo("2024-01-15");
    }

    @Test
    void testCreateFieldGetterTimestamp() {
        // Timestamp field getter should format output as "yyyy-MM-dd HH:mm:ss"
        RecordData.FieldGetter getter =
                StarRocksUtils.createFieldGetter(new TimestampType(3), 0, ZoneId.of("UTC"));

        BinaryRecordDataGenerator generator =
                new BinaryRecordDataGenerator(new DataType[] {new TimestampType(3)});
        BinaryRecordData record =
                generator.generate(
                        new Object[] {
                            TimestampData.fromLocalDateTime(
                                    LocalDateTime.of(2024, 1, 15, 10, 30, 0))
                        });
        assertThat(getter.getFieldOrNull(record)).isEqualTo("2024-01-15 10:30:00");
    }

    @Test
    void testCreateFieldGetterLocalZonedTimestamp() {
        // LocalZonedTimestamp field getter should convert to the specified zone
        ZoneId zoneId = ZoneId.of("Asia/Shanghai");
        RecordData.FieldGetter getter =
                StarRocksUtils.createFieldGetter(new LocalZonedTimestampType(6), 0, zoneId);

        BinaryRecordDataGenerator generator =
                new BinaryRecordDataGenerator(new DataType[] {new LocalZonedTimestampType(6)});
        BinaryRecordData record =
                generator.generate(
                        new Object[] {
                            LocalZonedTimestampData.fromInstant(
                                    LocalDateTime.of(2024, 1, 15, 10, 30, 0)
                                            .toInstant(ZoneOffset.UTC))
                        });
        // UTC 10:30 -> Asia/Shanghai 18:30
        assertThat(getter.getFieldOrNull(record)).isEqualTo("2024-01-15 18:30:00");
    }

    @Test
    void testCreateFieldGetterNullable() {
        // For nullable types, null values should return null
        RecordData.FieldGetter getter =
                StarRocksUtils.createFieldGetter(DataTypes.INT(), 0, ZoneId.of("UTC"));

        BinaryRecordDataGenerator generator =
                new BinaryRecordDataGenerator(new DataType[] {DataTypes.INT()});
        BinaryRecordData record = generator.generate(new Object[] {null});
        assertThat(getter.getFieldOrNull(record)).isNull();
    }

    @Test
    void testCreateFieldGetterUnsupportedType() {
        // Requesting a getter for an unmapped type must fail loudly.
        assertThatThrownBy(
                        () ->
                                StarRocksUtils.createFieldGetter(
                                        DataTypes.BYTES(), 0, ZoneId.of("UTC")))
                .isInstanceOf(UnsupportedOperationException.class)
                .hasMessageContaining("Don't support data type");
    }
}