This is an automated email from the ASF dual-hosted git repository.
lvyanquan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 7355d76c1 [FLINK-39758] Fix DECIMAL OOB in SchemaMergingUtils (#4419)
7355d76c1 is described below
commit 7355d76c132e19e7ddb9af25dd79a178ce53f1ff
Author: haruki <[email protected]>
AuthorDate: Wed Jun 3 09:39:54 2026 +0800
[FLINK-39758] Fix DECIMAL OOB in SchemaMergingUtils (#4419)
Co-authored-by: yuxiqian.yxq <[email protected]>
---
.../flink/cdc/common/utils/SchemaMergingUtils.java | 25 ++-
.../apache/flink/cdc/common/utils/SchemaUtils.java | 19 +-
.../flink/cdc/common/utils/SchemaUtilsTest.java | 22 +--
.../flink/FlinkPipelineComposerITCase.java | 210 ++++++++++++++++-----
4 files changed, 191 insertions(+), 85 deletions(-)
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java
index f1a8cde78..ad781d19a 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java
@@ -435,15 +435,7 @@ public class SchemaMergingUtils {
lhsDecimal.getPrecision() - lhsDecimal.getScale(),
rhsDecimal.getPrecision() - rhsDecimal.getScale());
int resultScale = Math.max(lhsDecimal.getScale(),
rhsDecimal.getScale());
- Preconditions.checkArgument(
- resultIntDigits + resultScale <= DecimalType.MAX_PRECISION,
- String.format(
- "Failed to merge %s and %s type into DECIMAL. %d
precision digits required, %d available",
- lType,
- rType,
- resultIntDigits + resultScale,
- DecimalType.MAX_PRECISION));
- return DataTypes.DECIMAL(resultIntDigits + resultScale,
resultScale);
+ return createDecimalBounded(resultIntDigits + resultScale,
resultScale);
} else if (lType instanceof DecimalType &&
rType.is(DataTypeFamily.EXACT_NUMERIC)) {
// Merge decimal and int
return mergeExactNumericsIntoDecimal((DecimalType) lType, rType);
@@ -461,11 +453,7 @@ public class SchemaMergingUtils {
Math.max(
decimalType.getPrecision(),
decimalType.getScale() +
getNumericPrecision(otherType));
- if (resultPrecision <= DecimalType.MAX_PRECISION) {
- return DataTypes.DECIMAL(resultPrecision, decimalType.getScale());
- } else {
- return DataTypes.STRING();
- }
+ return createDecimalBounded(resultPrecision, decimalType.getScale());
}
@VisibleForTesting
@@ -935,4 +923,13 @@ public class SchemaMergingUtils {
mergingTree.put(VariantType.class, ImmutableList.of(stringType));
return mergingTree;
}
+
+ static DecimalType createDecimalBounded(int precision, int scale) {
+ if (precision > DecimalType.MAX_PRECISION) {
+ int lossDigits = precision - DecimalType.MAX_PRECISION;
+ return DataTypes.DECIMAL(precision - lossDigits, scale -
lossDigits);
+ } else {
+ return DataTypes.DECIMAL(precision, scale);
+ }
+ }
}
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
index 8ef9cd298..622555fa3 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
@@ -51,6 +51,8 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static
org.apache.flink.cdc.common.utils.SchemaMergingUtils.createDecimalBounded;
+
/** Utils for {@link Schema} to perform the ability of evolution. */
@PublicEvolving
public class SchemaUtils {
@@ -575,15 +577,7 @@ public class SchemaUtils {
lhsDecimal.getPrecision() - lhsDecimal.getScale(),
rhsDecimal.getPrecision() - rhsDecimal.getScale());
int resultScale = Math.max(lhsDecimal.getScale(),
rhsDecimal.getScale());
- Preconditions.checkArgument(
- resultIntDigits + resultScale <= DecimalType.MAX_PRECISION,
- String.format(
- "Failed to merge %s and %s type into DECIMAL. %d
precision digits required, %d available",
- lType,
- rType,
- resultIntDigits + resultScale,
- DecimalType.MAX_PRECISION));
- mergedType = DataTypes.DECIMAL(resultIntDigits + resultScale,
resultScale);
+ mergedType = createDecimalBounded(resultIntDigits + resultScale,
resultScale);
} else if (lType instanceof DecimalType &&
rType.is(DataTypeFamily.EXACT_NUMERIC)) {
// Merge decimal and int
mergedType = mergeExactNumericsIntoDecimal((DecimalType) lType,
rType);
@@ -608,12 +602,7 @@ public class SchemaUtils {
Math.max(
decimalType.getPrecision(),
decimalType.getScale() +
getNumericPrecision(otherType));
- Preconditions.checkArgument(
- resultPrecision <= DecimalType.MAX_PRECISION,
- String.format(
- "Failed to merge %s and %s type into DECIMAL. %d
precision digits required, %d available",
- decimalType, otherType, resultPrecision,
DecimalType.MAX_PRECISION));
- return DataTypes.DECIMAL(resultPrecision, decimalType.getScale());
+ return createDecimalBounded(resultPrecision, decimalType.getScale());
}
@Deprecated
diff --git a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
index a5f3f86ad..304b9ae42 100644
--- a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
+++ b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
@@ -314,21 +314,15 @@ class SchemaUtilsTest {
.isEqualTo(DataTypes.DECIMAL(12, 4));
// Test overflow decimal conversions
- Assertions.assertThatThrownBy(
- () ->
- SchemaUtils.inferWiderType(
- DataTypes.DECIMAL(5, 5),
DataTypes.DECIMAL(38, 0)))
- .isExactlyInstanceOf(IllegalArgumentException.class)
- .hasMessage(
- "Failed to merge DECIMAL(5, 5) NOT NULL and
DECIMAL(38, 0) NOT NULL type into DECIMAL. 43 precision digits required, 38
available");
+ Assertions.assertThat(
+ SchemaUtils.inferWiderType(
+ DataTypes.DECIMAL(5, 5), DataTypes.DECIMAL(38,
0)))
+ .isEqualTo(DataTypes.DECIMAL(38, 0));
- Assertions.assertThatThrownBy(
- () ->
- SchemaUtils.inferWiderType(
- DataTypes.DECIMAL(38, 0),
DataTypes.DECIMAL(5, 5)))
- .isExactlyInstanceOf(IllegalArgumentException.class)
- .hasMessage(
- "Failed to merge DECIMAL(38, 0) NOT NULL and
DECIMAL(5, 5) NOT NULL type into DECIMAL. 43 precision digits required, 38
available");
+ Assertions.assertThat(
+ SchemaUtils.inferWiderType(
+ DataTypes.DECIMAL(38, 0), DataTypes.DECIMAL(5,
5)))
+ .isEqualTo(DataTypes.DECIMAL(38, 0));
// Test merging with nullability
Assertions.assertThat(
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
index 80606a38a..f5b1e44d4 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
@@ -64,7 +64,9 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
@@ -1371,6 +1373,169 @@ class FlinkPipelineComposerITCase {
@ParameterizedTest
@EnumSource
void testMergingDecimalWithVariousPrecisions(ValuesDataSink.SinkApi
sinkApi) throws Exception {
+ List<Event> events = generateDecimalColumnEvents("default_table_");
+ List<String> expected =
+ Stream.of(
+ "CreateTableEvent{tableId={},
schema=columns={`id` INT,`name` STRING,`age` INT,`fav_num` TINYINT},
primaryKeys=id, options=()}",
+ "DataChangeEvent{tableId={}, before=[],
after=[1, Alice, 17, 1], op=INSERT, meta=()}",
+ "AlterColumnTypeEvent{tableId={},
typeMapping={fav_num=SMALLINT}, oldTypeMapping={fav_num=TINYINT}, comments={}}",
+ "DataChangeEvent{tableId={}, before=[],
after=[2, Alice, 17, 22], op=INSERT, meta=()}",
+ "AlterColumnTypeEvent{tableId={},
typeMapping={fav_num=INT}, oldTypeMapping={fav_num=SMALLINT}, comments={}}",
+ "DataChangeEvent{tableId={}, before=[],
after=[3, Alice, 17, 3333], op=INSERT, meta=()}",
+ "AlterColumnTypeEvent{tableId={},
typeMapping={fav_num=BIGINT}, oldTypeMapping={fav_num=INT}, comments={}}",
+ "DataChangeEvent{tableId={}, before=[],
after=[4, Alice, 17, 44444444], op=INSERT, meta=()}",
+ "AlterColumnTypeEvent{tableId={},
typeMapping={fav_num=DECIMAL(19, 0)}, oldTypeMapping={fav_num=BIGINT},
comments={}}",
+ "DataChangeEvent{tableId={}, before=[],
after=[5, Alice, 17, 555555555555555], op=INSERT, meta=()}",
+ "AlterColumnTypeEvent{tableId={},
typeMapping={fav_num=DECIMAL(24, 5)}, oldTypeMapping={fav_num=DECIMAL(19, 0)},
comments={}}",
+ "DataChangeEvent{tableId={}, before=[],
after=[6, Alice, 17, 66666.66666], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId={}, before=[],
after=[7, Alice, 17, 77777777.17000], op=INSERT, meta=()}",
+ "AlterColumnTypeEvent{tableId={},
typeMapping={fav_num=DECIMAL(38, 19)}, oldTypeMapping={fav_num=DECIMAL(24, 5)},
comments={}}",
+ "DataChangeEvent{tableId={}, before=[],
after=[8, Alice, 17, 888888888.8888888888888888888], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId={}, before=[],
after=[101, Zen, 19, 1.0000000000000000000], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId={}, before=[],
after=[102, Zen, 19, 22.0000000000000000000], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId={}, before=[],
after=[103, Zen, 19, 3333.0000000000000000000], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId={}, before=[],
after=[104, Zen, 19, 44444444.0000000000000000000], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId={}, before=[],
after=[105, Zen, 19, 555555555555555.0000000000000000000], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId={}, before=[],
after=[106, Zen, 19, 66666.6666600000000000000], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId={}, before=[],
after=[107, Zen, 19, 77777777.1700000000000000000], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId={}, before=[],
after=[108, Zen, 19, 888888888.8888888888888888888], op=INSERT, meta=()}")
+ .map(
+ s ->
+ s.replace(
+ "tableId={}",
+
"tableId=default_namespace.default_schema.default_everything_merged"))
+ .collect(Collectors.toList());
+
+ runGenericMergingTest(
+ sinkApi,
+ List.of(),
+ List.of(
+ new RouteDef(
+
"default_namespace.default_schema.default_table_\\.*",
+
"default_namespace.default_schema.default_everything_merged",
+ null,
+ "Merge all decimal columns with different
precision")),
+ events,
+ expected);
+ }
+
+ static Stream<Arguments> decimalOOB() {
+ return Stream.of(
+ Arguments.of(
+ 10,
+ 5,
+ "12345.54321",
+ 19,
+ 3,
+ "1234567890123456.789",
+ List.of(
+
"CreateTableEvent{tableId=test_database.merged, schema=columns={`id` BIGINT NOT
NULL,`dec` DECIMAL(10, 5)}, primaryKeys=id, options=()}",
+
"AlterColumnTypeEvent{tableId=test_database.merged,
typeMapping={dec=DECIMAL(21, 5)}, oldTypeMapping={dec=DECIMAL(10, 5)},
comments={}}",
+ "DataChangeEvent{tableId=test_database.merged,
before=[], after=[1, 12345.54321], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=test_database.merged,
before=[], after=[2, 1234567890123456.78900], op=INSERT, meta=()}")),
+ Arguments.of(
+ 25,
+ 16,
+ "123456789.1234567890123456",
+ 32,
+ 32,
+ "0.12345678901234567890123456789012",
+ List.of(
+
"CreateTableEvent{tableId=test_database.merged, schema=columns={`id` BIGINT NOT
NULL,`dec` DECIMAL(25, 16)}, primaryKeys=id, options=()}",
+
"AlterColumnTypeEvent{tableId=test_database.merged,
typeMapping={dec=DECIMAL(38, 29)}, oldTypeMapping={dec=DECIMAL(25, 16)},
comments={}}",
+ "DataChangeEvent{tableId=test_database.merged,
before=[], after=[1, 123456789.12345678901234560000000000000], op=INSERT,
meta=()}",
+ "DataChangeEvent{tableId=test_database.merged,
before=[], after=[2, 0.12345678901234567890123456789], op=INSERT, meta=()}")),
+ Arguments.of(
+ 38,
+ 38,
+ "0.12345678901234567890123456789012345678",
+ 38,
+ 0,
+ "12345678901234567890123456789012345678",
+ List.of(
+
"CreateTableEvent{tableId=test_database.merged, schema=columns={`id` BIGINT NOT
NULL,`dec` DECIMAL(38, 38)}, primaryKeys=id, options=()}",
+
"AlterColumnTypeEvent{tableId=test_database.merged,
typeMapping={dec=DECIMAL(38, 0)}, oldTypeMapping={dec=DECIMAL(38, 38)},
comments={}}",
+ "DataChangeEvent{tableId=test_database.merged,
before=[], after=[1, 0], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=test_database.merged,
before=[], after=[2, 12345678901234567890123456789012345678], op=INSERT,
meta=()}")));
+ }
+
+ @ParameterizedTest(name = "merge Decimal({0}, {1}) and Decimal({3}, {4})")
+ @MethodSource("decimalOOB")
+ void testMergingDecimalWithOutOfBoundPrecisions(
+ int decimal1Precision,
+ int decimal1Scale,
+ String decimal1,
+ int decimal2Precision,
+ int decimal2Scale,
+ String decimal2,
+ List<String> expectedOutput)
+ throws Exception {
+ TableId tableId1 = TableId.tableId("test_database", "test_table_1");
+ TableId tableId2 = TableId.tableId("test_database", "test_table_2");
+ Schema schema1 =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.BIGINT().notNull())
+ .physicalColumn("dec",
DataTypes.DECIMAL(decimal1Precision, decimal1Scale))
+ .primaryKey("id")
+ .build();
+ Schema schema2 =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.BIGINT().notNull())
+ .physicalColumn("dec",
DataTypes.DECIMAL(decimal2Precision, decimal2Scale))
+ .primaryKey("id")
+ .build();
+ BinaryRecordDataGenerator generator1 =
+ new BinaryRecordDataGenerator(
+ schema1.getColumnDataTypes().toArray(new DataType[0]));
+ BinaryRecordDataGenerator generator2 =
+ new BinaryRecordDataGenerator(
+ schema2.getColumnDataTypes().toArray(new DataType[0]));
+ List<Event> events = new ArrayList<>();
+ events.add(new CreateTableEvent(tableId1, schema1));
+ events.add(new CreateTableEvent(tableId2, schema2));
+ events.add(
+ DataChangeEvent.insertEvent(
+ tableId1,
+ generator1.generate(
+ new Object[] {
+ 1L,
+ DecimalData.fromBigDecimal(
+ new BigDecimal(decimal1),
+ decimal1Precision,
+ decimal1Scale)
+ })));
+ events.add(
+ DataChangeEvent.insertEvent(
+ tableId2,
+ generator2.generate(
+ new Object[] {
+ 2L,
+ DecimalData.fromBigDecimal(
+ new BigDecimal(decimal2),
+ decimal2Precision,
+ decimal2Scale)
+ })));
+
+ runGenericMergingTest(
+ ValuesDataSink.SinkApi.SINK_V2,
+ List.of(),
+ List.of(
+ new RouteDef(
+ "test_database.test_table_\\.*",
+ "test_database.merged",
+ null,
+ "Merge all decimal columns with different
precision")),
+ events,
+ expectedOutput);
+ }
+
+ void runGenericMergingTest(
+ ValuesDataSink.SinkApi sinkApi,
+ List<TransformDef> transformDef,
+ List<RouteDef> routeDef,
+ List<Event> events,
+ List<String> expectedOutput)
+ throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
// Setup value source
@@ -1379,7 +1544,6 @@ class FlinkPipelineComposerITCase {
ValuesDataSourceOptions.EVENT_SET_ID,
ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
- List<Event> events = generateDecimalColumnEvents("default_table_");
ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
SourceDef sourceDef =
@@ -1399,13 +1563,8 @@ class FlinkPipelineComposerITCase {
new PipelineDef(
sourceDef,
sinkDef,
- Collections.singletonList(
- new RouteDef(
-
"default_namespace.default_schema.default_table_\\.*",
-
"default_namespace.default_schema.default_everything_merged",
- null,
- "Merge all decimal columns with
different precision")),
- Collections.emptyList(),
+ routeDef,
+ transformDef,
Collections.emptyList(),
pipelineConfig);
@@ -1416,40 +1575,7 @@ class FlinkPipelineComposerITCase {
// Check the order and content of all received events
String[] outputEvents = outCaptor.toString().trim().split("\n");
-
- String[] expected =
- Stream.of(
- "CreateTableEvent{tableId={},
schema=columns={`id` INT,`name` STRING,`age` INT,`fav_num` TINYINT},
primaryKeys=id, options=()}",
- "DataChangeEvent{tableId={}, before=[],
after=[1, Alice, 17, 1], op=INSERT, meta=()}",
- "AlterColumnTypeEvent{tableId={},
typeMapping={fav_num=SMALLINT}, oldTypeMapping={fav_num=TINYINT}, comments={}}",
- "DataChangeEvent{tableId={}, before=[],
after=[2, Alice, 17, 22], op=INSERT, meta=()}",
- "AlterColumnTypeEvent{tableId={},
typeMapping={fav_num=INT}, oldTypeMapping={fav_num=SMALLINT}, comments={}}",
- "DataChangeEvent{tableId={}, before=[],
after=[3, Alice, 17, 3333], op=INSERT, meta=()}",
- "AlterColumnTypeEvent{tableId={},
typeMapping={fav_num=BIGINT}, oldTypeMapping={fav_num=INT}, comments={}}",
- "DataChangeEvent{tableId={}, before=[],
after=[4, Alice, 17, 44444444], op=INSERT, meta=()}",
- "AlterColumnTypeEvent{tableId={},
typeMapping={fav_num=DECIMAL(19, 0)}, oldTypeMapping={fav_num=BIGINT},
comments={}}",
- "DataChangeEvent{tableId={}, before=[],
after=[5, Alice, 17, 555555555555555], op=INSERT, meta=()}",
- "AlterColumnTypeEvent{tableId={},
typeMapping={fav_num=DECIMAL(24, 5)}, oldTypeMapping={fav_num=DECIMAL(19, 0)},
comments={}}",
- "DataChangeEvent{tableId={}, before=[],
after=[6, Alice, 17, 66666.66666], op=INSERT, meta=()}",
- "DataChangeEvent{tableId={}, before=[],
after=[7, Alice, 17, 77777777.17000], op=INSERT, meta=()}",
- "AlterColumnTypeEvent{tableId={},
typeMapping={fav_num=DECIMAL(38, 19)}, oldTypeMapping={fav_num=DECIMAL(24, 5)},
comments={}}",
- "DataChangeEvent{tableId={}, before=[],
after=[8, Alice, 17, 888888888.8888888888888888888], op=INSERT, meta=()}",
- "DataChangeEvent{tableId={}, before=[],
after=[101, Zen, 19, 1.0000000000000000000], op=INSERT, meta=()}",
- "DataChangeEvent{tableId={}, before=[],
after=[102, Zen, 19, 22.0000000000000000000], op=INSERT, meta=()}",
- "DataChangeEvent{tableId={}, before=[],
after=[103, Zen, 19, 3333.0000000000000000000], op=INSERT, meta=()}",
- "DataChangeEvent{tableId={}, before=[],
after=[104, Zen, 19, 44444444.0000000000000000000], op=INSERT, meta=()}",
- "DataChangeEvent{tableId={}, before=[],
after=[105, Zen, 19, 555555555555555.0000000000000000000], op=INSERT, meta=()}",
- "DataChangeEvent{tableId={}, before=[],
after=[106, Zen, 19, 66666.6666600000000000000], op=INSERT, meta=()}",
- "DataChangeEvent{tableId={}, before=[],
after=[107, Zen, 19, 77777777.1700000000000000000], op=INSERT, meta=()}",
- "DataChangeEvent{tableId={}, before=[],
after=[108, Zen, 19, 888888888.8888888888888888888], op=INSERT, meta=()}")
- .map(
- s ->
- s.replace(
- "tableId={}",
-
"tableId=default_namespace.default_schema.default_everything_merged"))
- .toArray(String[]::new);
-
- assertThat(outputEvents).containsExactlyInAnyOrder(expected);
+
assertThat(outputEvents).containsExactlyInAnyOrderElementsOf(expectedOutput);
}
private List<Event> generateTemporalColumnEvents(String tableNamePrefix) {