yuxiqian commented on code in PR #3150:
URL: https://github.com/apache/flink-cdc/pull/3150#discussion_r1547445765
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java:
##########
@@ -297,10 +297,29 @@ public StarRocksColumn.Builder visit(DoubleType doubleType) {
@Override
public StarRocksColumn.Builder visit(DecimalType decimalType) {
- builder.setDataType(DECIMAL);
+ // StarRocks is not support Decimal as primary key, so decimal should be cast to INT,
Review Comment:
```suggestion
// StarRocks does not support Decimal as primary key, so decimal should be cast to INT,
```
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java:
##########
@@ -297,10 +297,29 @@ public StarRocksColumn.Builder visit(DoubleType doubleType) {
@Override
public StarRocksColumn.Builder visit(DecimalType decimalType) {
- builder.setDataType(DECIMAL);
+ // StarRocks is not support Decimal as primary key, so decimal should be cast to INT,
+ // BIGINT, LARGEINT or VARHCAR.
+ if (!isPrimaryKeys) {
+ builder.setDataType(DECIMAL);
+ builder.setColumnSize(decimalType.getPrecision());
+ builder.setDecimalDigits(decimalType.getScale());
+ } else if (decimalType.getPrecision() < 10 && decimalType.getScale() == 0) {
+ // Int is range from [-2,147,483,648, 2,147,483,647],
+ // some data with precision of 10 is out of range.
+ builder.setDataType(INT);
+ } else if (decimalType.getPrecision() < 19 && decimalType.getScale() == 0) {
+ // BigInt is range from [-9,223.372,036,854,775,808~9,223.372,036,854,775,807],
+ // some data with precision of 19 is out of range.
+ builder.setDataType(BIGINT);
+ } else if (decimalType.getPrecision() < 38 && decimalType.getScale() == 0) {
+ // LargeInt is range from [-1.701411835E38 ~ 1.701411835E38],
Review Comment:
```suggestion
// LargeInt ranges from -1.701411835E38 to 1.701411835E38,
```
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/CdcDataTypeTransformerTest.java:
##########
@@ -75,6 +76,68 @@ public void testCharTypeForPrimaryKey() {
assertTrue(smallLengthColumn.isNullable());
}
+ @Test
+ public void testDecimalForPrimaryKey() {
+ // map to Int of StarRocks if primary key column with precision < 10 and scale = 0
+ StarRocksColumn.Builder intLengthBuilder =
+ new StarRocksColumn.Builder().setColumnName("primary_key").setOrdinalPosition(0);
+ new DecimalType(9, 0)
+ .accept(new StarRocksUtils.CdcDataTypeTransformer(true, intLengthBuilder));
+ StarRocksColumn intLengthColumn = intLengthBuilder.build();
+ assertEquals("primary_key", intLengthColumn.getColumnName());
+ assertEquals(0, intLengthColumn.getOrdinalPosition());
+ assertEquals(StarRocksUtils.INT, intLengthColumn.getDataType());
+ assertTrue(intLengthColumn.isNullable());
+
+ // map to BigInt of StarRocks if primary key column with precision < 19 and scale = 0
+ StarRocksColumn.Builder bigIntLengthBuilder =
+ new StarRocksColumn.Builder().setColumnName("primary_key").setOrdinalPosition(0);
+ new DecimalType(10, 0)
+ .accept(new StarRocksUtils.CdcDataTypeTransformer(true, bigIntLengthBuilder));
+ StarRocksColumn bigIntLengthColumn = bigIntLengthBuilder.build();
+ assertEquals("primary_key", bigIntLengthColumn.getColumnName());
+ assertEquals(0, bigIntLengthColumn.getOrdinalPosition());
+ assertEquals(StarRocksUtils.BIGINT, bigIntLengthColumn.getDataType());
+ assertTrue(bigIntLengthColumn.isNullable());
+
+ // map to LARGEINT of StarRocks if primary key column with precision < 18 and scale = 0
+ StarRocksColumn.Builder unsignedBigIntLengthBuilder =
+ new StarRocksColumn.Builder().setColumnName("primary_key").setOrdinalPosition(0);
+ new DecimalType(20, 0)
Review Comment:
Should add checks for when `DecimalType` precision crosses specific boundaries, like from 9 to 10, 18 to 19, and 37 to 38. Maybe add test cases covering these edge cases.
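A sketch of what those edge-case additions could look like, exercising both sides of each 9/10, 18/19, and 37/38 cutoff. The `SrType` enum and `expectedKeyType` mapping below are simplified stand-ins that mirror the precision bands in the PR's visitor, not the real `CdcDataTypeTransformer`:

```java
// Boundary-crossing cases for the primary-key decimal mapping: each pair of
// precisions sits on either side of a type cutoff (9/10, 18/19, 37/38).
public class DecimalBoundaryCases {
    enum SrType { INT, BIGINT, LARGEINT, VARCHAR }

    // Stand-in for the connector's precision-band logic (scale assumed 0).
    static SrType expectedKeyType(int precision) {
        if (precision < 10) return SrType.INT;
        if (precision < 19) return SrType.BIGINT;
        if (precision < 38) return SrType.LARGEINT;
        return SrType.VARCHAR;
    }

    public static void main(String[] args) {
        // Each pair crosses one boundary, so the mapped type must change.
        int[][] boundaries = { {9, 10}, {18, 19}, {37, 38} };
        for (int[] pair : boundaries) {
            SrType below = expectedKeyType(pair[0]);
            SrType above = expectedKeyType(pair[1]);
            if (below == above) {
                throw new AssertionError("no type change at boundary " + pair[1]);
            }
            System.out.println("precision " + pair[0] + " -> " + below
                    + ", precision " + pair[1] + " -> " + above);
        }
    }
}
```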
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java:
##########
@@ -297,10 +297,29 @@ public StarRocksColumn.Builder visit(DoubleType doubleType) {
@Override
public StarRocksColumn.Builder visit(DecimalType decimalType) {
- builder.setDataType(DECIMAL);
+ // StarRocks is not support Decimal as primary key, so decimal should be cast to INT,
+ // BIGINT, LARGEINT or VARHCAR.
+ if (!isPrimaryKeys) {
+ builder.setDataType(DECIMAL);
+ builder.setColumnSize(decimalType.getPrecision());
+ builder.setDecimalDigits(decimalType.getScale());
+ } else if (decimalType.getPrecision() < 10 && decimalType.getScale() == 0) {
+ // Int is range from [-2,147,483,648, 2,147,483,647],
+ // some data with precision of 10 is out of range.
+ builder.setDataType(INT);
+ } else if (decimalType.getPrecision() < 19 && decimalType.getScale() == 0) {
+ // BigInt is range from [-9,223.372,036,854,775,808~9,223.372,036,854,775,807],
Review Comment:
```suggestion
// BigInt ranges from -9,223,372,036,854,775,808 to 9,223,372,036,854,775,807,
```
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java:
##########
@@ -129,7 +129,7 @@ public static void toStarRocksDataType(
* @param fieldType the element type of the RecordData
* @param fieldPos the element position of the RecordData
* @param zoneId the time zone used when converting from <code>TIMESTAMP WITH LOCAL TIME ZONE
- * </code>
+ * </code>
Review Comment:
Is this change intended?
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java:
##########
@@ -297,10 +297,29 @@ public StarRocksColumn.Builder visit(DoubleType doubleType) {
@Override
public StarRocksColumn.Builder visit(DecimalType decimalType) {
- builder.setDataType(DECIMAL);
+ // StarRocks is not support Decimal as primary key, so decimal should be cast to INT,
+ // BIGINT, LARGEINT or VARHCAR.
+ if (!isPrimaryKeys) {
+ builder.setDataType(DECIMAL);
+ builder.setColumnSize(decimalType.getPrecision());
+ builder.setDecimalDigits(decimalType.getScale());
+ } else if (decimalType.getPrecision() < 10 && decimalType.getScale() == 0) {
+ // Int is range from [-2,147,483,648, 2,147,483,647],
Review Comment:
```suggestion
// Int ranges from -2,147,483,648 to 2,147,483,647,
```
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java:
##########
@@ -297,10 +297,29 @@ public StarRocksColumn.Builder visit(DoubleType doubleType) {
@Override
public StarRocksColumn.Builder visit(DecimalType decimalType) {
- builder.setDataType(DECIMAL);
+ // StarRocks is not support Decimal as primary key, so decimal should be cast to INT,
+ // BIGINT, LARGEINT or VARHCAR.
+ if (!isPrimaryKeys) {
+ builder.setDataType(DECIMAL);
+ builder.setColumnSize(decimalType.getPrecision());
+ builder.setDecimalDigits(decimalType.getScale());
+ } else if (decimalType.getPrecision() < 10 && decimalType.getScale() == 0) {
Review Comment:
The `getScale() == 0` check is repeated 3 times. Maybe check it first, and fall back to `VARCHAR` if such a DecimalType is used?
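A minimal sketch of that restructuring, with the scale check hoisted to the front so each precision band is tested only once. `SrType` and `mapDecimal` are illustrative stand-ins for the connector's visitor and type constants, not the actual `StarRocksUtils` API:

```java
// Sketch of the suggested branch order: handle the non-primary-key case,
// then non-zero scale, then the precision bands.
public class DecimalKeyMapping {
    enum SrType { DECIMAL, INT, BIGINT, LARGEINT, VARCHAR }

    // Hypothetical stand-in for CdcDataTypeTransformer#visit(DecimalType).
    static SrType mapDecimal(boolean isPrimaryKey, int precision, int scale) {
        if (!isPrimaryKey) {
            return SrType.DECIMAL;       // non-key columns keep DECIMAL(p, s)
        }
        if (scale != 0) {
            return SrType.VARCHAR;       // fractional keys have no integer home
        }
        if (precision < 10) {
            return SrType.INT;           // INT holds up to 9 full decimal digits
        } else if (precision < 19) {
            return SrType.BIGINT;        // BIGINT holds up to 18 full decimal digits
        } else if (precision < 38) {
            return SrType.LARGEINT;      // LARGEINT holds up to 37 full decimal digits
        }
        return SrType.VARCHAR;           // precision >= 38 falls back to VARCHAR
    }

    public static void main(String[] args) {
        System.out.println(mapDecimal(true, 9, 0));   // INT
        System.out.println(mapDecimal(true, 12, 1));  // VARCHAR
    }
}
```

This keeps the behavior of the PR's chained conditions while stating the single `scale == 0` precondition once.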
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/CdcDataTypeTransformerTest.java:
##########
@@ -75,6 +76,68 @@ public void testCharTypeForPrimaryKey() {
assertTrue(smallLengthColumn.isNullable());
}
+ @Test
+ public void testDecimalForPrimaryKey() {
+ // map to Int of StarRocks if primary key column with precision < 10 and scale = 0
+ StarRocksColumn.Builder intLengthBuilder =
+ new StarRocksColumn.Builder().setColumnName("primary_key").setOrdinalPosition(0);
+ new DecimalType(9, 0)
+ .accept(new StarRocksUtils.CdcDataTypeTransformer(true, intLengthBuilder));
+ StarRocksColumn intLengthColumn = intLengthBuilder.build();
+ assertEquals("primary_key", intLengthColumn.getColumnName());
+ assertEquals(0, intLengthColumn.getOrdinalPosition());
+ assertEquals(StarRocksUtils.INT, intLengthColumn.getDataType());
+ assertTrue(intLengthColumn.isNullable());
+
+ // map to BigInt of StarRocks if primary key column with precision < 19 and scale = 0
+ StarRocksColumn.Builder bigIntLengthBuilder =
+ new StarRocksColumn.Builder().setColumnName("primary_key").setOrdinalPosition(0);
+ new DecimalType(10, 0)
+ .accept(new StarRocksUtils.CdcDataTypeTransformer(true, bigIntLengthBuilder));
+ StarRocksColumn bigIntLengthColumn = bigIntLengthBuilder.build();
+ assertEquals("primary_key", bigIntLengthColumn.getColumnName());
+ assertEquals(0, bigIntLengthColumn.getOrdinalPosition());
+ assertEquals(StarRocksUtils.BIGINT, bigIntLengthColumn.getDataType());
+ assertTrue(bigIntLengthColumn.isNullable());
+
+ // map to LARGEINT of StarRocks if primary key column with precision < 18 and scale = 0
+ StarRocksColumn.Builder unsignedBigIntLengthBuilder =
+ new StarRocksColumn.Builder().setColumnName("primary_key").setOrdinalPosition(0);
+ new DecimalType(20, 0)
Review Comment:
Should add checks for when `DecimalType` precision crosses specific boundaries, like from 9 to 10, 18 to 19, and 37 to 38.
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/CdcDataTypeTransformerTest.java:
##########
@@ -75,6 +76,68 @@ public void testCharTypeForPrimaryKey() {
assertTrue(smallLengthColumn.isNullable());
}
+ @Test
+ public void testDecimalForPrimaryKey() {
+ // map to Int of StarRocks if primary key column with precision < 10 and scale = 0
+ StarRocksColumn.Builder intLengthBuilder =
+ new StarRocksColumn.Builder().setColumnName("primary_key").setOrdinalPosition(0);
+ new DecimalType(9, 0)
+ .accept(new StarRocksUtils.CdcDataTypeTransformer(true, intLengthBuilder));
+ StarRocksColumn intLengthColumn = intLengthBuilder.build();
+ assertEquals("primary_key", intLengthColumn.getColumnName());
+ assertEquals(0, intLengthColumn.getOrdinalPosition());
+ assertEquals(StarRocksUtils.INT, intLengthColumn.getDataType());
+ assertTrue(intLengthColumn.isNullable());
+
+ // map to BigInt of StarRocks if primary key column with precision < 19 and scale = 0
+ StarRocksColumn.Builder bigIntLengthBuilder =
+ new StarRocksColumn.Builder().setColumnName("primary_key").setOrdinalPosition(0);
+ new DecimalType(10, 0)
+ .accept(new StarRocksUtils.CdcDataTypeTransformer(true, bigIntLengthBuilder));
+ StarRocksColumn bigIntLengthColumn = bigIntLengthBuilder.build();
+ assertEquals("primary_key", bigIntLengthColumn.getColumnName());
+ assertEquals(0, bigIntLengthColumn.getOrdinalPosition());
+ assertEquals(StarRocksUtils.BIGINT, bigIntLengthColumn.getDataType());
+ assertTrue(bigIntLengthColumn.isNullable());
+
+ // map to LARGEINT of StarRocks if primary key column with precision < 18 and scale = 0
+ StarRocksColumn.Builder unsignedBigIntLengthBuilder =
+ new StarRocksColumn.Builder().setColumnName("primary_key").setOrdinalPosition(0);
+ new DecimalType(20, 0)
+ .accept(
+ new StarRocksUtils.CdcDataTypeTransformer(
+ true, unsignedBigIntLengthBuilder));
+ StarRocksColumn unsignedBigIntLengthColumn = unsignedBigIntLengthBuilder.build();
+ assertEquals("primary_key", unsignedBigIntLengthColumn.getColumnName());
+ assertEquals(0, unsignedBigIntLengthColumn.getOrdinalPosition());
+ assertEquals(StarRocksUtils.LARGEINT, unsignedBigIntLengthColumn.getDataType());
+ assertTrue(unsignedBigIntLengthColumn.isNullable());
+
+ // map to Varchar of StarRocks if primary key column with precision >= 38 and scale = 0
+ StarRocksColumn.Builder outOfBoundLengthBuilder =
+ new StarRocksColumn.Builder().setColumnName("primary_key").setOrdinalPosition(0);
+ new DecimalType(38, 0)
+ .accept(new StarRocksUtils.CdcDataTypeTransformer(true, outOfBoundLengthBuilder));
+ StarRocksColumn outOfBoundColumn = outOfBoundLengthBuilder.build();
+ assertEquals("primary_key", outOfBoundColumn.getColumnName());
+ assertEquals(0, outOfBoundColumn.getOrdinalPosition());
+ assertEquals(StarRocksUtils.VARCHAR, outOfBoundColumn.getDataType());
+ assertEquals(Integer.valueOf(38), outOfBoundColumn.getColumnSize().orElse(null));
+ assertTrue(unsignedBigIntLengthColumn.isNullable());
+
+ // map to Varchar of StarRocks if primary key column with scale > 0
+ StarRocksColumn.Builder scaleBuilder =
+ new StarRocksColumn.Builder().setColumnName("primary_key").setOrdinalPosition(0);
+ new DecimalType(12, 1)
+ .accept(new StarRocksUtils.CdcDataTypeTransformer(true, scaleBuilder));
+ StarRocksColumn scaleBoundColumn = scaleBuilder.build();
+ assertEquals("primary_key", scaleBoundColumn.getColumnName());
+ assertEquals(0, scaleBoundColumn.getOrdinalPosition());
+ assertEquals(StarRocksUtils.VARCHAR, scaleBoundColumn.getDataType());
+ assertEquals(Integer.valueOf(12), scaleBoundColumn.getColumnSize().orElse(null));
+ assertTrue(unsignedBigIntLengthColumn.isNullable());
Review Comment:
Minor suggestion: maybe create an ad-hoc function like `checkPrimaryColumnTypeCast(DataType in, DataType expected)` to reduce duplicated code?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]