This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch release-3.2
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git

commit debd43cdd75eb53ceed8c2483e0a14edc1b265c6
Author: yuxiqian <[email protected]>
AuthorDate: Mon Aug 26 17:46:37 2024 +0800

    [FLINK-36128][cdc-runtime] Fix potential unrecoverable in-flight data exception by promoting LENIENT as the default schema change behavior
    
    This closes #3574.
    
    (cherry picked from commit 2e938a92f5335515f7eb46077254aee25acd9107)
---
 .../cli/parser/YamlPipelineDefinitionParser.java   | 38 ++++++++----
 .../parser/YamlPipelineDefinitionParserTest.java   | 36 +++++++++++-
 .../cdc/common/event/SchemaChangeEventType.java    | 20 +++++--
 .../flink/cdc/common/pipeline/PipelineOptions.java |  2 +-
 .../flink/cdc/common/utils/ChangeEventUtils.java   |  8 +--
 .../cdc/common/utils/ChangeEventUtilsTest.java     | 67 ++++++++++++----------
 .../flink/FlinkPipelineComposerITCase.java         | 21 +++++++
 .../flink/FlinkPipelineTransformITCase.java        | 13 +++++
 .../cdc/composer/flink/FlinkPipelineUdfITCase.java | 21 +++++++
 .../flink/cdc/pipeline/tests/MysqlE2eITCase.java   |  3 +-
 .../tests/SchemaEvolvingTransformE2eITCase.java    | 10 ++--
 .../cdc/pipeline/tests/TransformE2eITCase.java     | 18 ++++--
 .../runtime/partitioning/PrePartitionOperator.java |  6 +-
 13 files changed, 195 insertions(+), 68 deletions(-)

diff --git 
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
 
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
index d5df8eda1..ffba60784 100644
--- 
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
+++ 
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
@@ -19,6 +19,8 @@ package org.apache.flink.cdc.cli.parser;
 
 import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.common.event.SchemaChangeEventType;
+import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
+import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.utils.StringUtils;
 import org.apache.flink.cdc.composer.definition.PipelineDef;
 import org.apache.flink.cdc.composer.definition.RouteDef;
@@ -35,11 +37,13 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YA
 
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
+import static 
org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR;
 import static 
org.apache.flink.cdc.common.utils.ChangeEventUtils.resolveSchemaEvolutionOptions;
 import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;
 
@@ -99,6 +103,19 @@ public class YamlPipelineDefinitionParser implements 
PipelineDefinitionParser {
 
     private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration 
globalPipelineConfig)
             throws Exception {
+
+        // UDFs are optional. We parse UDF first and remove it from the pipelineDefJsonNode since
+        // it's not of plain data types and must be removed before calling toPipelineConfig.
+        List<UdfDef> udfDefs = new ArrayList<>();
+        Optional.ofNullable(((ObjectNode) 
pipelineDefJsonNode.get(PIPELINE_KEY)).remove(UDF_KEY))
+                .ifPresent(node -> node.forEach(udf -> 
udfDefs.add(toUdfDef(udf))));
+
+        // Pipeline configs are optional
+        Configuration userPipelineConfig = 
toPipelineConfig(pipelineDefJsonNode.get(PIPELINE_KEY));
+
+        SchemaChangeBehavior schemaChangeBehavior =
+                userPipelineConfig.get(PIPELINE_SCHEMA_CHANGE_BEHAVIOR);
+
         // Source is required
         SourceDef sourceDef =
                 toSourceDef(
@@ -113,7 +130,8 @@ public class YamlPipelineDefinitionParser implements 
PipelineDefinitionParser {
                         checkNotNull(
                                 pipelineDefJsonNode.get(SINK_KEY),
                                 "Missing required field \"%s\" in pipeline 
definition",
-                                SINK_KEY));
+                                SINK_KEY),
+                        schemaChangeBehavior);
 
         // Transforms are optional
         List<TransformDef> transformDefs = new ArrayList<>();
@@ -128,14 +146,6 @@ public class YamlPipelineDefinitionParser implements 
PipelineDefinitionParser {
         Optional.ofNullable(pipelineDefJsonNode.get(ROUTE_KEY))
                 .ifPresent(node -> node.forEach(route -> 
routeDefs.add(toRouteDef(route))));
 
-        // UDFs are optional
-        List<UdfDef> udfDefs = new ArrayList<>();
-        Optional.ofNullable(((ObjectNode) 
pipelineDefJsonNode.get(PIPELINE_KEY)).remove(UDF_KEY))
-                .ifPresent(node -> node.forEach(udf -> 
udfDefs.add(toUdfDef(udf))));
-
-        // Pipeline configs are optional
-        Configuration userPipelineConfig = 
toPipelineConfig(pipelineDefJsonNode.get(PIPELINE_KEY));
-
         // Merge user config into global config
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.addAll(globalPipelineConfig);
@@ -162,7 +172,7 @@ public class YamlPipelineDefinitionParser implements 
PipelineDefinitionParser {
         return new SourceDef(type, name, Configuration.fromMap(sourceMap));
     }
 
-    private SinkDef toSinkDef(JsonNode sinkNode) {
+    private SinkDef toSinkDef(JsonNode sinkNode, SchemaChangeBehavior 
schemaChangeBehavior) {
         List<String> includedSETypes = new ArrayList<>();
         List<String> excludedSETypes = new ArrayList<>();
 
@@ -172,6 +182,14 @@ public class YamlPipelineDefinitionParser implements 
PipelineDefinitionParser {
         Optional.ofNullable(sinkNode.get(EXCLUDE_SCHEMA_EVOLUTION_TYPES))
                 .ifPresent(e -> e.forEach(tag -> 
excludedSETypes.add(tag.asText())));
 
+        if (includedSETypes.isEmpty()) {
+            // If no schema evolution types are specified, include all schema evolution types by
+            // default.
+            Arrays.stream(SchemaChangeEventTypeFamily.ALL)
+                    .map(SchemaChangeEventType::getTag)
+                    .forEach(includedSETypes::add);
+        }
+
         Set<SchemaChangeEventType> declaredSETypes =
                 resolveSchemaEvolutionOptions(includedSETypes, 
excludedSETypes);
 
diff --git 
a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
 
b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
index f57dd62c7..2ecf45870 100644
--- 
a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
+++ 
b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.cdc.composer.definition.TransformDef;
 import org.apache.flink.cdc.composer.definition.UdfDef;
 
 import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableSet;
 import org.apache.flink.shaded.guava31.com.google.common.io.Resources;
 
 import org.junit.jupiter.api.Test;
@@ -37,6 +38,11 @@ import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 
+import static 
org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN;
+import static 
org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE;
+import static 
org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE;
+import static 
org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN;
+import static 
org.apache.flink.cdc.common.event.SchemaChangeEventType.RENAME_COLUMN;
 import static 
org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCAL_TIME_ZONE;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
@@ -384,7 +390,13 @@ class YamlPipelineDefinitionParserTest {
                             Configuration.fromMap(
                                     ImmutableMap.<String, String>builder()
                                             .put("bootstrap-servers", 
"localhost:9092")
-                                            .build())),
+                                            .build()),
+                            ImmutableSet.of(
+                                    DROP_COLUMN,
+                                    ALTER_COLUMN_TYPE,
+                                    ADD_COLUMN,
+                                    CREATE_TABLE,
+                                    RENAME_COLUMN)),
                     Collections.singletonList(
                             new RouteDef(
                                     "mydb.default.app_order_.*",
@@ -401,7 +413,16 @@ class YamlPipelineDefinitionParserTest {
     private final PipelineDef minimizedDef =
             new PipelineDef(
                     new SourceDef("mysql", null, new Configuration()),
-                    new SinkDef("kafka", null, new Configuration()),
+                    new SinkDef(
+                            "kafka",
+                            null,
+                            new Configuration(),
+                            ImmutableSet.of(
+                                    DROP_COLUMN,
+                                    ALTER_COLUMN_TYPE,
+                                    ADD_COLUMN,
+                                    CREATE_TABLE,
+                                    RENAME_COLUMN)),
                     Collections.emptyList(),
                     Collections.emptyList(),
                     Collections.emptyList(),
@@ -474,7 +495,16 @@ class YamlPipelineDefinitionParserTest {
     private final PipelineDef pipelineDefWithUdf =
             new PipelineDef(
                     new SourceDef("values", null, new Configuration()),
-                    new SinkDef("values", null, new Configuration()),
+                    new SinkDef(
+                            "values",
+                            null,
+                            new Configuration(),
+                            ImmutableSet.of(
+                                    DROP_COLUMN,
+                                    ALTER_COLUMN_TYPE,
+                                    ADD_COLUMN,
+                                    CREATE_TABLE,
+                                    RENAME_COLUMN)),
                     Collections.emptyList(),
                     Collections.singletonList(
                             new TransformDef(
diff --git 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventType.java
 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventType.java
index 668ea76a4..3e03ec4cf 100644
--- 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventType.java
+++ 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventType.java
@@ -22,11 +22,21 @@ import 
org.apache.flink.cdc.common.annotation.PublicEvolving;
 /** An enumeration of schema change event types for {@link SchemaChangeEvent}. */
 @PublicEvolving
 public enum SchemaChangeEventType {
-    ADD_COLUMN,
-    ALTER_COLUMN_TYPE,
-    CREATE_TABLE,
-    DROP_COLUMN,
-    RENAME_COLUMN;
+    ADD_COLUMN("add.column"),
+    ALTER_COLUMN_TYPE("alter.column.type"),
+    CREATE_TABLE("create.table"),
+    DROP_COLUMN("drop.column"),
+    RENAME_COLUMN("rename.column");
+
+    private final String tag;
+
+    SchemaChangeEventType(String tag) {
+        this.tag = tag;
+    }
+
+    public String getTag() {
+        return tag;
+    }
 
     public static SchemaChangeEventType ofEvent(SchemaChangeEvent event) {
         if (event instanceof AddColumnEvent) {
diff --git 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java
 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java
index 48a4fbb13..343e99270 100644
--- 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java
+++ 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java
@@ -48,7 +48,7 @@ public class PipelineOptions {
     public static final ConfigOption<SchemaChangeBehavior> 
PIPELINE_SCHEMA_CHANGE_BEHAVIOR =
             ConfigOptions.key("schema.change.behavior")
                     .enumType(SchemaChangeBehavior.class)
-                    .defaultValue(SchemaChangeBehavior.EVOLVE)
+                    .defaultValue(SchemaChangeBehavior.LENIENT)
                     .withDescription(
                             Description.builder()
                                     .text("Behavior for handling schema change 
events. ")
diff --git 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/ChangeEventUtils.java
 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/ChangeEventUtils.java
index ee72ef6e2..e2c289e35 100644
--- 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/ChangeEventUtils.java
+++ 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/ChangeEventUtils.java
@@ -95,12 +95,8 @@ public class ChangeEventUtils {
             List<String> includedSchemaEvolutionTypes, List<String> 
excludedSchemaEvolutionTypes) {
         List<SchemaChangeEventType> resultTypes = new ArrayList<>();
 
-        if (includedSchemaEvolutionTypes.isEmpty()) {
-            resultTypes.addAll(Arrays.asList(SchemaChangeEventTypeFamily.ALL));
-        } else {
-            for (String includeTag : includedSchemaEvolutionTypes) {
-                resultTypes.addAll(resolveSchemaEvolutionTag(includeTag));
-            }
+        for (String includeTag : includedSchemaEvolutionTypes) {
+            resultTypes.addAll(resolveSchemaEvolutionTag(includeTag));
         }
 
         for (String excludeTag : excludedSchemaEvolutionTypes) {
diff --git 
a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/ChangeEventUtilsTest.java
 
b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/ChangeEventUtilsTest.java
index 0a34a7501..e159fb1d1 100644
--- 
a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/ChangeEventUtilsTest.java
+++ 
b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/ChangeEventUtilsTest.java
@@ -17,59 +17,67 @@
 
 package org.apache.flink.cdc.common.utils;
 
-import org.assertj.core.api.Assertions;
+import org.apache.flink.cdc.common.event.SchemaChangeEventType;
+import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
+
 import org.assertj.core.util.Sets;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
 
 import static 
org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN;
 import static 
org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE;
 import static 
org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE;
 import static 
org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN;
 import static 
org.apache.flink.cdc.common.event.SchemaChangeEventType.RENAME_COLUMN;
+import static 
org.apache.flink.cdc.common.testutils.assertions.EventAssertions.assertThat;
 
 /** A test for the {@link org.apache.flink.cdc.common.utils.ChangeEventUtils}. 
*/
 public class ChangeEventUtilsTest {
     @Test
     public void testResolveSchemaEvolutionOptions() {
-        Assertions.assertThat(
-                        ChangeEventUtils.resolveSchemaEvolutionOptions(
-                                Collections.emptyList(), 
Collections.emptyList()))
+
+        List<String> allTags =
+                Arrays.stream(SchemaChangeEventTypeFamily.ALL)
+                        .map(SchemaChangeEventType::getTag)
+                        .collect(Collectors.toList());
+        assertThat(ChangeEventUtils.resolveSchemaEvolutionOptions(allTags, 
Collections.emptyList()))
                 .isEqualTo(
                         Sets.set(
+                                RENAME_COLUMN,
                                 CREATE_TABLE,
-                                ADD_COLUMN,
                                 ALTER_COLUMN_TYPE,
-                                DROP_COLUMN,
-                                RENAME_COLUMN));
+                                ADD_COLUMN,
+                                DROP_COLUMN));
 
-        Assertions.assertThat(
+        assertThat(
                         ChangeEventUtils.resolveSchemaEvolutionOptions(
-                                Collections.emptyList(), 
Collections.singletonList("drop")))
-                .isEqualTo(Sets.set(CREATE_TABLE, ADD_COLUMN, 
ALTER_COLUMN_TYPE, RENAME_COLUMN));
+                                allTags, Collections.singletonList("drop")))
+                .isEqualTo(Sets.set(ADD_COLUMN, ALTER_COLUMN_TYPE, 
RENAME_COLUMN, CREATE_TABLE));
 
-        Assertions.assertThat(
+        assertThat(
                         ChangeEventUtils.resolveSchemaEvolutionOptions(
                                 Arrays.asList("create", "add"), 
Collections.emptyList()))
-                .isEqualTo(Sets.set(CREATE_TABLE, ADD_COLUMN));
+                .isEqualTo(Sets.set(ADD_COLUMN, CREATE_TABLE));
 
-        Assertions.assertThat(
+        assertThat(
                         ChangeEventUtils.resolveSchemaEvolutionOptions(
                                 Collections.singletonList("column"),
                                 Collections.singletonList("drop.column")))
                 .isEqualTo(Sets.set(ADD_COLUMN, ALTER_COLUMN_TYPE, 
RENAME_COLUMN));
 
-        Assertions.assertThat(
+        assertThat(
                         ChangeEventUtils.resolveSchemaEvolutionOptions(
-                                Collections.emptyList(), 
Collections.singletonList("drop.column")))
-                .isEqualTo(Sets.set(CREATE_TABLE, ADD_COLUMN, 
ALTER_COLUMN_TYPE, RENAME_COLUMN));
+                                allTags, 
Collections.singletonList("drop.column")))
+                .isEqualTo(Sets.set(ADD_COLUMN, RENAME_COLUMN, 
ALTER_COLUMN_TYPE, CREATE_TABLE));
     }
 
     @Test
     public void testResolveSchemaEvolutionTag() {
-        
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("all"))
+        assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("all"))
                 .isEqualTo(
                         Arrays.asList(
                                 ADD_COLUMN,
@@ -78,41 +86,38 @@ public class ChangeEventUtilsTest {
                                 DROP_COLUMN,
                                 RENAME_COLUMN));
 
-        
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("column"))
+        assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("column"))
                 .isEqualTo(
                         Arrays.asList(ADD_COLUMN, ALTER_COLUMN_TYPE, 
DROP_COLUMN, RENAME_COLUMN));
 
-        
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("table"))
+        assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("table"))
                 .isEqualTo(Collections.singletonList(CREATE_TABLE));
 
-        
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("rename"))
-                .isEqualTo(Collections.singletonList(RENAME_COLUMN));
-
-        
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("rename.column"))
+        assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("rename.column"))
                 .isEqualTo(Collections.singletonList(RENAME_COLUMN));
 
-        
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("drop"))
+        assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("drop"))
                 .isEqualTo(Collections.singletonList(DROP_COLUMN));
 
-        
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("drop.column"))
+        assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("drop.column"))
                 .isEqualTo(Collections.singletonList(DROP_COLUMN));
 
-        
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("create"))
+        assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("create"))
                 .isEqualTo(Collections.singletonList(CREATE_TABLE));
 
-        
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("create.table"))
+        assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("create.table"))
                 .isEqualTo(Collections.singletonList(CREATE_TABLE));
 
-        
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("alter"))
+        assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("alter"))
                 .isEqualTo(Collections.singletonList(ALTER_COLUMN_TYPE));
 
-        
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("alter.column.type"))
+        
assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("alter.column.type"))
                 .isEqualTo(Collections.singletonList(ALTER_COLUMN_TYPE));
 
-        
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("add"))
+        assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("add"))
                 .isEqualTo(Collections.singletonList(ADD_COLUMN));
 
-        
Assertions.assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("add.column"))
+        assertThat(ChangeEventUtils.resolveSchemaEvolutionTag("add.column"))
                 .isEqualTo(Collections.singletonList(ADD_COLUMN));
     }
 }
diff --git 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
index 8e471d886..4e16d2380 100644
--- 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
+++ 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.RenameColumnEvent;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.pipeline.PipelineOptions;
+import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.types.DataType;
@@ -134,6 +135,8 @@ class FlinkPipelineComposerITCase {
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, 
SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
                         sourceDef,
@@ -191,6 +194,8 @@ class FlinkPipelineComposerITCase {
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, 
SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
                         sourceDef,
@@ -313,6 +318,8 @@ class FlinkPipelineComposerITCase {
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, 
SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
                         sourceDef,
@@ -373,6 +380,8 @@ class FlinkPipelineComposerITCase {
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, 
SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
                         sourceDef,
@@ -441,6 +450,8 @@ class FlinkPipelineComposerITCase {
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, 
SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
                         sourceDef,
@@ -496,6 +507,8 @@ class FlinkPipelineComposerITCase {
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, 
SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
                         sourceDef,
@@ -569,6 +582,8 @@ class FlinkPipelineComposerITCase {
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, 
SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
                         sourceDef,
@@ -766,6 +781,8 @@ class FlinkPipelineComposerITCase {
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, 
SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
                         sourceDef,
@@ -972,6 +989,8 @@ class FlinkPipelineComposerITCase {
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, 
SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
                         sourceDef,
@@ -1035,6 +1054,8 @@ class FlinkPipelineComposerITCase {
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, 
SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
                         sourceDef,
diff --git 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
index 322a3358e..54ae2c225 100644
--- 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
+++ 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.RenameColumnEvent;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.pipeline.PipelineOptions;
+import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.types.DataType;
@@ -213,6 +214,8 @@ class FlinkPipelineTransformITCase {
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, 
SchemaChangeBehavior.EVOLVE);
         pipelineConfig.set(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE, 
"America/Los_Angeles");
         PipelineDef pipelineDef =
                 new PipelineDef(
@@ -266,6 +269,8 @@ class FlinkPipelineTransformITCase {
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, 
SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
                         sourceDef,
@@ -350,6 +355,8 @@ class FlinkPipelineTransformITCase {
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, 
SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
                         sourceDef,
@@ -436,6 +443,8 @@ class FlinkPipelineTransformITCase {
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, 
SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
                         sourceDef,
@@ -516,6 +525,8 @@ class FlinkPipelineTransformITCase {
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, 
SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
                         sourceDef,
@@ -608,6 +619,8 @@ class FlinkPipelineTransformITCase {
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, 
SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
                         sourceDef,
diff --git 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java
 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java
index f9be7d7dd..c3b412dc7 100644
--- 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java
+++ 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java
@@ -19,6 +19,7 @@ package org.apache.flink.cdc.composer.flink;
 
 import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.common.pipeline.PipelineOptions;
+import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.composer.PipelineExecution;
 import org.apache.flink.cdc.composer.definition.PipelineDef;
 import org.apache.flink.cdc.composer.definition.SinkDef;
@@ -139,6 +140,8 @@ public class FlinkPipelineUdfITCase {
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, 
SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
                         sourceDef,
@@ -207,6 +210,8 @@ public class FlinkPipelineUdfITCase {
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, 
SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
                         sourceDef,
@@ -273,6 +278,8 @@ public class FlinkPipelineUdfITCase {
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, 
SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
                         sourceDef,
@@ -341,6 +348,8 @@ public class FlinkPipelineUdfITCase {
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, 
SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
                         sourceDef,
@@ -411,6 +420,8 @@ public class FlinkPipelineUdfITCase {
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, 
SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
                         sourceDef,
@@ -472,6 +483,8 @@ public class FlinkPipelineUdfITCase {
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, 
SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
                         sourceDef,
@@ -565,6 +578,8 @@ public class FlinkPipelineUdfITCase {
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, 
SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
                         sourceDef,
@@ -632,6 +647,8 @@ public class FlinkPipelineUdfITCase {
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, 
SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
                         sourceDef,
@@ -697,6 +714,8 @@ public class FlinkPipelineUdfITCase {
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, 
SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
                         sourceDef,
@@ -758,6 +777,8 @@ public class FlinkPipelineUdfITCase {
         // Setup pipeline
         Configuration pipelineConfig = new Configuration();
         pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, 
SchemaChangeBehavior.EVOLVE);
         PipelineDef pipelineDef =
                 new PipelineDef(
                         sourceDef,
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
index c16f6de9c..5cb22d089 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
@@ -210,7 +210,8 @@ public class MysqlE2eITCase extends PipelineTestEnvironment 
{
                                 + "  type: values\n"
                                 + "\n"
                                 + "pipeline:\n"
-                                + "  parallelism: %d",
+                                + "  parallelism: %d\n"
+                                + "  schema.change.behavior: evolve",
                         INTER_CONTAINER_MYSQL_ALIAS,
                         MYSQL_TEST_USER,
                         MYSQL_TEST_PASSWORD,
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java
index a4a21c992..8b306d1e7 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java
@@ -228,11 +228,12 @@ public class SchemaEvolvingTransformE2eITCase extends 
PipelineTestEnvironment {
                                 + "\n"
                                 + "pipeline:\n"
                                 + "  schema.change.behavior: unexpected\n"
-                                + "  parallelism: 1",
+                                + "  parallelism: %d",
                         INTER_CONTAINER_MYSQL_ALIAS,
                         MYSQL_TEST_USER,
                         MYSQL_TEST_PASSWORD,
-                        schemaEvolveDatabase.getDatabaseName());
+                        schemaEvolveDatabase.getDatabaseName(),
+                        parallelism);
         Path mysqlCdcJar = 
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
         Path valuesCdcJar = 
TestUtils.getResource("values-cdc-pipeline-connector.jar");
         Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -305,7 +306,7 @@ public class SchemaEvolvingTransformE2eITCase extends 
PipelineTestEnvironment {
                                 + "\n"
                                 + "pipeline:\n"
                                 + "  schema.change.behavior: %s\n"
-                                + "  parallelism: 1",
+                                + "  parallelism: %d",
                         INTER_CONTAINER_MYSQL_ALIAS,
                         MYSQL_TEST_USER,
                         MYSQL_TEST_PASSWORD,
@@ -313,7 +314,8 @@ public class SchemaEvolvingTransformE2eITCase extends 
PipelineTestEnvironment {
                         mergeTable ? "(members|new_members)" : "members",
                         dbName,
                         dbName,
-                        behavior);
+                        behavior,
+                        parallelism);
         Path mysqlCdcJar = 
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
         Path valuesCdcJar = 
TestUtils.getResource("values-cdc-pipeline-connector.jar");
         Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
index c1c8e8109..0a3b0aace 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
@@ -738,12 +738,14 @@ public class TransformE2eITCase extends 
PipelineTestEnvironment {
                                 + "    projection: ID, 'id -> ' || ID AS UID, 
PRICEALPHA AS PRICE\n"
                                 + "    filter: ID > 1008\n"
                                 + "pipeline:\n"
-                                + "  parallelism: 1",
+                                + "  parallelism: %d\n"
+                                + "  schema.change.behavior: evolve",
                         INTER_CONTAINER_MYSQL_ALIAS,
                         MYSQL_TEST_USER,
                         MYSQL_TEST_PASSWORD,
                         transformTestDatabase.getDatabaseName(),
-                        transformTestDatabase.getDatabaseName());
+                        transformTestDatabase.getDatabaseName(),
+                        parallelism);
         Path mysqlCdcJar = 
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
         Path valuesCdcJar = 
TestUtils.getResource("values-cdc-pipeline-connector.jar");
         Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -835,12 +837,14 @@ public class TransformE2eITCase extends 
PipelineTestEnvironment {
                                 + "    projection: \\*, 'id -> ' || ID AS 
UID\n"
                                 + "    filter: ID > 1008\n"
                                 + "pipeline:\n"
-                                + "  parallelism: 1",
+                                + "  parallelism: %d\n"
+                                + "  schema.change.behavior: evolve",
                         INTER_CONTAINER_MYSQL_ALIAS,
                         MYSQL_TEST_USER,
                         MYSQL_TEST_PASSWORD,
                         transformTestDatabase.getDatabaseName(),
-                        transformTestDatabase.getDatabaseName());
+                        transformTestDatabase.getDatabaseName(),
+                        parallelism);
         Path mysqlCdcJar = 
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
         Path valuesCdcJar = 
TestUtils.getResource("values-cdc-pipeline-connector.jar");
         Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -940,12 +944,14 @@ public class TransformE2eITCase extends 
PipelineTestEnvironment {
                                 + "    projection: ID || ' <- id' AS UID, *\n"
                                 + "    filter: ID > 1008\n"
                                 + "pipeline:\n"
-                                + "  parallelism: 1",
+                                + "  parallelism: %d\n"
+                                + "  schema.change.behavior: evolve",
                         INTER_CONTAINER_MYSQL_ALIAS,
                         MYSQL_TEST_USER,
                         MYSQL_TEST_PASSWORD,
                         transformTestDatabase.getDatabaseName(),
-                        transformTestDatabase.getDatabaseName());
+                        transformTestDatabase.getDatabaseName(),
+                        parallelism);
         Path mysqlCdcJar = 
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
         Path valuesCdcJar = 
TestUtils.getResource("values-cdc-pipeline-connector.jar");
         Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java
index 6d66fa878..1171ad07b 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.cdc.common.function.HashFunctionProvider;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
 import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
+import org.apache.flink.cdc.runtime.serializer.event.EventSerializer;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -108,7 +109,10 @@ public class PrePartitionOperator extends 
AbstractStreamOperator<PartitioningEve
 
     private void broadcastEvent(Event toBroadcast) {
         for (int i = 0; i < downstreamParallelism; i++) {
-            output.collect(new StreamRecord<>(new 
PartitioningEvent(toBroadcast, i)));
+            // Deep-copying each event is required since downstream subTasks 
might run in the same
+            // JVM
+            Event copiedEvent = EventSerializer.INSTANCE.copy(toBroadcast);
+            output.collect(new StreamRecord<>(new 
PartitioningEvent(copiedEvent, i)));
         }
     }
 


Reply via email to