This is an automated email from the ASF dual-hosted git repository.

lvyanquan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 7355d76c1 [FLINK-39758] Fix DECIMAL OOB in SchemaMergingUtils (#4419)
7355d76c1 is described below

commit 7355d76c132e19e7ddb9af25dd79a178ce53f1ff
Author: haruki <[email protected]>
AuthorDate: Wed Jun 3 09:39:54 2026 +0800

    [FLINK-39758] Fix DECIMAL OOB in SchemaMergingUtils (#4419)
    
    Co-authored-by: yuxiqian.yxq <[email protected]>
---
 .../flink/cdc/common/utils/SchemaMergingUtils.java |  25 ++-
 .../apache/flink/cdc/common/utils/SchemaUtils.java |  19 +-
 .../flink/cdc/common/utils/SchemaUtilsTest.java    |  22 +--
 .../flink/FlinkPipelineComposerITCase.java         | 210 ++++++++++++++++-----
 4 files changed, 191 insertions(+), 85 deletions(-)

diff --git 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java
 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java
index f1a8cde78..ad781d19a 100644
--- 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java
+++ 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java
@@ -435,15 +435,7 @@ public class SchemaMergingUtils {
                             lhsDecimal.getPrecision() - lhsDecimal.getScale(),
                             rhsDecimal.getPrecision() - rhsDecimal.getScale());
             int resultScale = Math.max(lhsDecimal.getScale(), 
rhsDecimal.getScale());
-            Preconditions.checkArgument(
-                    resultIntDigits + resultScale <= DecimalType.MAX_PRECISION,
-                    String.format(
-                            "Failed to merge %s and %s type into DECIMAL. %d 
precision digits required, %d available",
-                            lType,
-                            rType,
-                            resultIntDigits + resultScale,
-                            DecimalType.MAX_PRECISION));
-            return DataTypes.DECIMAL(resultIntDigits + resultScale, 
resultScale);
+            return createDecimalBounded(resultIntDigits + resultScale, 
resultScale);
         } else if (lType instanceof DecimalType && 
rType.is(DataTypeFamily.EXACT_NUMERIC)) {
             // Merge decimal and int
             return mergeExactNumericsIntoDecimal((DecimalType) lType, rType);
@@ -461,11 +453,7 @@ public class SchemaMergingUtils {
                 Math.max(
                         decimalType.getPrecision(),
                         decimalType.getScale() + 
getNumericPrecision(otherType));
-        if (resultPrecision <= DecimalType.MAX_PRECISION) {
-            return DataTypes.DECIMAL(resultPrecision, decimalType.getScale());
-        } else {
-            return DataTypes.STRING();
-        }
+        return createDecimalBounded(resultPrecision, decimalType.getScale());
     }
 
     @VisibleForTesting
@@ -935,4 +923,13 @@ public class SchemaMergingUtils {
         mergingTree.put(VariantType.class, ImmutableList.of(stringType));
         return mergingTree;
     }
+
+    static DecimalType createDecimalBounded(int precision, int scale) {
+        if (precision > DecimalType.MAX_PRECISION) {
+            int lossDigits = precision - DecimalType.MAX_PRECISION;
+            return DataTypes.DECIMAL(precision - lossDigits, scale - 
lossDigits);
+        } else {
+            return DataTypes.DECIMAL(precision, scale);
+        }
+    }
 }
diff --git 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
index 8ef9cd298..622555fa3 100644
--- 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
+++ 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
@@ -51,6 +51,8 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static 
org.apache.flink.cdc.common.utils.SchemaMergingUtils.createDecimalBounded;
+
 /** Utils for {@link Schema} to perform the ability of evolution. */
 @PublicEvolving
 public class SchemaUtils {
@@ -575,15 +577,7 @@ public class SchemaUtils {
                             lhsDecimal.getPrecision() - lhsDecimal.getScale(),
                             rhsDecimal.getPrecision() - rhsDecimal.getScale());
             int resultScale = Math.max(lhsDecimal.getScale(), 
rhsDecimal.getScale());
-            Preconditions.checkArgument(
-                    resultIntDigits + resultScale <= DecimalType.MAX_PRECISION,
-                    String.format(
-                            "Failed to merge %s and %s type into DECIMAL. %d 
precision digits required, %d available",
-                            lType,
-                            rType,
-                            resultIntDigits + resultScale,
-                            DecimalType.MAX_PRECISION));
-            mergedType = DataTypes.DECIMAL(resultIntDigits + resultScale, 
resultScale);
+            mergedType = createDecimalBounded(resultIntDigits + resultScale, 
resultScale);
         } else if (lType instanceof DecimalType && 
rType.is(DataTypeFamily.EXACT_NUMERIC)) {
             // Merge decimal and int
             mergedType = mergeExactNumericsIntoDecimal((DecimalType) lType, 
rType);
@@ -608,12 +602,7 @@ public class SchemaUtils {
                 Math.max(
                         decimalType.getPrecision(),
                         decimalType.getScale() + 
getNumericPrecision(otherType));
-        Preconditions.checkArgument(
-                resultPrecision <= DecimalType.MAX_PRECISION,
-                String.format(
-                        "Failed to merge %s and %s type into DECIMAL. %d 
precision digits required, %d available",
-                        decimalType, otherType, resultPrecision, 
DecimalType.MAX_PRECISION));
-        return DataTypes.DECIMAL(resultPrecision, decimalType.getScale());
+        return createDecimalBounded(resultPrecision, decimalType.getScale());
     }
 
     @Deprecated
diff --git 
a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
 
b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
index a5f3f86ad..304b9ae42 100644
--- 
a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
+++ 
b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
@@ -314,21 +314,15 @@ class SchemaUtilsTest {
                 .isEqualTo(DataTypes.DECIMAL(12, 4));
 
         // Test overflow decimal conversions
-        Assertions.assertThatThrownBy(
-                        () ->
-                                SchemaUtils.inferWiderType(
-                                        DataTypes.DECIMAL(5, 5), 
DataTypes.DECIMAL(38, 0)))
-                .isExactlyInstanceOf(IllegalArgumentException.class)
-                .hasMessage(
-                        "Failed to merge DECIMAL(5, 5) NOT NULL and 
DECIMAL(38, 0) NOT NULL type into DECIMAL. 43 precision digits required, 38 
available");
+        Assertions.assertThat(
+                        SchemaUtils.inferWiderType(
+                                DataTypes.DECIMAL(5, 5), DataTypes.DECIMAL(38, 
0)))
+                .isEqualTo(DataTypes.DECIMAL(38, 0));
 
-        Assertions.assertThatThrownBy(
-                        () ->
-                                SchemaUtils.inferWiderType(
-                                        DataTypes.DECIMAL(38, 0), 
DataTypes.DECIMAL(5, 5)))
-                .isExactlyInstanceOf(IllegalArgumentException.class)
-                .hasMessage(
-                        "Failed to merge DECIMAL(38, 0) NOT NULL and 
DECIMAL(5, 5) NOT NULL type into DECIMAL. 43 precision digits required, 38 
available");
+        Assertions.assertThat(
+                        SchemaUtils.inferWiderType(
+                                DataTypes.DECIMAL(38, 0), DataTypes.DECIMAL(5, 
5)))
+                .isEqualTo(DataTypes.DECIMAL(38, 0));
 
         // Test merging with nullability
         Assertions.assertThat(
diff --git 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
index 80606a38a..f5b1e44d4 100644
--- 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
+++ 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
@@ -64,7 +64,9 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
@@ -1371,6 +1373,169 @@ class FlinkPipelineComposerITCase {
     @ParameterizedTest
     @EnumSource
     void testMergingDecimalWithVariousPrecisions(ValuesDataSink.SinkApi 
sinkApi) throws Exception {
+        List<Event> events = generateDecimalColumnEvents("default_table_");
+        List<String> expected =
+                Stream.of(
+                                "CreateTableEvent{tableId={}, 
schema=columns={`id` INT,`name` STRING,`age` INT,`fav_num` TINYINT}, 
primaryKeys=id, options=()}",
+                                "DataChangeEvent{tableId={}, before=[], 
after=[1, Alice, 17, 1], op=INSERT, meta=()}",
+                                "AlterColumnTypeEvent{tableId={}, 
typeMapping={fav_num=SMALLINT}, oldTypeMapping={fav_num=TINYINT}, comments={}}",
+                                "DataChangeEvent{tableId={}, before=[], 
after=[2, Alice, 17, 22], op=INSERT, meta=()}",
+                                "AlterColumnTypeEvent{tableId={}, 
typeMapping={fav_num=INT}, oldTypeMapping={fav_num=SMALLINT}, comments={}}",
+                                "DataChangeEvent{tableId={}, before=[], 
after=[3, Alice, 17, 3333], op=INSERT, meta=()}",
+                                "AlterColumnTypeEvent{tableId={}, 
typeMapping={fav_num=BIGINT}, oldTypeMapping={fav_num=INT}, comments={}}",
+                                "DataChangeEvent{tableId={}, before=[], 
after=[4, Alice, 17, 44444444], op=INSERT, meta=()}",
+                                "AlterColumnTypeEvent{tableId={}, 
typeMapping={fav_num=DECIMAL(19, 0)}, oldTypeMapping={fav_num=BIGINT}, 
comments={}}",
+                                "DataChangeEvent{tableId={}, before=[], 
after=[5, Alice, 17, 555555555555555], op=INSERT, meta=()}",
+                                "AlterColumnTypeEvent{tableId={}, 
typeMapping={fav_num=DECIMAL(24, 5)}, oldTypeMapping={fav_num=DECIMAL(19, 0)}, 
comments={}}",
+                                "DataChangeEvent{tableId={}, before=[], 
after=[6, Alice, 17, 66666.66666], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], 
after=[7, Alice, 17, 77777777.17000], op=INSERT, meta=()}",
+                                "AlterColumnTypeEvent{tableId={}, 
typeMapping={fav_num=DECIMAL(38, 19)}, oldTypeMapping={fav_num=DECIMAL(24, 5)}, 
comments={}}",
+                                "DataChangeEvent{tableId={}, before=[], 
after=[8, Alice, 17, 888888888.8888888888888888888], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], 
after=[101, Zen, 19, 1.0000000000000000000], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], 
after=[102, Zen, 19, 22.0000000000000000000], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], 
after=[103, Zen, 19, 3333.0000000000000000000], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], 
after=[104, Zen, 19, 44444444.0000000000000000000], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], 
after=[105, Zen, 19, 555555555555555.0000000000000000000], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], 
after=[106, Zen, 19, 66666.6666600000000000000], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], 
after=[107, Zen, 19, 77777777.1700000000000000000], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], 
after=[108, Zen, 19, 888888888.8888888888888888888], op=INSERT, meta=()}")
+                        .map(
+                                s ->
+                                        s.replace(
+                                                "tableId={}",
+                                                
"tableId=default_namespace.default_schema.default_everything_merged"))
+                        .collect(Collectors.toList());
+
+        runGenericMergingTest(
+                sinkApi,
+                List.of(),
+                List.of(
+                        new RouteDef(
+                                
"default_namespace.default_schema.default_table_\\.*",
+                                
"default_namespace.default_schema.default_everything_merged",
+                                null,
+                                "Merge all decimal columns with different 
precision")),
+                events,
+                expected);
+    }
+
+    static Stream<Arguments> decimalOOB() {
+        return Stream.of(
+                Arguments.of(
+                        10,
+                        5,
+                        "12345.54321",
+                        19,
+                        3,
+                        "1234567890123456.789",
+                        List.of(
+                                
"CreateTableEvent{tableId=test_database.merged, schema=columns={`id` BIGINT NOT 
NULL,`dec` DECIMAL(10, 5)}, primaryKeys=id, options=()}",
+                                
"AlterColumnTypeEvent{tableId=test_database.merged, 
typeMapping={dec=DECIMAL(21, 5)}, oldTypeMapping={dec=DECIMAL(10, 5)}, 
comments={}}",
+                                "DataChangeEvent{tableId=test_database.merged, 
before=[], after=[1, 12345.54321], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId=test_database.merged, 
before=[], after=[2, 1234567890123456.78900], op=INSERT, meta=()}")),
+                Arguments.of(
+                        25,
+                        16,
+                        "123456789.1234567890123456",
+                        32,
+                        32,
+                        "0.12345678901234567890123456789012",
+                        List.of(
+                                
"CreateTableEvent{tableId=test_database.merged, schema=columns={`id` BIGINT NOT 
NULL,`dec` DECIMAL(25, 16)}, primaryKeys=id, options=()}",
+                                
"AlterColumnTypeEvent{tableId=test_database.merged, 
typeMapping={dec=DECIMAL(38, 29)}, oldTypeMapping={dec=DECIMAL(25, 16)}, 
comments={}}",
+                                "DataChangeEvent{tableId=test_database.merged, 
before=[], after=[1, 123456789.12345678901234560000000000000], op=INSERT, 
meta=()}",
+                                "DataChangeEvent{tableId=test_database.merged, 
before=[], after=[2, 0.12345678901234567890123456789], op=INSERT, meta=()}")),
+                Arguments.of(
+                        38,
+                        38,
+                        "0.12345678901234567890123456789012345678",
+                        38,
+                        0,
+                        "12345678901234567890123456789012345678",
+                        List.of(
+                                
"CreateTableEvent{tableId=test_database.merged, schema=columns={`id` BIGINT NOT 
NULL,`dec` DECIMAL(38, 38)}, primaryKeys=id, options=()}",
+                                
"AlterColumnTypeEvent{tableId=test_database.merged, 
typeMapping={dec=DECIMAL(38, 0)}, oldTypeMapping={dec=DECIMAL(38, 38)}, 
comments={}}",
+                                "DataChangeEvent{tableId=test_database.merged, 
before=[], after=[1, 0], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId=test_database.merged, 
before=[], after=[2, 12345678901234567890123456789012345678], op=INSERT, 
meta=()}")));
+    }
+
+    @ParameterizedTest(name = "merge Decimal({0}, {1}) and Decimal({3}, {4})")
+    @MethodSource("decimalOOB")
+    void testMergingDecimalWithOutOfBoundPrecisions(
+            int decimal1Precision,
+            int decimal1Scale,
+            String decimal1,
+            int decimal2Precision,
+            int decimal2Scale,
+            String decimal2,
+            List<String> expectedOutput)
+            throws Exception {
+        TableId tableId1 = TableId.tableId("test_database", "test_table_1");
+        TableId tableId2 = TableId.tableId("test_database", "test_table_2");
+        Schema schema1 =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.BIGINT().notNull())
+                        .physicalColumn("dec", 
DataTypes.DECIMAL(decimal1Precision, decimal1Scale))
+                        .primaryKey("id")
+                        .build();
+        Schema schema2 =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.BIGINT().notNull())
+                        .physicalColumn("dec", 
DataTypes.DECIMAL(decimal2Precision, decimal2Scale))
+                        .primaryKey("id")
+                        .build();
+        BinaryRecordDataGenerator generator1 =
+                new BinaryRecordDataGenerator(
+                        schema1.getColumnDataTypes().toArray(new DataType[0]));
+        BinaryRecordDataGenerator generator2 =
+                new BinaryRecordDataGenerator(
+                        schema2.getColumnDataTypes().toArray(new DataType[0]));
+        List<Event> events = new ArrayList<>();
+        events.add(new CreateTableEvent(tableId1, schema1));
+        events.add(new CreateTableEvent(tableId2, schema2));
+        events.add(
+                DataChangeEvent.insertEvent(
+                        tableId1,
+                        generator1.generate(
+                                new Object[] {
+                                    1L,
+                                    DecimalData.fromBigDecimal(
+                                            new BigDecimal(decimal1),
+                                            decimal1Precision,
+                                            decimal1Scale)
+                                })));
+        events.add(
+                DataChangeEvent.insertEvent(
+                        tableId2,
+                        generator2.generate(
+                                new Object[] {
+                                    2L,
+                                    DecimalData.fromBigDecimal(
+                                            new BigDecimal(decimal2),
+                                            decimal2Precision,
+                                            decimal2Scale)
+                                })));
+
+        runGenericMergingTest(
+                ValuesDataSink.SinkApi.SINK_V2,
+                List.of(),
+                List.of(
+                        new RouteDef(
+                                "test_database.test_table_\\.*",
+                                "test_database.merged",
+                                null,
+                                "Merge all decimal columns with different 
precision")),
+                events,
+                expectedOutput);
+    }
+
+    void runGenericMergingTest(
+            ValuesDataSink.SinkApi sinkApi,
+            List<TransformDef> transformDef,
+            List<RouteDef> routeDef,
+            List<Event> events,
+            List<String> expectedOutput)
+            throws Exception {
         FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
 
         // Setup value source
@@ -1379,7 +1544,6 @@ class FlinkPipelineComposerITCase {
                 ValuesDataSourceOptions.EVENT_SET_ID,
                 ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
 
-        List<Event> events = generateDecimalColumnEvents("default_table_");
         
ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
 
         SourceDef sourceDef =
@@ -1399,13 +1563,8 @@ class FlinkPipelineComposerITCase {
                 new PipelineDef(
                         sourceDef,
                         sinkDef,
-                        Collections.singletonList(
-                                new RouteDef(
-                                        
"default_namespace.default_schema.default_table_\\.*",
-                                        
"default_namespace.default_schema.default_everything_merged",
-                                        null,
-                                        "Merge all decimal columns with 
different precision")),
-                        Collections.emptyList(),
+                        routeDef,
+                        transformDef,
                         Collections.emptyList(),
                         pipelineConfig);
 
@@ -1416,40 +1575,7 @@ class FlinkPipelineComposerITCase {
 
         // Check the order and content of all received events
         String[] outputEvents = outCaptor.toString().trim().split("\n");
-
-        String[] expected =
-                Stream.of(
-                                "CreateTableEvent{tableId={}, 
schema=columns={`id` INT,`name` STRING,`age` INT,`fav_num` TINYINT}, 
primaryKeys=id, options=()}",
-                                "DataChangeEvent{tableId={}, before=[], 
after=[1, Alice, 17, 1], op=INSERT, meta=()}",
-                                "AlterColumnTypeEvent{tableId={}, 
typeMapping={fav_num=SMALLINT}, oldTypeMapping={fav_num=TINYINT}, comments={}}",
-                                "DataChangeEvent{tableId={}, before=[], 
after=[2, Alice, 17, 22], op=INSERT, meta=()}",
-                                "AlterColumnTypeEvent{tableId={}, 
typeMapping={fav_num=INT}, oldTypeMapping={fav_num=SMALLINT}, comments={}}",
-                                "DataChangeEvent{tableId={}, before=[], 
after=[3, Alice, 17, 3333], op=INSERT, meta=()}",
-                                "AlterColumnTypeEvent{tableId={}, 
typeMapping={fav_num=BIGINT}, oldTypeMapping={fav_num=INT}, comments={}}",
-                                "DataChangeEvent{tableId={}, before=[], 
after=[4, Alice, 17, 44444444], op=INSERT, meta=()}",
-                                "AlterColumnTypeEvent{tableId={}, 
typeMapping={fav_num=DECIMAL(19, 0)}, oldTypeMapping={fav_num=BIGINT}, 
comments={}}",
-                                "DataChangeEvent{tableId={}, before=[], 
after=[5, Alice, 17, 555555555555555], op=INSERT, meta=()}",
-                                "AlterColumnTypeEvent{tableId={}, 
typeMapping={fav_num=DECIMAL(24, 5)}, oldTypeMapping={fav_num=DECIMAL(19, 0)}, 
comments={}}",
-                                "DataChangeEvent{tableId={}, before=[], 
after=[6, Alice, 17, 66666.66666], op=INSERT, meta=()}",
-                                "DataChangeEvent{tableId={}, before=[], 
after=[7, Alice, 17, 77777777.17000], op=INSERT, meta=()}",
-                                "AlterColumnTypeEvent{tableId={}, 
typeMapping={fav_num=DECIMAL(38, 19)}, oldTypeMapping={fav_num=DECIMAL(24, 5)}, 
comments={}}",
-                                "DataChangeEvent{tableId={}, before=[], 
after=[8, Alice, 17, 888888888.8888888888888888888], op=INSERT, meta=()}",
-                                "DataChangeEvent{tableId={}, before=[], 
after=[101, Zen, 19, 1.0000000000000000000], op=INSERT, meta=()}",
-                                "DataChangeEvent{tableId={}, before=[], 
after=[102, Zen, 19, 22.0000000000000000000], op=INSERT, meta=()}",
-                                "DataChangeEvent{tableId={}, before=[], 
after=[103, Zen, 19, 3333.0000000000000000000], op=INSERT, meta=()}",
-                                "DataChangeEvent{tableId={}, before=[], 
after=[104, Zen, 19, 44444444.0000000000000000000], op=INSERT, meta=()}",
-                                "DataChangeEvent{tableId={}, before=[], 
after=[105, Zen, 19, 555555555555555.0000000000000000000], op=INSERT, meta=()}",
-                                "DataChangeEvent{tableId={}, before=[], 
after=[106, Zen, 19, 66666.6666600000000000000], op=INSERT, meta=()}",
-                                "DataChangeEvent{tableId={}, before=[], 
after=[107, Zen, 19, 77777777.1700000000000000000], op=INSERT, meta=()}",
-                                "DataChangeEvent{tableId={}, before=[], 
after=[108, Zen, 19, 888888888.8888888888888888888], op=INSERT, meta=()}")
-                        .map(
-                                s ->
-                                        s.replace(
-                                                "tableId={}",
-                                                
"tableId=default_namespace.default_schema.default_everything_merged"))
-                        .toArray(String[]::new);
-
-        assertThat(outputEvents).containsExactlyInAnyOrder(expected);
+        
assertThat(outputEvents).containsExactlyInAnyOrderElementsOf(expectedOutput);
     }
 
     private List<Event> generateTemporalColumnEvents(String tableNamePrefix) {

Reply via email to