This is an automated email from the ASF dual-hosted git repository.

renqs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new abb98ee25 [FLINK-34690] Cast decimal to VARCHAR as primary key in 
starrocks sink (#3150)
abb98ee25 is described below

commit abb98ee257527079a013462de0893bb9e2276693
Author: Hongshun Wang <[email protected]>
AuthorDate: Thu Apr 11 10:57:49 2024 +0800

    [FLINK-34690] Cast decimal to VARCHAR as primary key in starrocks sink 
(#3150)
---
 .../connectors/starrocks/sink/StarRocksUtils.java  | 16 ++++++--
 .../starrocks/sink/CdcDataTypeTransformerTest.java | 45 ++++++++++++++++++++++
 2 files changed, 58 insertions(+), 3 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
index ea43f0898..642a9dd3d 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
@@ -297,10 +297,20 @@ public class StarRocksUtils {
 
         @Override
         public StarRocksColumn.Builder visit(DecimalType decimalType) {
-            builder.setDataType(DECIMAL);
+            // StarRocks does not support Decimal as primary key, so decimal 
should be cast to
+            // VARCHAR.
+            if (!isPrimaryKeys) {
+                builder.setDataType(DECIMAL);
+                builder.setColumnSize(decimalType.getPrecision());
+                builder.setDecimalDigits(decimalType.getScale());
+            } else {
+                builder.setDataType(VARCHAR);
+                // For a DecimalType with precision N, we may need N + 1 or N 
+ 2 characters to store it as a
+                // string (one for negative sign, and one for decimal point)
+                builder.setColumnSize(Math.min(
+                        decimalType.getScale() != 0? 
decimalType.getPrecision() + 2:decimalType.getPrecision() + 1, 
MAX_VARCHAR_SIZE));
+            }
             builder.setNullable(decimalType.isNullable());
-            builder.setColumnSize(decimalType.getPrecision());
-            builder.setDecimalDigits(decimalType.getScale());
             return builder;
         }
 
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/CdcDataTypeTransformerTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/CdcDataTypeTransformerTest.java
index 778937705..ad76e4c5a 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/CdcDataTypeTransformerTest.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/CdcDataTypeTransformerTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.cdc.connectors.starrocks.sink;
 
 import org.apache.flink.cdc.common.types.CharType;
+import org.apache.flink.cdc.common.types.DecimalType;
 import org.apache.flink.cdc.common.types.VarCharType;
 
 import com.starrocks.connector.flink.catalog.StarRocksColumn;
@@ -75,6 +76,50 @@ public class CdcDataTypeTransformerTest {
         assertTrue(smallLengthColumn.isNullable());
     }
 
+    @Test
+    public void testDecimalForPrimaryKey() {
+        // Map to DECIMAL of StarRocks if column is DECIMAL type and not 
primary key.
+        StarRocksColumn.Builder noPrimaryKeyBuilder =
+                new 
StarRocksColumn.Builder().setColumnName("no_primary_key").setOrdinalPosition(0);
+        new DecimalType(20, 1)
+                .accept(new StarRocksUtils.CdcDataTypeTransformer(false, 
noPrimaryKeyBuilder));
+        StarRocksColumn noPrimaryKeyColumn = noPrimaryKeyBuilder.build();
+        assertEquals("no_primary_key", noPrimaryKeyColumn.getColumnName());
+        assertEquals(0, noPrimaryKeyColumn.getOrdinalPosition());
+        assertEquals(StarRocksUtils.DECIMAL, noPrimaryKeyColumn.getDataType());
+        assertEquals(Integer.valueOf(20), 
noPrimaryKeyColumn.getColumnSize().orElse(null));
+        assertEquals(Integer.valueOf(1), 
noPrimaryKeyColumn.getDecimalDigits().get());
+        assertTrue(noPrimaryKeyColumn.isNullable());
+
+        // Map to VARCHAR of StarRocks if column is DECIMAL type and primary 
key.
+        StarRocksColumn.Builder primaryKeyBuilder =
+                new 
StarRocksColumn.Builder().setColumnName("primary_key").setOrdinalPosition(1);
+        new DecimalType(20, 1)
+                .notNull()
+                .accept(new StarRocksUtils.CdcDataTypeTransformer(true, 
primaryKeyBuilder));
+        StarRocksColumn primaryKeyColumn = primaryKeyBuilder.build();
+        assertEquals("primary_key", primaryKeyColumn.getColumnName());
+        assertEquals(1, primaryKeyColumn.getOrdinalPosition());
+        assertEquals(StarRocksUtils.VARCHAR, primaryKeyColumn.getDataType());
+        assertEquals(Integer.valueOf(22), 
primaryKeyColumn.getColumnSize().orElse(null));
+        assertTrue(!primaryKeyColumn.isNullable());
+
+        // Map to VARCHAR of StarRocks if column is DECIMAL type and primary 
key
+        // DECIMAL(20,0) is common in cdc pipeline, for example, the upstream 
cdc source is unsigned
+        // BIGINT.
+        StarRocksColumn.Builder unsignedBigIntKeyBuilder =
+                new 
StarRocksColumn.Builder().setColumnName("primary_key").setOrdinalPosition(1);
+        new DecimalType(20, 0)
+                .notNull()
+                .accept(new StarRocksUtils.CdcDataTypeTransformer(true, 
unsignedBigIntKeyBuilder));
+        StarRocksColumn unsignedBigIntColumn = 
unsignedBigIntKeyBuilder.build();
+        assertEquals("primary_key", unsignedBigIntColumn.getColumnName());
+        assertEquals(1, unsignedBigIntColumn.getOrdinalPosition());
+        assertEquals(StarRocksUtils.VARCHAR, 
unsignedBigIntColumn.getDataType());
+        assertEquals(Integer.valueOf(21), 
unsignedBigIntColumn.getColumnSize().orElse(null));
+        assertTrue(!unsignedBigIntColumn.isNullable());
+    }
+
     @Test
     public void testVarCharType() {
         // the length fo StarRocks should be 3 times as that of CDC if CDC 
length * 3 <=

Reply via email to