This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 2e938a92f [FLINK-36128][cdc-runtime] Fix potential unrecoverable
in-flight data exception by promoting LENIENT as the default schema change
behavior
2e938a92f is described below
commit 2e938a92f5335515f7eb46077254aee25acd9107
Author: yuxiqian <[email protected]>
AuthorDate: Mon Aug 26 17:46:37 2024 +0800
[FLINK-36128][cdc-runtime] Fix potential unrecoverable in-flight data
exception by promoting LENIENT as the default schema change behavior
This closes #3574.
---
.../cli/parser/YamlPipelineDefinitionParser.java | 48 +++++++++++++++++-----
.../parser/YamlPipelineDefinitionParserTest.java | 36 ++++++++++++++--
.../cdc/common/event/SchemaChangeEventType.java | 24 +++++++----
.../flink/cdc/common/pipeline/PipelineOptions.java | 2 +-
.../flink/cdc/common/utils/ChangeEventUtils.java | 8 +---
.../cdc/common/utils/ChangeEventUtilsTest.java | 18 +++++---
.../flink/FlinkPipelineComposerITCase.java | 21 ++++++++++
.../flink/FlinkPipelineTransformITCase.java | 13 ++++++
.../cdc/composer/flink/FlinkPipelineUdfITCase.java | 21 ++++++++++
.../flink/cdc/pipeline/tests/MysqlE2eITCase.java | 3 +-
.../cdc/pipeline/tests/SchemaEvolveE2eITCase.java | 1 -
.../tests/SchemaEvolvingTransformE2eITCase.java | 11 ++---
.../cdc/pipeline/tests/TransformE2eITCase.java | 18 +++++---
.../coordinator/SchemaRegistryRequestHandler.java | 4 --
.../runtime/partitioning/PrePartitionOperator.java | 6 ++-
15 files changed, 184 insertions(+), 50 deletions(-)
diff --git
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
index d5df8eda1..8179fdbff 100644
---
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
+++
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
@@ -19,6 +19,8 @@ package org.apache.flink.cdc.cli.parser;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
+import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
+import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.utils.StringUtils;
import org.apache.flink.cdc.composer.definition.PipelineDef;
import org.apache.flink.cdc.composer.definition.RouteDef;
@@ -35,11 +37,14 @@ import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YA
import java.nio.file.Path;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.stream.Stream;
+import static
org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR;
import static
org.apache.flink.cdc.common.utils.ChangeEventUtils.resolveSchemaEvolutionOptions;
import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;
@@ -99,6 +104,19 @@ public class YamlPipelineDefinitionParser implements
PipelineDefinitionParser {
private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration
globalPipelineConfig)
throws Exception {
+
+ // UDFs are optional. We parse the UDFs first and remove them from the
pipelineDefJsonNode since
+ // they are not plain data types and must be removed before calling
toPipelineConfig.
+ List<UdfDef> udfDefs = new ArrayList<>();
+ Optional.ofNullable(((ObjectNode)
pipelineDefJsonNode.get(PIPELINE_KEY)).remove(UDF_KEY))
+ .ifPresent(node -> node.forEach(udf ->
udfDefs.add(toUdfDef(udf))));
+
+ // Pipeline configs are optional
+ Configuration userPipelineConfig =
toPipelineConfig(pipelineDefJsonNode.get(PIPELINE_KEY));
+
+ SchemaChangeBehavior schemaChangeBehavior =
+ userPipelineConfig.get(PIPELINE_SCHEMA_CHANGE_BEHAVIOR);
+
// Source is required
SourceDef sourceDef =
toSourceDef(
@@ -113,7 +131,8 @@ public class YamlPipelineDefinitionParser implements
PipelineDefinitionParser {
checkNotNull(
pipelineDefJsonNode.get(SINK_KEY),
"Missing required field \"%s\" in pipeline
definition",
- SINK_KEY));
+ SINK_KEY),
+ schemaChangeBehavior);
// Transforms are optional
List<TransformDef> transformDefs = new ArrayList<>();
@@ -128,14 +147,6 @@ public class YamlPipelineDefinitionParser implements
PipelineDefinitionParser {
Optional.ofNullable(pipelineDefJsonNode.get(ROUTE_KEY))
.ifPresent(node -> node.forEach(route ->
routeDefs.add(toRouteDef(route))));
- // UDFs are optional
- List<UdfDef> udfDefs = new ArrayList<>();
- Optional.ofNullable(((ObjectNode)
pipelineDefJsonNode.get(PIPELINE_KEY)).remove(UDF_KEY))
- .ifPresent(node -> node.forEach(udf ->
udfDefs.add(toUdfDef(udf))));
-
- // Pipeline configs are optional
- Configuration userPipelineConfig =
toPipelineConfig(pipelineDefJsonNode.get(PIPELINE_KEY));
-
// Merge user config into global config
Configuration pipelineConfig = new Configuration();
pipelineConfig.addAll(globalPipelineConfig);
@@ -162,7 +173,7 @@ public class YamlPipelineDefinitionParser implements
PipelineDefinitionParser {
return new SourceDef(type, name, Configuration.fromMap(sourceMap));
}
- private SinkDef toSinkDef(JsonNode sinkNode) {
+ private SinkDef toSinkDef(JsonNode sinkNode, SchemaChangeBehavior
schemaChangeBehavior) {
List<String> includedSETypes = new ArrayList<>();
List<String> excludedSETypes = new ArrayList<>();
@@ -172,6 +183,23 @@ public class YamlPipelineDefinitionParser implements
PipelineDefinitionParser {
Optional.ofNullable(sinkNode.get(EXCLUDE_SCHEMA_EVOLUTION_TYPES))
.ifPresent(e -> e.forEach(tag ->
excludedSETypes.add(tag.asText())));
+ if (includedSETypes.isEmpty()) {
+ // If no schema evolution types are specified, include all schema
evolution types by
+ // default.
+ Arrays.stream(SchemaChangeEventTypeFamily.ALL)
+ .map(SchemaChangeEventType::getTag)
+ .forEach(includedSETypes::add);
+ }
+
+ if (excludedSETypes.isEmpty()
+ && SchemaChangeBehavior.LENIENT.equals(schemaChangeBehavior)) {
+ // In lenient mode, we exclude DROP_TABLE and TRUNCATE_TABLE by
default. This can be
+ // overridden by explicitly specifying the excluded types.
+ Stream.of(SchemaChangeEventType.DROP_TABLE,
SchemaChangeEventType.TRUNCATE_TABLE)
+ .map(SchemaChangeEventType::getTag)
+ .forEach(excludedSETypes::add);
+ }
+
Set<SchemaChangeEventType> declaredSETypes =
resolveSchemaEvolutionOptions(includedSETypes,
excludedSETypes);
diff --git
a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
index f57dd62c7..2ecf45870 100644
---
a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
+++
b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.cdc.composer.definition.TransformDef;
import org.apache.flink.cdc.composer.definition.UdfDef;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableSet;
import org.apache.flink.shaded.guava31.com.google.common.io.Resources;
import org.junit.jupiter.api.Test;
@@ -37,6 +38,11 @@ import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
+import static
org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN;
+import static
org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE;
+import static
org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE;
+import static
org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN;
+import static
org.apache.flink.cdc.common.event.SchemaChangeEventType.RENAME_COLUMN;
import static
org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCAL_TIME_ZONE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
@@ -384,7 +390,13 @@ class YamlPipelineDefinitionParserTest {
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("bootstrap-servers",
"localhost:9092")
- .build())),
+ .build()),
+ ImmutableSet.of(
+ DROP_COLUMN,
+ ALTER_COLUMN_TYPE,
+ ADD_COLUMN,
+ CREATE_TABLE,
+ RENAME_COLUMN)),
Collections.singletonList(
new RouteDef(
"mydb.default.app_order_.*",
@@ -401,7 +413,16 @@ class YamlPipelineDefinitionParserTest {
private final PipelineDef minimizedDef =
new PipelineDef(
new SourceDef("mysql", null, new Configuration()),
- new SinkDef("kafka", null, new Configuration()),
+ new SinkDef(
+ "kafka",
+ null,
+ new Configuration(),
+ ImmutableSet.of(
+ DROP_COLUMN,
+ ALTER_COLUMN_TYPE,
+ ADD_COLUMN,
+ CREATE_TABLE,
+ RENAME_COLUMN)),
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
@@ -474,7 +495,16 @@ class YamlPipelineDefinitionParserTest {
private final PipelineDef pipelineDefWithUdf =
new PipelineDef(
new SourceDef("values", null, new Configuration()),
- new SinkDef("values", null, new Configuration()),
+ new SinkDef(
+ "values",
+ null,
+ new Configuration(),
+ ImmutableSet.of(
+ DROP_COLUMN,
+ ALTER_COLUMN_TYPE,
+ ADD_COLUMN,
+ CREATE_TABLE,
+ RENAME_COLUMN)),
Collections.emptyList(),
Collections.singletonList(
new TransformDef(
diff --git
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventType.java
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventType.java
index 8132c29a3..bbe4b415c 100644
---
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventType.java
+++
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEventType.java
@@ -22,13 +22,23 @@ import
org.apache.flink.cdc.common.annotation.PublicEvolving;
/** An enumeration of schema change event types for {@link SchemaChangeEvent}.
*/
@PublicEvolving
public enum SchemaChangeEventType {
- ADD_COLUMN,
- ALTER_COLUMN_TYPE,
- CREATE_TABLE,
- DROP_COLUMN,
- DROP_TABLE,
- RENAME_COLUMN,
- TRUNCATE_TABLE;
+ ADD_COLUMN("add.column"),
+ ALTER_COLUMN_TYPE("alter.column.type"),
+ CREATE_TABLE("create.table"),
+ DROP_COLUMN("drop.column"),
+ DROP_TABLE("drop.table"),
+ RENAME_COLUMN("rename.column"),
+ TRUNCATE_TABLE("truncate.table");
+
+ private final String tag;
+
+ SchemaChangeEventType(String tag) {
+ this.tag = tag;
+ }
+
+ public String getTag() {
+ return tag;
+ }
public static SchemaChangeEventType ofEvent(SchemaChangeEvent event) {
if (event instanceof AddColumnEvent) {
diff --git
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java
index 48a4fbb13..343e99270 100644
---
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java
+++
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java
@@ -48,7 +48,7 @@ public class PipelineOptions {
public static final ConfigOption<SchemaChangeBehavior>
PIPELINE_SCHEMA_CHANGE_BEHAVIOR =
ConfigOptions.key("schema.change.behavior")
.enumType(SchemaChangeBehavior.class)
- .defaultValue(SchemaChangeBehavior.EVOLVE)
+ .defaultValue(SchemaChangeBehavior.LENIENT)
.withDescription(
Description.builder()
.text("Behavior for handling schema change
events. ")
diff --git
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/ChangeEventUtils.java
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/ChangeEventUtils.java
index 940dc3144..483752ce9 100644
---
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/ChangeEventUtils.java
+++
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/ChangeEventUtils.java
@@ -90,12 +90,8 @@ public class ChangeEventUtils {
List<String> includedSchemaEvolutionTypes, List<String>
excludedSchemaEvolutionTypes) {
List<SchemaChangeEventType> resultTypes = new ArrayList<>();
- if (includedSchemaEvolutionTypes.isEmpty()) {
- resultTypes.addAll(Arrays.asList(SchemaChangeEventTypeFamily.ALL));
- } else {
- for (String includeTag : includedSchemaEvolutionTypes) {
- resultTypes.addAll(resolveSchemaEvolutionTag(includeTag));
- }
+ for (String includeTag : includedSchemaEvolutionTypes) {
+ resultTypes.addAll(resolveSchemaEvolutionTag(includeTag));
}
for (String excludeTag : excludedSchemaEvolutionTypes) {
diff --git
a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/ChangeEventUtilsTest.java
b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/ChangeEventUtilsTest.java
index fd3636191..5cdff9c52 100644
---
a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/ChangeEventUtilsTest.java
+++
b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/ChangeEventUtilsTest.java
@@ -17,11 +17,16 @@
package org.apache.flink.cdc.common.utils;
+import org.apache.flink.cdc.common.event.SchemaChangeEventType;
+import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
+
import org.assertj.core.util.Sets;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
import static
org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN;
import static
org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE;
@@ -36,9 +41,12 @@ import static
org.apache.flink.cdc.common.testutils.assertions.EventAssertions.a
public class ChangeEventUtilsTest {
@Test
public void testResolveSchemaEvolutionOptions() {
- assertThat(
- ChangeEventUtils.resolveSchemaEvolutionOptions(
- Collections.emptyList(),
Collections.emptyList()))
+
+ List<String> allTags =
+ Arrays.stream(SchemaChangeEventTypeFamily.ALL)
+ .map(SchemaChangeEventType::getTag)
+ .collect(Collectors.toList());
+ assertThat(ChangeEventUtils.resolveSchemaEvolutionOptions(allTags,
Collections.emptyList()))
.isEqualTo(
Sets.set(
TRUNCATE_TABLE,
@@ -51,7 +59,7 @@ public class ChangeEventUtilsTest {
assertThat(
ChangeEventUtils.resolveSchemaEvolutionOptions(
- Collections.emptyList(),
Collections.singletonList("drop")))
+ allTags, Collections.singletonList("drop")))
.isEqualTo(
Sets.set(
ADD_COLUMN,
@@ -73,7 +81,7 @@ public class ChangeEventUtilsTest {
assertThat(
ChangeEventUtils.resolveSchemaEvolutionOptions(
- Collections.emptyList(),
Collections.singletonList("drop.column")))
+ allTags,
Collections.singletonList("drop.column")))
.isEqualTo(
Sets.set(
ADD_COLUMN,
diff --git
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
index 5c2d48fe5..81d466aab 100644
---
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
+++
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
+import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
@@ -134,6 +135,8 @@ class FlinkPipelineComposerITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR,
SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -191,6 +194,8 @@ class FlinkPipelineComposerITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR,
SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -313,6 +318,8 @@ class FlinkPipelineComposerITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR,
SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -373,6 +380,8 @@ class FlinkPipelineComposerITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR,
SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -441,6 +450,8 @@ class FlinkPipelineComposerITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR,
SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -496,6 +507,8 @@ class FlinkPipelineComposerITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR,
SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -569,6 +582,8 @@ class FlinkPipelineComposerITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR,
SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -766,6 +781,8 @@ class FlinkPipelineComposerITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR,
SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -972,6 +989,8 @@ class FlinkPipelineComposerITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR,
SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -1035,6 +1054,8 @@ class FlinkPipelineComposerITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR,
SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
diff --git
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
index c489b2e81..4c2536011 100644
---
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
+++
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
+import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
@@ -213,6 +214,8 @@ class FlinkPipelineTransformITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR,
SchemaChangeBehavior.EVOLVE);
pipelineConfig.set(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE,
"America/Los_Angeles");
PipelineDef pipelineDef =
new PipelineDef(
@@ -266,6 +269,8 @@ class FlinkPipelineTransformITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR,
SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -350,6 +355,8 @@ class FlinkPipelineTransformITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR,
SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -436,6 +443,8 @@ class FlinkPipelineTransformITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR,
SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -516,6 +525,8 @@ class FlinkPipelineTransformITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR,
SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -608,6 +619,8 @@ class FlinkPipelineTransformITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR,
SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
diff --git
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java
index f9be7d7dd..c3b412dc7 100644
---
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java
+++
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java
@@ -19,6 +19,7 @@ package org.apache.flink.cdc.composer.flink;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
+import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.cdc.composer.definition.PipelineDef;
import org.apache.flink.cdc.composer.definition.SinkDef;
@@ -139,6 +140,8 @@ public class FlinkPipelineUdfITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR,
SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -207,6 +210,8 @@ public class FlinkPipelineUdfITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR,
SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -273,6 +278,8 @@ public class FlinkPipelineUdfITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR,
SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -341,6 +348,8 @@ public class FlinkPipelineUdfITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR,
SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -411,6 +420,8 @@ public class FlinkPipelineUdfITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR,
SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -472,6 +483,8 @@ public class FlinkPipelineUdfITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR,
SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -565,6 +578,8 @@ public class FlinkPipelineUdfITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR,
SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -632,6 +647,8 @@ public class FlinkPipelineUdfITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR,
SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -697,6 +714,8 @@ public class FlinkPipelineUdfITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR,
SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
@@ -758,6 +777,8 @@ public class FlinkPipelineUdfITCase {
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR,
SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
index 1f730be62..29fe5db9d 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
@@ -210,7 +210,8 @@ public class MysqlE2eITCase extends PipelineTestEnvironment
{
+ " type: values\n"
+ "\n"
+ "pipeline:\n"
- + " parallelism: %d",
+ + " parallelism: %d\n"
+ + " schema.change.behavior: evolve",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
index 92c2622c9..7551add08 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
@@ -190,7 +190,6 @@ public class SchemaEvolveE2eITCase extends
PipelineTestEnvironment {
"AddColumnEvent{tableId=%s.members,
addedColumns=[ColumnWithPosition{column=`precise_age` DOUBLE, position=LAST,
existedColumnName=null}]}",
"AddColumnEvent{tableId=%s.members,
addedColumns=[ColumnWithPosition{column=`biological_sex` TINYINT,
position=LAST, existedColumnName=null}]}",
"DataChangeEvent{tableId=%s.members, before=[],
after=[1013, Fiona, null, null, 16.0, null], op=INSERT, meta=()}",
- "TruncateTableEvent{tableId=%s.members}",
"DataChangeEvent{tableId=%s.members, before=[],
after=[1014, Gem, null, null, 17.0, null], op=INSERT, meta=()}"));
assertNotExists(
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java
index 1d1f79f0f..50d5dfb1d 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java
@@ -189,7 +189,6 @@ public class SchemaEvolvingTransformE2eITCase extends
PipelineTestEnvironment {
"AlterColumnTypeEvent{tableId=%s.members,
typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}",
"AddColumnEvent{tableId=%s.members,
addedColumns=[ColumnWithPosition{column=`biological_sex` TINYINT,
position=LAST, existedColumnName=null}]}",
"DataChangeEvent{tableId=%s.members, before=[],
after=[1013 -> Fiona, 1013, Fiona, 16.0, 1026169, age < 20, null, null],
op=INSERT, meta=()}",
- "TruncateTableEvent{tableId=%s.members}",
"DataChangeEvent{tableId=%s.members, before=[],
after=[1014 -> Gem, 1014, Gem, 17.0, 1028196, age < 20, null, null], op=INSERT,
meta=()}"));
}
@@ -233,11 +232,12 @@ public class SchemaEvolvingTransformE2eITCase extends
PipelineTestEnvironment {
+ "\n"
+ "pipeline:\n"
+ " schema.change.behavior: unexpected\n"
- + " parallelism: 1",
+ + " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
- schemaEvolveDatabase.getDatabaseName());
+ schemaEvolveDatabase.getDatabaseName(),
+ parallelism);
Path mysqlCdcJar =
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar =
TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -310,7 +310,7 @@ public class SchemaEvolvingTransformE2eITCase extends
PipelineTestEnvironment {
+ "\n"
+ "pipeline:\n"
+ " schema.change.behavior: %s\n"
- + " parallelism: 1",
+ + " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
@@ -318,7 +318,8 @@ public class SchemaEvolvingTransformE2eITCase extends
PipelineTestEnvironment {
mergeTable ? "(members|new_members)" : "members",
dbName,
dbName,
- behavior);
+ behavior,
+ parallelism);
Path mysqlCdcJar =
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar =
TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
index 9c6130359..36e7d9a86 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
@@ -738,12 +738,14 @@ public class TransformE2eITCase extends
PipelineTestEnvironment {
+ " projection: ID, 'id -> ' || ID AS UID,
PRICEALPHA AS PRICE\n"
+ " filter: ID > 1008\n"
+ "pipeline:\n"
- + " parallelism: 1",
+ + " parallelism: %d\n"
+ + " schema.change.behavior: evolve",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
transformTestDatabase.getDatabaseName(),
- transformTestDatabase.getDatabaseName());
+ transformTestDatabase.getDatabaseName(),
+ parallelism);
Path mysqlCdcJar =
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar =
TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -835,12 +837,14 @@ public class TransformE2eITCase extends PipelineTestEnvironment {
+ " projection: \\*, 'id -> ' || ID AS UID\n"
+ " filter: ID > 1008\n"
+ "pipeline:\n"
- + " parallelism: 1",
+ + " parallelism: %d\n"
+ + " schema.change.behavior: evolve",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
transformTestDatabase.getDatabaseName(),
- transformTestDatabase.getDatabaseName());
+ transformTestDatabase.getDatabaseName(),
+ parallelism);
Path mysqlCdcJar =
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar =
TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -940,12 +944,14 @@ public class TransformE2eITCase extends PipelineTestEnvironment {
+ " projection: ID || ' <- id' AS UID, *\n"
+ " filter: ID > 1008\n"
+ "pipeline:\n"
- + " parallelism: 1",
+ + " parallelism: %d\n"
+ + " schema.change.behavior: evolve",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
transformTestDatabase.getDatabaseName(),
- transformTestDatabase.getDatabaseName());
+ transformTestDatabase.getDatabaseName(),
+ parallelism);
Path mysqlCdcJar =
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar =
TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
index 444fb41d2..ae765bae2 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
@@ -403,10 +403,6 @@ public class SchemaRegistryRequestHandler implements Closeable {
}
return events;
}
- case DROP_TABLE:
- // We don't drop any tables in Lenient mode.
- LOG.info("A drop table event {} has been ignored in Lenient mode.", event);
- return Collections.emptyList();
default:
return Collections.singletonList(event);
}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java
index 6d66fa878..1171ad07b 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java
@@ -28,6 +28,7 @@ import org.apache.flink.cdc.common.function.HashFunctionProvider;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
+import org.apache.flink.cdc.runtime.serializer.event.EventSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -108,7 +109,10 @@ public class PrePartitionOperator extends AbstractStreamOperator<PartitioningEve
private void broadcastEvent(Event toBroadcast) {
for (int i = 0; i < downstreamParallelism; i++) {
- output.collect(new StreamRecord<>(new PartitioningEvent(toBroadcast, i)));
+ // Deep-copying each event is required since downstream subTasks might run in the same
+ // JVM
+ Event copiedEvent = EventSerializer.INSTANCE.copy(toBroadcast);
+ output.collect(new StreamRecord<>(new PartitioningEvent(copiedEvent, i)));
}
}