yuxiqian commented on code in PR #3150:
URL: https://github.com/apache/flink-cdc/pull/3150#discussion_r1547445765


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java:
##########
@@ -297,10 +297,29 @@ public StarRocksColumn.Builder visit(DoubleType 
doubleType) {
 
         @Override
         public StarRocksColumn.Builder visit(DecimalType decimalType) {
-            builder.setDataType(DECIMAL);
+            // StarRocks is not support Decimal as primary key, so decimal 
should be cast to INT,

Review Comment:
   ```suggestion
               // StarRocks does not support Decimal as primary key, so decimal should be cast to INT,
   ```



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java:
##########
@@ -297,10 +297,29 @@ public StarRocksColumn.Builder visit(DoubleType 
doubleType) {
 
         @Override
         public StarRocksColumn.Builder visit(DecimalType decimalType) {
-            builder.setDataType(DECIMAL);
+            // StarRocks is not support Decimal as primary key, so decimal 
should be cast to INT,
+            // BIGINT, LARGEINT or VARHCAR.
+            if (!isPrimaryKeys) {
+                builder.setDataType(DECIMAL);
+                builder.setColumnSize(decimalType.getPrecision());
+                builder.setDecimalDigits(decimalType.getScale());
+            } else if (decimalType.getPrecision() < 10 && 
decimalType.getScale() == 0) {
+                // Int is range from [-2,147,483,648, 2,147,483,647],
+                // some data with precision of 10 is out of range.
+                builder.setDataType(INT);
+            } else if (decimalType.getPrecision() < 19 && 
decimalType.getScale() == 0) {
+                // BigInt is range from 
[-9,223.372,036,854,775,808~9,223.372,036,854,775,807],
+                // some data with precision of 19 is out of range.
+                builder.setDataType(BIGINT);
+            } else if (decimalType.getPrecision() < 38 && 
decimalType.getScale() == 0) {
+                // LargeInt is range from [-1.701411835E38 ~ 1.701411835E38],

Review Comment:
   ```suggestion
                   // LargeInt ranges from -1.701411835E38 to 1.701411835E38,
   ```



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/CdcDataTypeTransformerTest.java:
##########
@@ -75,6 +76,68 @@ public void testCharTypeForPrimaryKey() {
         assertTrue(smallLengthColumn.isNullable());
     }
 
+    @Test
+    public void testDecimalForPrimaryKey() {
+        // map to Int of StarRocks if primary key column with precision < 10 
and scale = 0
+        StarRocksColumn.Builder intLengthBuilder =
+                new 
StarRocksColumn.Builder().setColumnName("primary_key").setOrdinalPosition(0);
+        new DecimalType(9, 0)
+                .accept(new StarRocksUtils.CdcDataTypeTransformer(true, 
intLengthBuilder));
+        StarRocksColumn intLengthColumn = intLengthBuilder.build();
+        assertEquals("primary_key", intLengthColumn.getColumnName());
+        assertEquals(0, intLengthColumn.getOrdinalPosition());
+        assertEquals(StarRocksUtils.INT, intLengthColumn.getDataType());
+        assertTrue(intLengthColumn.isNullable());
+
+        // map to BigInt of StarRocks if primary key column with precision < 
19 and scale = 0
+        StarRocksColumn.Builder bigIntLengthBuilder =
+                new 
StarRocksColumn.Builder().setColumnName("primary_key").setOrdinalPosition(0);
+        new DecimalType(10, 0)
+                .accept(new StarRocksUtils.CdcDataTypeTransformer(true, 
bigIntLengthBuilder));
+        StarRocksColumn bigIntLengthColumn = bigIntLengthBuilder.build();
+        assertEquals("primary_key", bigIntLengthColumn.getColumnName());
+        assertEquals(0, bigIntLengthColumn.getOrdinalPosition());
+        assertEquals(StarRocksUtils.BIGINT, bigIntLengthColumn.getDataType());
+        assertTrue(bigIntLengthColumn.isNullable());
+
+        // map to LARGEINT of StarRocks if primary key column with precision < 
18 and scale = 0
+        StarRocksColumn.Builder unsignedBigIntLengthBuilder =
+                new 
StarRocksColumn.Builder().setColumnName("primary_key").setOrdinalPosition(0);
+        new DecimalType(20, 0)

Review Comment:
   Please add checks for the points where the `DecimalType` precision crosses
a type boundary: 9 to 10, 18 to 19, and 37 to 38. Test cases covering these
edge cases would help.
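
   For context on why 9/10 and 18/19 are the edges worth testing, here is a
minimal standalone sketch (not code from this PR). It checks whether the
largest value of a `DECIMAL(p, 0)` fits in Java's `int` and `long`, whose
ranges match the INT and BIGINT ranges quoted in the diff comments.
`DecimalBoundaryCheck`, `maxUnscaled` and `fitsSigned` are hypothetical names,
and the 37/38 edge is not covered here.

```java
import java.math.BigInteger;

class DecimalBoundaryCheck {

    /** Largest magnitude a DECIMAL(precision, 0) can hold: 10^precision - 1. */
    static BigInteger maxUnscaled(int precision) {
        return BigInteger.TEN.pow(precision).subtract(BigInteger.ONE);
    }

    /** True if every DECIMAL(precision, 0) value fits in a signed integer of the given width. */
    static boolean fitsSigned(int precision, int bits) {
        // bitLength() excludes the sign bit, so a signed N-bit integer has N - 1 value bits.
        return maxUnscaled(precision).bitLength() <= bits - 1;
    }

    public static void main(String[] args) {
        // Precisions on either side of the INT (32-bit) and BIGINT (64-bit) thresholds.
        int[][] cases = {{9, 32}, {10, 32}, {18, 64}, {19, 64}};
        for (int[] c : cases) {
            System.out.println(
                    "DECIMAL(" + c[0] + ", 0) fits in " + c[1] + " bits: "
                            + fitsSigned(c[0], c[1]));
        }
    }
}
```

   Precision 9 and 18 fit, while 10 and 19 do not, so a test on each side of
those thresholds pins down the `<` comparisons in the diff.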



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java:
##########
@@ -297,10 +297,29 @@ public StarRocksColumn.Builder visit(DoubleType 
doubleType) {
 
         @Override
         public StarRocksColumn.Builder visit(DecimalType decimalType) {
-            builder.setDataType(DECIMAL);
+            // StarRocks is not support Decimal as primary key, so decimal 
should be cast to INT,
+            // BIGINT, LARGEINT or VARHCAR.
+            if (!isPrimaryKeys) {
+                builder.setDataType(DECIMAL);
+                builder.setColumnSize(decimalType.getPrecision());
+                builder.setDecimalDigits(decimalType.getScale());
+            } else if (decimalType.getPrecision() < 10 && 
decimalType.getScale() == 0) {
+                // Int is range from [-2,147,483,648, 2,147,483,647],
+                // some data with precision of 10 is out of range.
+                builder.setDataType(INT);
+            } else if (decimalType.getPrecision() < 19 && 
decimalType.getScale() == 0) {
+                // BigInt is range from 
[-9,223.372,036,854,775,808~9,223.372,036,854,775,807],

Review Comment:
   ```suggestion
                   // BigInt ranges from -9,223,372,036,854,775,808 to 9,223,372,036,854,775,807,
   ```



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java:
##########
@@ -129,7 +129,7 @@ public static void toStarRocksDataType(
      * @param fieldType the element type of the RecordData
      * @param fieldPos the element position of the RecordData
      * @param zoneId the time zone used when converting from <code>TIMESTAMP 
WITH LOCAL TIME ZONE
-     *     </code>
+     *                  </code>

Review Comment:
   Is this change intended?



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java:
##########
@@ -297,10 +297,29 @@ public StarRocksColumn.Builder visit(DoubleType 
doubleType) {
 
         @Override
         public StarRocksColumn.Builder visit(DecimalType decimalType) {
-            builder.setDataType(DECIMAL);
+            // StarRocks is not support Decimal as primary key, so decimal 
should be cast to INT,
+            // BIGINT, LARGEINT or VARHCAR.
+            if (!isPrimaryKeys) {
+                builder.setDataType(DECIMAL);
+                builder.setColumnSize(decimalType.getPrecision());
+                builder.setDecimalDigits(decimalType.getScale());
+            } else if (decimalType.getPrecision() < 10 && 
decimalType.getScale() == 0) {
+                // Int is range from [-2,147,483,648, 2,147,483,647],

Review Comment:
   ```suggestion
                   // Int ranges from -2,147,483,648 to 2,147,483,647,
   ```



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java:
##########
@@ -297,10 +297,29 @@ public StarRocksColumn.Builder visit(DoubleType 
doubleType) {
 
         @Override
         public StarRocksColumn.Builder visit(DecimalType decimalType) {
-            builder.setDataType(DECIMAL);
+            // StarRocks is not support Decimal as primary key, so decimal 
should be cast to INT,
+            // BIGINT, LARGEINT or VARHCAR.
+            if (!isPrimaryKeys) {
+                builder.setDataType(DECIMAL);
+                builder.setColumnSize(decimalType.getPrecision());
+                builder.setDecimalDigits(decimalType.getScale());
+            } else if (decimalType.getPrecision() < 10 && 
decimalType.getScale() == 0) {

Review Comment:
   The `getScale() == 0` check is repeated three times. Maybe check it first,
and fall back to `VARCHAR` if a `DecimalType` with a non-zero scale is used?
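
   A rough standalone sketch of the shape I have in mind, assuming the same
precision thresholds as in the diff. `PrimaryKeyDecimalMapping`, `TargetType`
and `mapPrimaryKeyDecimal` are hypothetical stand-ins so the snippet compiles
without the connector classes; the real change would stay inside
`visit(DecimalType)` and keep using the builder.

```java
class PrimaryKeyDecimalMapping {

    /** Hypothetical stand-in for the StarRocks type names used by the connector. */
    enum TargetType { INT, BIGINT, LARGEINT, VARCHAR }

    /** Maps a primary-key DECIMAL(precision, scale) to a target type. */
    static TargetType mapPrimaryKeyDecimal(int precision, int scale) {
        // Check the scale once: a fractional decimal cannot become an integer key.
        if (scale != 0) {
            return TargetType.VARCHAR;
        }
        // From here on only the precision decides; thresholds copied from the diff.
        if (precision < 10) {
            return TargetType.INT;
        } else if (precision < 19) {
            return TargetType.BIGINT;
        } else if (precision < 38) {
            return TargetType.LARGEINT;
        }
        return TargetType.VARCHAR;
    }

    public static void main(String[] args) {
        // Same inputs as the test cases in this PR.
        int[][] inputs = {{9, 0}, {10, 0}, {20, 0}, {38, 0}, {12, 1}};
        for (int[] in : inputs) {
            System.out.println(
                    "DECIMAL(" + in[0] + ", " + in[1] + ") -> "
                            + mapPrimaryKeyDecimal(in[0], in[1]));
        }
    }
}
```

   With the scale handled up front, each remaining branch is a single
precision comparison.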



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/CdcDataTypeTransformerTest.java:
##########
@@ -75,6 +76,68 @@ public void testCharTypeForPrimaryKey() {
         assertTrue(smallLengthColumn.isNullable());
     }
 
+    @Test
+    public void testDecimalForPrimaryKey() {
+        // map to Int of StarRocks if primary key column with precision < 10 
and scale = 0
+        StarRocksColumn.Builder intLengthBuilder =
+                new 
StarRocksColumn.Builder().setColumnName("primary_key").setOrdinalPosition(0);
+        new DecimalType(9, 0)
+                .accept(new StarRocksUtils.CdcDataTypeTransformer(true, 
intLengthBuilder));
+        StarRocksColumn intLengthColumn = intLengthBuilder.build();
+        assertEquals("primary_key", intLengthColumn.getColumnName());
+        assertEquals(0, intLengthColumn.getOrdinalPosition());
+        assertEquals(StarRocksUtils.INT, intLengthColumn.getDataType());
+        assertTrue(intLengthColumn.isNullable());
+
+        // map to BigInt of StarRocks if primary key column with precision < 
19 and scale = 0
+        StarRocksColumn.Builder bigIntLengthBuilder =
+                new 
StarRocksColumn.Builder().setColumnName("primary_key").setOrdinalPosition(0);
+        new DecimalType(10, 0)
+                .accept(new StarRocksUtils.CdcDataTypeTransformer(true, 
bigIntLengthBuilder));
+        StarRocksColumn bigIntLengthColumn = bigIntLengthBuilder.build();
+        assertEquals("primary_key", bigIntLengthColumn.getColumnName());
+        assertEquals(0, bigIntLengthColumn.getOrdinalPosition());
+        assertEquals(StarRocksUtils.BIGINT, bigIntLengthColumn.getDataType());
+        assertTrue(bigIntLengthColumn.isNullable());
+
+        // map to LARGEINT of StarRocks if primary key column with precision < 
18 and scale = 0
+        StarRocksColumn.Builder unsignedBigIntLengthBuilder =
+                new 
StarRocksColumn.Builder().setColumnName("primary_key").setOrdinalPosition(0);
+        new DecimalType(20, 0)

Review Comment:
   Please add checks for the points where the `DecimalType` precision crosses
a type boundary: 9 to 10, 18 to 19, and 37 to 38.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/CdcDataTypeTransformerTest.java:
##########
@@ -75,6 +76,68 @@ public void testCharTypeForPrimaryKey() {
         assertTrue(smallLengthColumn.isNullable());
     }
 
+    @Test
+    public void testDecimalForPrimaryKey() {
+        // map to Int of StarRocks if primary key column with precision < 10 
and scale = 0
+        StarRocksColumn.Builder intLengthBuilder =
+                new 
StarRocksColumn.Builder().setColumnName("primary_key").setOrdinalPosition(0);
+        new DecimalType(9, 0)
+                .accept(new StarRocksUtils.CdcDataTypeTransformer(true, 
intLengthBuilder));
+        StarRocksColumn intLengthColumn = intLengthBuilder.build();
+        assertEquals("primary_key", intLengthColumn.getColumnName());
+        assertEquals(0, intLengthColumn.getOrdinalPosition());
+        assertEquals(StarRocksUtils.INT, intLengthColumn.getDataType());
+        assertTrue(intLengthColumn.isNullable());
+
+        // map to BigInt of StarRocks if primary key column with precision < 
19 and scale = 0
+        StarRocksColumn.Builder bigIntLengthBuilder =
+                new 
StarRocksColumn.Builder().setColumnName("primary_key").setOrdinalPosition(0);
+        new DecimalType(10, 0)
+                .accept(new StarRocksUtils.CdcDataTypeTransformer(true, 
bigIntLengthBuilder));
+        StarRocksColumn bigIntLengthColumn = bigIntLengthBuilder.build();
+        assertEquals("primary_key", bigIntLengthColumn.getColumnName());
+        assertEquals(0, bigIntLengthColumn.getOrdinalPosition());
+        assertEquals(StarRocksUtils.BIGINT, bigIntLengthColumn.getDataType());
+        assertTrue(bigIntLengthColumn.isNullable());
+
+        // map to LARGEINT of StarRocks if primary key column with precision < 
18 and scale = 0
+        StarRocksColumn.Builder unsignedBigIntLengthBuilder =
+                new 
StarRocksColumn.Builder().setColumnName("primary_key").setOrdinalPosition(0);
+        new DecimalType(20, 0)
+                .accept(
+                        new StarRocksUtils.CdcDataTypeTransformer(
+                                true, unsignedBigIntLengthBuilder));
+        StarRocksColumn unsignedBigIntLengthColumn = 
unsignedBigIntLengthBuilder.build();
+        assertEquals("primary_key", 
unsignedBigIntLengthColumn.getColumnName());
+        assertEquals(0, unsignedBigIntLengthColumn.getOrdinalPosition());
+        assertEquals(StarRocksUtils.LARGEINT, 
unsignedBigIntLengthColumn.getDataType());
+        assertTrue(unsignedBigIntLengthColumn.isNullable());
+
+        // map to Varchar of StarRocks if primary key column with precision >= 
38 and scale = 0
+        StarRocksColumn.Builder outOfBoundLengthBuilder =
+                new 
StarRocksColumn.Builder().setColumnName("primary_key").setOrdinalPosition(0);
+        new DecimalType(38, 0)
+                .accept(new StarRocksUtils.CdcDataTypeTransformer(true, 
outOfBoundLengthBuilder));
+        StarRocksColumn outOfBoundColumn = outOfBoundLengthBuilder.build();
+        assertEquals("primary_key", outOfBoundColumn.getColumnName());
+        assertEquals(0, outOfBoundColumn.getOrdinalPosition());
+        assertEquals(StarRocksUtils.VARCHAR, outOfBoundColumn.getDataType());
+        assertEquals(Integer.valueOf(38), 
outOfBoundColumn.getColumnSize().orElse(null));
+        assertTrue(unsignedBigIntLengthColumn.isNullable());
+
+        // map to Varchar of StarRocks if primary key column with scale > 0
+        StarRocksColumn.Builder scaleBuilder =
+                new 
StarRocksColumn.Builder().setColumnName("primary_key").setOrdinalPosition(0);
+        new DecimalType(12, 1)
+                .accept(new StarRocksUtils.CdcDataTypeTransformer(true, 
scaleBuilder));
+        StarRocksColumn scaleBoundColumn = scaleBuilder.build();
+        assertEquals("primary_key", scaleBoundColumn.getColumnName());
+        assertEquals(0, scaleBoundColumn.getOrdinalPosition());
+        assertEquals(StarRocksUtils.VARCHAR, scaleBoundColumn.getDataType());
+        assertEquals(Integer.valueOf(12), 
scaleBoundColumn.getColumnSize().orElse(null));
+        assertTrue(unsignedBigIntLengthColumn.isNullable());

Review Comment:
   Minor suggestion: maybe extract a helper like
`checkPrimaryColumnTypeCast(DataType in, DataType expected)` to reduce the
duplicated code?
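
   A minimal sketch of such a helper, written generically so it runs without
the connector classes. The extra `transformer` parameter and the
`TypeCastChecks` class are assumptions made for this illustration only; in the
real test the helper would build the `StarRocksColumn.Builder`, run
`CdcDataTypeTransformer`, and assert on the built column.

```java
import java.util.Objects;
import java.util.function.Function;

class TypeCastChecks {

    /** Hypothetical helper: applies the transformer and fails if the result differs. */
    static <I, O> void checkPrimaryColumnTypeCast(
            Function<I, O> transformer, I in, O expected) {
        O actual = transformer.apply(in);
        if (!Objects.equals(expected, actual)) {
            throw new AssertionError(
                    "input " + in + ": expected " + expected + " but got " + actual);
        }
    }

    public static void main(String[] args) {
        // Stand-in transformer keyed on precision only, just to exercise the helper.
        Function<Integer, String> standIn = precision -> precision < 10 ? "INT" : "BIGINT";
        checkPrimaryColumnTypeCast(standIn, 9, "INT");
        checkPrimaryColumnTypeCast(standIn, 10, "BIGINT");
        System.out.println("all checks passed");
    }
}
```

   Each case in `testDecimalForPrimaryKey` would then shrink to one call, which
also makes it cheap to add the boundary cases.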


