This is an automated email from the ASF dual-hosted git repository.

lvyanquan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 343778f39 [FLINK-39756] Fix unable to coerce complex types to STRING (#4414)
343778f39 is described below

commit 343778f39a8fc64a7e20a48f86a3ae373ae63c12
Author: haruki <[email protected]>
AuthorDate: Wed May 27 12:01:58 2026 +0800

    [FLINK-39756] Fix unable to coerce complex types to STRING (#4414)
    
    Added complex data type coercion support (Array, Map, Record) to SchemaMergingUtils,
    along with corresponding unit tests to verify these conversion behaviors. Also included
    related scenario tests in integration tests.
    
    Additionally, fixed missing imports for ArrayData and JavaObjectConverter classes
    that were causing compilation errors during the complex type string coercion process.
    
    Co-authored-by: 春栖 <[email protected]>
---
 .../flink/cdc/common/utils/SchemaMergingUtils.java |  10 ++
 .../cdc/common/utils/SchemaMergingUtilsTest.java   |  49 ++++++++-
 .../flink/FlinkPipelineBatchComposerITCase.java    | 122 +++++++++++++++++++++
 3 files changed, 180 insertions(+), 1 deletion(-)

diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java
index a7a5e2bf9..f1a8cde78 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java
@@ -20,9 +20,13 @@ package org.apache.flink.cdc.common.utils;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cdc.common.annotation.PublicEvolving;
 import org.apache.flink.cdc.common.annotation.VisibleForTesting;
+import org.apache.flink.cdc.common.converter.JavaObjectConverter;
+import org.apache.flink.cdc.common.data.ArrayData;
 import org.apache.flink.cdc.common.data.DateData;
 import org.apache.flink.cdc.common.data.DecimalData;
 import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.MapData;
+import org.apache.flink.cdc.common.data.RecordData;
 import org.apache.flink.cdc.common.data.StringData;
 import org.apache.flink.cdc.common.data.TimeData;
 import org.apache.flink.cdc.common.data.TimestampData;
@@ -609,6 +613,12 @@ public class SchemaMergingUtils {
             return BinaryStringData.fromString(((Variant) 
originalField).toJson());
         }
 
+        if (originalField instanceof MapData
+                || originalField instanceof ArrayData
+                || originalField instanceof RecordData) {
+            Object javaObject = 
JavaObjectConverter.convertToJava(originalField, originalType);
+            return BinaryStringData.fromString(javaObject.toString());
+        }
         return BinaryStringData.fromString(originalField.toString());
     }
 
diff --git a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaMergingUtilsTest.java b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaMergingUtilsTest.java
index 1f53668d4..f6a0e5c0f 100644
--- a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaMergingUtilsTest.java
+++ b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaMergingUtilsTest.java
@@ -21,10 +21,15 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.cdc.common.data.DateData;
 import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.GenericArrayData;
+import org.apache.flink.cdc.common.data.GenericMapData;
+import org.apache.flink.cdc.common.data.GenericRecordData;
 import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
 import org.apache.flink.cdc.common.data.TimeData;
 import org.apache.flink.cdc.common.data.TimestampData;
 import org.apache.flink.cdc.common.data.ZonedTimestampData;
+import org.apache.flink.cdc.common.data.binary.BinaryArrayData;
+import org.apache.flink.cdc.common.data.binary.BinaryMapData;
 import org.apache.flink.cdc.common.data.binary.BinaryStringData;
 import org.apache.flink.cdc.common.event.AddColumnEvent;
 import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
@@ -823,7 +828,29 @@ class SchemaMergingUtilsTest {
                                 TIMESTAMP_TZ,
                                 zTsOf("2019", "02", "02"),
                                 STRING,
-                                binStrOf("2019-02-02T00:00:00Z")));
+                                binStrOf("2019-02-02T00:00:00Z")),
+
+                        // From ARRAY
+                        Tuple4.of(
+                                ARRAY,
+                                new GenericArrayData(
+                                        new Object[] {binStrOf("hello"), 
binStrOf("world")}),
+                                STRING,
+                                binStrOf("[hello, world]")),
+
+                        // From MAP
+                        Tuple4.of(
+                                MAP,
+                                new 
GenericMapData(Collections.singletonMap(42, binStrOf("value"))),
+                                STRING,
+                                binStrOf("{42=value}")),
+
+                        // From ROW
+                        Tuple4.of(
+                                ROW,
+                                GenericRecordData.of(42, binStrOf("Alice")),
+                                STRING,
+                                binStrOf("[42, Alice]")));
 
         conversionExpects.forEach(
                 rule ->
@@ -832,6 +859,26 @@ class SchemaMergingUtilsTest {
                                 .isEqualTo(rule.f3));
     }
 
+    @Test
+    void testCoerceObjectBinaryTypes() {
+        DataType intArrayType = DataTypes.ARRAY(DataTypes.INT());
+        DataType intIntMapType = DataTypes.MAP(DataTypes.INT(), 
DataTypes.INT());
+
+        // BinaryArrayData to STRING
+        BinaryArrayData binaryArray = BinaryArrayData.fromPrimitiveArray(new 
int[] {1, 2, 3});
+        Assertions.assertThat(coerceObject("UTC", binaryArray, intArrayType, 
STRING))
+                .as("Try coercing BinaryArrayData to STRING")
+                .isEqualTo(binStrOf("[1, 2, 3]"));
+
+        // BinaryMapData to STRING
+        BinaryArrayData keys = BinaryArrayData.fromPrimitiveArray(new int[] 
{10});
+        BinaryArrayData values = BinaryArrayData.fromPrimitiveArray(new int[] 
{100});
+        BinaryMapData binaryMap = BinaryMapData.valueOf(keys, values);
+        Assertions.assertThat(coerceObject("UTC", binaryMap, intIntMapType, 
STRING))
+                .as("Try coercing BinaryMapData to STRING")
+                .isEqualTo(binStrOf("{10=100}"));
+    }
+
     @Test
     void testCoerceRow() {
         Assertions.assertThat(
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineBatchComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineBatchComposerITCase.java
index 5e521c3df..ac17ec130 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineBatchComposerITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineBatchComposerITCase.java
@@ -19,6 +19,9 @@ package org.apache.flink.cdc.composer.flink;
 
 import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.GenericArrayData;
+import org.apache.flink.cdc.common.data.GenericMapData;
+import org.apache.flink.cdc.common.data.GenericRecordData;
 import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
 import org.apache.flink.cdc.common.data.TimestampData;
 import org.apache.flink.cdc.common.data.ZonedTimestampData;
@@ -886,6 +889,125 @@ public class FlinkPipelineBatchComposerITCase {
                         
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], 
after=[4, Donald, 25, student], op=INSERT, meta=()}");
     }
 
+    @ParameterizedTest
+    @EnumSource
+    void testMergingComplexTypesWithRouteInBatchMode(ValuesDataSink.SinkApi 
sinkApi)
+            throws Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Setup value source
+        Configuration sourceConfig = new Configuration();
+        sourceConfig.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
+        sourceConfig.set(ValuesDataSourceOptions.BATCH_MODE_ENABLED, true);
+
+        TableId myTable1 = TableId.tableId("default_namespace", 
"default_schema", "mytable1");
+        TableId myTable2 = TableId.tableId("default_namespace", 
"default_schema", "mytable2");
+
+        // Table 1 has complex types: ARRAY, MAP, ROW
+        Schema table1Schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT())
+                        .physicalColumn("arr", 
DataTypes.ARRAY(DataTypes.INT()))
+                        .physicalColumn("mp", 
DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))
+                        .physicalColumn("rw", DataTypes.ROW(DataTypes.INT(), 
DataTypes.STRING()))
+                        .primaryKey("id")
+                        .build();
+
+        // Table 2 has STRING columns at the same positions, forcing coercion
+        Schema table2Schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT())
+                        .physicalColumn("arr", DataTypes.STRING())
+                        .physicalColumn("mp", DataTypes.STRING())
+                        .physicalColumn("rw", DataTypes.STRING())
+                        .primaryKey("id")
+                        .build();
+
+        BinaryRecordDataGenerator table1dataGenerator =
+                new BinaryRecordDataGenerator(
+                        table1Schema.getColumnDataTypes().toArray(new 
DataType[0]));
+        BinaryRecordDataGenerator table2dataGenerator =
+                new BinaryRecordDataGenerator(
+                        table2Schema.getColumnDataTypes().toArray(new 
DataType[0]));
+
+        List<Event> events = new ArrayList<>();
+        events.add(new CreateTableEvent(myTable1, table1Schema));
+        events.add(new CreateTableEvent(myTable2, table2Schema));
+
+        // Table 1: insert with complex typed data
+        events.add(
+                DataChangeEvent.insertEvent(
+                        myTable1,
+                        table1dataGenerator.generate(
+                                new Object[] {
+                                    1,
+                                    new GenericArrayData(new int[] {10, 20, 
30}),
+                                    new GenericMapData(
+                                            Collections.singletonMap(
+                                                    
BinaryStringData.fromString("key"), 42)),
+                                    GenericRecordData.of(7, 
BinaryStringData.fromString("hello"))
+                                })));
+
+        // Table 2: insert with plain strings
+        events.add(
+                DataChangeEvent.insertEvent(
+                        myTable2,
+                        table2dataGenerator.generate(
+                                new Object[] {
+                                    2,
+                                    BinaryStringData.fromString("plain_arr"),
+                                    BinaryStringData.fromString("plain_mp"),
+                                    BinaryStringData.fromString("plain_rw")
+                                })));
+
+        
ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
+
+        SourceDef sourceDef =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", 
sourceConfig);
+
+        // Setup value sink
+        Configuration sinkConfig = new Configuration();
+        sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value 
Sink", sinkConfig);
+
+        // Setup route
+        TableId mergedTable = TableId.tableId("default_namespace", 
"default_schema", "merged");
+        List<RouteDef> routeDef =
+                Collections.singletonList(
+                        new RouteDef(
+                                
"default_namespace.default_schema.mytable[0-9]",
+                                mergedTable.toString(),
+                                null,
+                                null));
+
+        // Setup pipeline
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_EXECUTION_RUNTIME_MODE, 
RuntimeExecutionMode.BATCH);
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        routeDef,
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        pipelineConfig);
+
+        // Execute the pipeline
+        PipelineExecution execution = composer.compose(pipelineDef);
+        execution.execute();
+        String[] outputEvents = outCaptor.toString().trim().split("\n");
+        assertThat(outputEvents)
+                .containsExactly(
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.merged, 
schema=columns={`id` INT,`arr` STRING,`mp` STRING,`rw` STRING}, primaryKeys=id, 
options=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], 
after=[1, [10, 20, 30], {key=42}, [7, hello]], op=INSERT, meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], 
after=[2, plain_arr, plain_mp, plain_rw], op=INSERT, meta=()}");
+    }
+
     @ParameterizedTest
     @EnumSource
     void testTransformMergingWithRoute(ValuesDataSink.SinkApi sinkApi) throws 
Exception {

Reply via email to