This is an automated email from the ASF dual-hosted git repository.
renqs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new abb98ee25 [FLINK-34690] Cast decimal to VARCHAR as primary key in starrocks sink (#3150)
abb98ee25 is described below
commit abb98ee257527079a013462de0893bb9e2276693
Author: Hongshun Wang <[email protected]>
AuthorDate: Thu Apr 11 10:57:49 2024 +0800
[FLINK-34690] Cast decimal to VARCHAR as primary key in starrocks sink (#3150)
---
.../connectors/starrocks/sink/StarRocksUtils.java | 16 ++++++--
.../starrocks/sink/CdcDataTypeTransformerTest.java | 45 ++++++++++++++++++++++
2 files changed, 58 insertions(+), 3 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
index ea43f0898..642a9dd3d 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
@@ -297,10 +297,20 @@ public class StarRocksUtils {
@Override
public StarRocksColumn.Builder visit(DecimalType decimalType) {
- builder.setDataType(DECIMAL);
+ // StarRocks does not support Decimal as primary key, so decimal should be cast to
+ // VARCHAR.
+ if (!isPrimaryKeys) {
+ builder.setDataType(DECIMAL);
+ builder.setColumnSize(decimalType.getPrecision());
+ builder.setDecimalDigits(decimalType.getScale());
+ } else {
+ builder.setDataType(VARCHAR);
+ // For a DecimalType with precision N, we may need N + 1 or N + 2 characters to store it as a
+ // string (one for negative sign, and one for decimal point)
+ builder.setColumnSize(Math.min(
+ decimalType.getScale() != 0?
decimalType.getPrecision() + 2:decimalType.getPrecision() + 1,
MAX_VARCHAR_SIZE));
+ }
builder.setNullable(decimalType.isNullable());
- builder.setColumnSize(decimalType.getPrecision());
- builder.setDecimalDigits(decimalType.getScale());
return builder;
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/CdcDataTypeTransformerTest.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/CdcDataTypeTransformerTest.java
index 778937705..ad76e4c5a 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/CdcDataTypeTransformerTest.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/CdcDataTypeTransformerTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.cdc.connectors.starrocks.sink;
import org.apache.flink.cdc.common.types.CharType;
+import org.apache.flink.cdc.common.types.DecimalType;
import org.apache.flink.cdc.common.types.VarCharType;
import com.starrocks.connector.flink.catalog.StarRocksColumn;
@@ -75,6 +76,50 @@ public class CdcDataTypeTransformerTest {
assertTrue(smallLengthColumn.isNullable());
}
+ @Test
+ public void testDecimalForPrimaryKey() {
+ // Map to DECIMAL of StarRocks if column is DECIMAL type and not primary key.
+ StarRocksColumn.Builder noPrimaryKeyBuilder =
+ new
StarRocksColumn.Builder().setColumnName("no_primary_key").setOrdinalPosition(0);
+ new DecimalType(20, 1)
+ .accept(new StarRocksUtils.CdcDataTypeTransformer(false,
noPrimaryKeyBuilder));
+ StarRocksColumn noPrimaryKeyColumn = noPrimaryKeyBuilder.build();
+ assertEquals("no_primary_key", noPrimaryKeyColumn.getColumnName());
+ assertEquals(0, noPrimaryKeyColumn.getOrdinalPosition());
+ assertEquals(StarRocksUtils.DECIMAL, noPrimaryKeyColumn.getDataType());
+ assertEquals(Integer.valueOf(20),
noPrimaryKeyColumn.getColumnSize().orElse(null));
+ assertEquals(Integer.valueOf(1),
noPrimaryKeyColumn.getDecimalDigits().get());
+ assertTrue(noPrimaryKeyColumn.isNullable());
+
+ // Map to VARCHAR of StarRocks if column is DECIMAL type and primary key.
+ StarRocksColumn.Builder primaryKeyBuilder =
+ new
StarRocksColumn.Builder().setColumnName("primary_key").setOrdinalPosition(1);
+ new DecimalType(20, 1)
+ .notNull()
+ .accept(new StarRocksUtils.CdcDataTypeTransformer(true,
primaryKeyBuilder));
+ StarRocksColumn primaryKeyColumn = primaryKeyBuilder.build();
+ assertEquals("primary_key", primaryKeyColumn.getColumnName());
+ assertEquals(1, primaryKeyColumn.getOrdinalPosition());
+ assertEquals(StarRocksUtils.VARCHAR, primaryKeyColumn.getDataType());
+ assertEquals(Integer.valueOf(22),
primaryKeyColumn.getColumnSize().orElse(null));
+ assertTrue(!primaryKeyColumn.isNullable());
+
+ // Map to VARCHAR of StarRocks if column is DECIMAL type and primary key
+ // DECIMAL(20,0) is common in cdc pipeline, for example, the upstream cdc source is unsigned
+ // BIGINT.
+ StarRocksColumn.Builder unsignedBigIntKeyBuilder =
+ new
StarRocksColumn.Builder().setColumnName("primary_key").setOrdinalPosition(1);
+ new DecimalType(20, 0)
+ .notNull()
+ .accept(new StarRocksUtils.CdcDataTypeTransformer(true,
unsignedBigIntKeyBuilder));
+ StarRocksColumn unsignedBigIntColumn =
unsignedBigIntKeyBuilder.build();
+ assertEquals("primary_key", unsignedBigIntColumn.getColumnName());
+ assertEquals(1, unsignedBigIntColumn.getOrdinalPosition());
+ assertEquals(StarRocksUtils.VARCHAR,
unsignedBigIntColumn.getDataType());
+ assertEquals(Integer.valueOf(21),
unsignedBigIntColumn.getColumnSize().orElse(null));
+ assertTrue(!unsignedBigIntColumn.isNullable());
+ }
+
@Test
public void testVarCharType() {
// the length fo StarRocks should be 3 times as that of CDC if CDC
length * 3 <=