This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push: new d0fe46cb6 [FLINK-38045] Build createTableEventCache using TableSchemas from split and Make transform operator stateless (#4056) d0fe46cb6 is described below commit d0fe46cb6f9aa61fda227f1968a8f8f008ecca45 Author: proletarians <75650402+proletari...@users.noreply.github.com> AuthorDate: Fri Aug 15 10:58:37 2025 +0800 [FLINK-38045] Build createTableEventCache using TableSchemas from split and Make transform operator stateless (#4056) Co-authored-by: wuzexian <shanqing....@alibaba-inc.com> --- .../cdc/composer/flink/FlinkPipelineComposer.java | 10 +- .../flink/translator/TransformTranslator.java | 22 +--- .../source/reader/MySqlPipelineRecordEmitter.java | 38 +++++- .../cdc/connectors/mysql/utils/MySqlTypeUtils.java | 3 +- .../mysql/source/MySqlPipelineITCase.java | 128 ++++++++++++++++++++- .../cdc/connectors/values/ValuesDatabase.java | 11 +- .../cdc/connectors/values/sink/ValuesDataSink.java | 2 +- .../values/sink/ValuesDataSinkOptions.java | 2 +- .../event/DebeziumEventDeserializationSchema.java | 22 ++++ .../cdc/connectors/mysql/source/MySqlSource.java | 4 +- .../mysql/source/reader/MySqlRecordEmitter.java | 3 + .../mysql/source/reader/MySqlSourceReader.java | 12 +- .../mysql/source/reader/MySqlSourceReaderTest.java | 12 +- .../tests/migration/YamlJobMigrationITCase.java | 89 ++++++++++++++ .../operators/transform/PreTransformOperator.java | 94 ++------------- .../transform/PreTransformOperatorBuilder.java | 9 +- 16 files changed, 331 insertions(+), 130 deletions(-) diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java index f77fc4bbf..6de9045af 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java @@ -18,6 +18,7 @@ package org.apache.flink.cdc.composer.flink; import org.apache.flink.cdc.common.annotation.Internal; +import org.apache.flink.cdc.common.annotation.VisibleForTesting; import org.apache.flink.cdc.common.configuration.Configuration; import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.pipeline.PipelineOptions; @@ -186,9 +187,7 @@ public class FlinkPipelineComposer implements PipelineComposer { pipelineDef.getTransforms(), pipelineDef.getUdfs(), pipelineDef.getModels(), - dataSource.supportedMetadataColumns(), - !isParallelMetadataSource && !isBatchMode, - operatorUidGenerator); + dataSource.supportedMetadataColumns()); // PreTransform ---> PostTransform stream = @@ -290,4 +289,9 @@ public class FlinkPipelineComposer implements PipelineComposer { } return Optional.of(container); } + + @VisibleForTesting + public StreamExecutionEnvironment getEnv() { + return env; + } } diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java index 33bff17ee..fb8da7cea 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java @@ -50,30 +50,21 @@ public class TransformTranslator { List<TransformDef> transforms, List<UdfDef> udfFunctions, List<ModelDef> 
models, - SupportedMetadataColumn[] supportedMetadataColumns, - boolean shouldStoreSchemasInState, - OperatorUidGenerator operatorUidGenerator) { + SupportedMetadataColumn[] supportedMetadataColumns) { if (transforms.isEmpty()) { return input; } return input.transform( - "Transform:Schema", - new EventTypeInfo(), - generatePreTransform( - transforms, - udfFunctions, - models, - supportedMetadataColumns, - shouldStoreSchemasInState)) - .uid(operatorUidGenerator.generateUid("pre-transform")); + "Transform:Schema", + new EventTypeInfo(), + generatePreTransform(transforms, udfFunctions, models, supportedMetadataColumns)); } private PreTransformOperator generatePreTransform( List<TransformDef> transforms, List<UdfDef> udfFunctions, List<ModelDef> models, - SupportedMetadataColumn[] supportedMetadataColumns, - boolean shouldStoreSchemasInState) { + SupportedMetadataColumn[] supportedMetadataColumns) { PreTransformOperatorBuilder preTransformFunctionBuilder = PreTransformOperator.newBuilder(); for (TransformDef transform : transforms) { @@ -94,8 +85,7 @@ public class TransformTranslator { .map(this::udfDefToUDFTuple) .collect(Collectors.toList())) .addUdfFunctions( - models.stream().map(this::modelToUDFTuple).collect(Collectors.toList())) - .shouldStoreSchemasInState(shouldStoreSchemasInState); + models.stream().map(this::modelToUDFTuple).collect(Collectors.toList())); return preTransformFunctionBuilder.build(); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java index 649afb55c..3fa9cbb78 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java @@ -26,11 +26,14 @@ import org.apache.flink.cdc.connectors.mysql.schema.MySqlFieldDefinition; import org.apache.flink.cdc.connectors.mysql.schema.MySqlTableDefinition; import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig; import org.apache.flink.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics; +import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit; +import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit; import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitState; import org.apache.flink.cdc.connectors.mysql.source.utils.StatementUtils; import org.apache.flink.cdc.connectors.mysql.table.StartupOptions; import org.apache.flink.cdc.connectors.mysql.utils.MySqlTypeUtils; import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema; +import org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema; import org.apache.flink.connector.base.source.reader.RecordEmitter; import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser; @@ -40,6 +43,7 @@ import io.debezium.relational.RelationalDatabaseConnectorConfig; import io.debezium.relational.Table; import io.debezium.relational.TableId; import io.debezium.relational.Tables; +import io.debezium.relational.history.TableChanges; import io.debezium.text.ParsingException; 
import org.apache.commons.lang3.StringUtils; import org.apache.kafka.connect.source.SourceRecord; @@ -77,7 +81,9 @@ public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter<Event> { private boolean shouldEmitAllCreateTableEventsInSnapshotMode = true; private boolean isBounded = false; - private Map<TableId, CreateTableEvent> createTableEventCache; + private final DebeziumDeserializationSchema<Event> debeziumDeserializationSchema; + + private final Map<TableId, CreateTableEvent> createTableEventCache; public MySqlPipelineRecordEmitter( DebeziumDeserializationSchema<Event> debeziumDeserializationSchema, @@ -87,12 +93,32 @@ public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter<Event> { debeziumDeserializationSchema, sourceReaderMetrics, sourceConfig.isIncludeSchemaChanges()); + this.debeziumDeserializationSchema = debeziumDeserializationSchema; this.sourceConfig = sourceConfig; this.alreadySendCreateTableTables = new HashSet<>(); - this.createTableEventCache = generateCreateTableEvent(sourceConfig); + this.createTableEventCache = + ((DebeziumEventDeserializationSchema) debeziumDeserializationSchema) + .getCreateTableEventCache(); this.isBounded = StartupOptions.snapshot().equals(sourceConfig.getStartupOptions()); } + @Override + public void applySplit(MySqlSplit split) { + if ((isBounded) && createTableEventCache.isEmpty() && split instanceof MySqlSnapshotSplit) { + // TableSchemas in MySqlSnapshotSplit only contains one table. + createTableEventCache.putAll(generateCreateTableEvent(sourceConfig)); + } else { + for (TableChanges.TableChange tableChange : split.getTableSchemas().values()) { + CreateTableEvent createTableEvent = + new CreateTableEvent( + toCdcTableId(tableChange.getId()), + buildSchemaFromTable(tableChange.getTable())); + ((DebeziumEventDeserializationSchema) debeziumDeserializationSchema) + .applyChangeEvent(createTableEvent); + } + } + } + @Override protected void processElement( SourceRecord element, SourceOutput<Event> output, MySqlSplitState splitState) @@ -130,6 +156,11 @@ public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter<Event> { super.processElement(element, output, splitState); } + private org.apache.flink.cdc.common.event.TableId toCdcTableId(TableId dbzTableId) { + return org.apache.flink.cdc.common.event.TableId.tableId( + dbzTableId.catalog(), dbzTableId.table()); + } + private void sendCreateTableEvent( JdbcConnection jdbc, TableId tableId, SourceOutput<Event> output) { Schema schema = getSchema(jdbc, tableId); @@ -204,7 +235,10 @@ public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter<Event> { private Schema parseDDL(String ddlStatement, TableId tableId) { Table table = parseDdl(ddlStatement, tableId); + return buildSchemaFromTable(table); + } + private Schema buildSchemaFromTable(Table table) { List<Column> columns = table.columns(); Schema.Builder tableBuilder = Schema.newBuilder(); for (int i = 0; i < columns.size(); i++) { diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java index 0c59ec976..ceb5cd825 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java +++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java @@ -129,7 +129,8 @@ public class MySqlTypeUtils { String typeName = column.typeName(); switch (typeName) { case BIT: - return column.length() == 1 + // column.length() might be -1 + return column.length() <= 1 ? DataTypes.BOOLEAN() : DataTypes.BINARY((column.length() + 7) / 8); case BOOL: diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java index df5e930ad..b6760ce26 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java @@ -60,6 +60,8 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.testcontainers.lifecycle.Startables; import java.sql.Connection; @@ -399,7 +401,131 @@ class MySqlPipelineITCase extends MySqlSourceTestBase { } @Test - void testExcludeTables() throws Exception { + void testLatestOffsetStartupMode() throws Exception { + inventoryDatabase.createAndInitialize(); + MySqlSourceConfigFactory configFactory = + new MySqlSourceConfigFactory() + .hostname(MYSQL8_CONTAINER.getHost()) + .port(MYSQL8_CONTAINER.getDatabasePort()) + .username(TEST_USER) + .password(TEST_PASSWORD) + .databaseList(inventoryDatabase.getDatabaseName()) + .tableList(inventoryDatabase.getDatabaseName() + "\\.products") + .startupOptions(StartupOptions.latest()) + .serverId(getServerId(env.getParallelism())) + .serverTimeZone("UTC") + .includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue()); + + FlinkSourceProvider sourceProvider = + (FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider(); + CloseableIterator<Event> events = + env.fromSource( + sourceProvider.getSource(), + WatermarkStrategy.noWatermarks(), + MySqlDataSourceFactory.IDENTIFIER, + new EventTypeInfo()) + .executeAndCollect(); + Thread.sleep(10_000); + TableId tableId = TableId.tableId(inventoryDatabase.getDatabaseName(), "products"); + + List<Event> expectedBinlog = new ArrayList<>(); + try (Connection connection = inventoryDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + expectedBinlog.addAll(executeAlterAndProvideExpected(tableId, statement)); + + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.INT().notNull(), + DataTypes.VARCHAR(255).notNull(), + DataTypes.FLOAT(), + DataTypes.VARCHAR(45), + DataTypes.VARCHAR(55) + }, + new String[] {"id", "name", "weight", "col1", "col2"}); + BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(rowType); + // insert more data + statement.execute( + String.format( + "INSERT INTO `%s`.`products` VALUES (default,'scooter',5.5,'c-10','c-20');", + inventoryDatabase.getDatabaseName())); // 110 + expectedBinlog.add( + DataChangeEvent.insertEvent( + tableId, 
+ generator.generate( + new Object[] { + 110, + BinaryStringData.fromString("scooter"), + 5.5f, + BinaryStringData.fromString("c-10"), + BinaryStringData.fromString("c-20") + }))); + statement.execute( + String.format( + "INSERT INTO `%s`.`products` VALUES (default,'football',6.6,'c-11','c-21');", + inventoryDatabase.getDatabaseName())); // 111 + expectedBinlog.add( + DataChangeEvent.insertEvent( + tableId, + generator.generate( + new Object[] { + 111, + BinaryStringData.fromString("football"), + 6.6f, + BinaryStringData.fromString("c-11"), + BinaryStringData.fromString("c-21") + }))); + statement.execute( + String.format( + "UPDATE `%s`.`products` SET `col1`='c-12', `col2`='c-22' WHERE id=110;", + inventoryDatabase.getDatabaseName())); + expectedBinlog.add( + DataChangeEvent.updateEvent( + tableId, + generator.generate( + new Object[] { + 110, + BinaryStringData.fromString("scooter"), + 5.5f, + BinaryStringData.fromString("c-10"), + BinaryStringData.fromString("c-20") + }), + generator.generate( + new Object[] { + 110, + BinaryStringData.fromString("scooter"), + 5.5f, + BinaryStringData.fromString("c-12"), + BinaryStringData.fromString("c-22") + }))); + statement.execute( + String.format( + "DELETE FROM `%s`.`products` WHERE `id` = 111;", + inventoryDatabase.getDatabaseName())); + expectedBinlog.add( + DataChangeEvent.deleteEvent( + tableId, + generator.generate( + new Object[] { + 111, + BinaryStringData.fromString("football"), + 6.6f, + BinaryStringData.fromString("c-11"), + BinaryStringData.fromString("c-21") + }))); + } + // In this configuration, several subtasks might emit their corresponding CreateTableEvent + // to downstream. Since it is not possible to predict how many CreateTableEvents should we + // expect, we simply filter them out from expected sets, and assert there's at least one. 
+ + Event createTableEvent = getProductsCreateTableEvent(tableId); + List<Event> actual = fetchResultsExcept(events, expectedBinlog.size(), createTableEvent); + assertThat(actual).isEqualTo(expectedBinlog); + } + + @ParameterizedTest(name = "batchEmit: {0}") + @ValueSource(booleans = {true, false}) + void testExcludeTables(boolean inBatch) throws Exception { inventoryDatabase.createAndInitialize(); String databaseName = inventoryDatabase.getDatabaseName(); MySqlSourceConfigFactory configFactory = diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java index 0004961f3..01c8c7bde 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java @@ -81,8 +81,15 @@ public class ValuesDatabase { private Set<SchemaChangeEventType> enabledSchemaEvolutionTypes; + private final boolean materializedInMemory; + public ValuesMetadataApplier() { + this(true); + } + + public ValuesMetadataApplier(boolean materializedInMemory) { this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes(); + this.materializedInMemory = materializedInMemory; } @Override @@ -109,7 +116,9 @@ public class ValuesDatabase { @Override public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) { - applySchemaChangeEvent(schemaChangeEvent); + if (materializedInMemory) { + applySchemaChangeEvent(schemaChangeEvent); + } } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSink.java index 7d4ee3dad..93871f2a4 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSink.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSink.java @@ -80,7 +80,7 @@ public class ValuesDataSink implements DataSink, Serializable { if (errorOnSchemaChange) { return new ValuesDatabase.ErrorOnChangeMetadataApplier(); } else { - return new ValuesDatabase.ValuesMetadataApplier(); + return new ValuesDatabase.ValuesMetadataApplier(materializedInMemory); } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkOptions.java index 4830d102b..94ced1e3e 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkOptions.java +++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkOptions.java @@ -28,7 +28,7 @@ public class ValuesDataSinkOptions { .booleanType() .defaultValue(false) .withDescription( - "True if the DataChangeEvent need to be materialized in memory."); + "True if the SchemaChangeEvent and DataChangeEvent need to be materialized in memory."); public static final ConfigOption<Boolean> PRINT_ENABLED = ConfigOptions.key("print.enabled") diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java index ad7ec458e..338f03d23 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java @@ -24,6 +24,8 @@ import org.apache.flink.cdc.common.data.LocalZonedTimestampData; import org.apache.flink.cdc.common.data.RecordData; import org.apache.flink.cdc.common.data.TimestampData; import org.apache.flink.cdc.common.data.binary.BinaryStringData; +import org.apache.flink.cdc.common.event.ChangeEvent; +import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DataChangeEvent; import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.TableId; @@ -59,6 +61,7 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; import java.time.Instant; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -82,10 +85,13 @@ public abstract class DebeziumEventDeserializationSchema extends SourceRecordEve /** Changelog Mode to use for encoding changes in Flink internal data structure. 
*/ protected final DebeziumChangelogMode changelogMode; + private final Map<io.debezium.relational.TableId, CreateTableEvent> createTableEventCache; + public DebeziumEventDeserializationSchema( SchemaDataTypeInference schemaDataTypeInference, DebeziumChangelogMode changelogMode) { this.schemaDataTypeInference = schemaDataTypeInference; this.changelogMode = changelogMode; + this.createTableEventCache = new HashMap<>(); } @Override @@ -435,4 +441,20 @@ public abstract class DebeziumEventDeserializationSchema extends SourceRecordEve } }; } + + public Map<io.debezium.relational.TableId, CreateTableEvent> getCreateTableEventCache() { + return createTableEventCache; + } + + public void applyChangeEvent(ChangeEvent changeEvent) { + org.apache.flink.cdc.common.event.TableId flinkTableId = changeEvent.tableId(); + + io.debezium.relational.TableId debeziumTableId = + new io.debezium.relational.TableId( + flinkTableId.getNamespace(), + flinkTableId.getSchemaName(), + flinkTableId.getTableName()); + + createTableEventCache.put(debeziumTableId, (CreateTableEvent) changeEvent); + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java index d68138c07..cb06cc45a 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java @@ -48,7 +48,6 @@ import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSourceReaderCont import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader; import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit; import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitSerializer; -import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitState; import org.apache.flink.cdc.connectors.mysql.source.split.SourceRecords; import org.apache.flink.cdc.connectors.mysql.source.utils.hooks.SnapshotPhaseHooks; import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema; @@ -289,7 +288,6 @@ public class MySqlSource<T> @FunctionalInterface interface RecordEmitterSupplier<T> extends Serializable { - RecordEmitter<SourceRecords, T, MySqlSplitState> get( - MySqlSourceReaderMetrics metrics, MySqlSourceConfig sourceConfig); + MySqlRecordEmitter<T> get(MySqlSourceReaderMetrics metrics, MySqlSourceConfig sourceConfig); } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java index e3c504113..449e7f608 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java @@ -20,6 +20,7 @@ package org.apache.flink.cdc.connectors.mysql.source.reader; import 
org.apache.flink.api.connector.source.SourceOutput; import org.apache.flink.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics; import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset; +import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit; import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitState; import org.apache.flink.cdc.connectors.mysql.source.split.SourceRecords; import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils; @@ -120,6 +121,8 @@ public class MySqlRecordEmitter<T> implements RecordEmitter<SourceRecords, T, My debeziumDeserializationSchema.deserialize(element, outputCollector); } + public void applySplit(MySqlSplit split) {} + private void reportMetrics(SourceRecord element) { Long messageTimestamp = RecordUtils.getMessageTimestamp(element); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java index d9bfa18e7..e1d7e7dce 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java @@ -43,7 +43,6 @@ import org.apache.flink.cdc.connectors.mysql.source.split.SourceRecords; import org.apache.flink.cdc.connectors.mysql.source.utils.ChunkUtils; import org.apache.flink.cdc.connectors.mysql.source.utils.TableDiscoveryUtils; import org.apache.flink.configuration.Configuration; -import org.apache.flink.connector.base.source.reader.RecordEmitter; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase; import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager; @@ -82,13 +81,14 @@ public class MySqlSourceReader<T> private final Map<String, MySqlBinlogSplit> uncompletedBinlogSplits; private final int subtaskId; private final MySqlSourceReaderContext mySqlSourceReaderContext; - private final MySqlPartition partition; private volatile MySqlBinlogSplit suspendedBinlogSplit; + private final MySqlRecordEmitter<T> recordEmitter; + private final MySqlPartition partition; public MySqlSourceReader( FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> elementQueue, Supplier<MySqlSplitReader> splitReaderSupplier, - RecordEmitter<SourceRecords, T, MySqlSplitState> recordEmitter, + MySqlRecordEmitter<T> recordEmitter, Configuration config, MySqlSourceReaderContext context, MySqlSourceConfig sourceConfig) { @@ -98,6 +98,7 @@ public class MySqlSourceReader<T> recordEmitter, config, context.getSourceReaderContext()); + this.recordEmitter = recordEmitter; this.sourceConfig = sourceConfig; this.finishedUnackedSplits = new HashMap<>(); this.uncompletedBinlogSplits = new HashMap<>(); @@ -117,6 +118,7 @@ public class MySqlSourceReader<T> @Override protected MySqlSplitState initializedState(MySqlSplit split) { + recordEmitter.applySplit(split); if (split.isSnapshotSplit()) { return new MySqlSnapshotSplitState(split.asSnapshotSplit()); } else { @@ -377,7 +379,7 @@ public class MySqlSourceReader<T> FinishedSnapshotSplitsReportEvent 
reportEvent = new FinishedSnapshotSplitsReportEvent(finishedOffsets); context.sendSourceEventToCoordinator(reportEvent); - LOG.debug( + LOG.info( "Source reader {} reports offsets of finished snapshot splits {}.", subtaskId, finishedOffsets); @@ -436,7 +438,7 @@ public class MySqlSourceReader<T> binlogSplit.splitId(), MySqlBinlogSplit.appendFinishedSplitInfos( binlogSplit, newAddedMetadataGroup)); - LOG.info( + LOG.debug( "Source reader {} fills metadata of group {} to binlog split", subtaskId, newAddedMetadataGroup.size()); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java index 3f0cd2a2e..7b8dfdcdb 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java @@ -563,16 +563,16 @@ class MySqlSourceReaderTest extends MySqlSourceTestBase { final Method metricGroupMethod = readerContext.getClass().getMethod("metricGroup"); metricGroupMethod.setAccessible(true); final MetricGroup metricGroup = (MetricGroup) metricGroupMethod.invoke(readerContext); - final RecordEmitter<SourceRecords, SourceRecord, MySqlSplitState> recordEmitter = + final MySqlRecordEmitter<SourceRecord> recordEmitter = limit > 0 ? new MysqlLimitedRecordEmitter( new ForwardDeserializeSchema(), - new MySqlSourceReaderMetrics(metricGroup), + new MySqlSourceReaderMetrics(readerContext.metricGroup()), configuration.isIncludeSchemaChanges(), limit) : new MySqlRecordEmitter<>( new ForwardDeserializeSchema(), - new MySqlSourceReaderMetrics(metricGroup), + new MySqlSourceReaderMetrics(readerContext.metricGroup()), configuration.isIncludeSchemaChanges()); final MySqlSourceReaderContext mySqlSourceReaderContext = new MySqlSourceReaderContext(readerContext); @@ -723,8 +723,7 @@ class MySqlSourceReaderTest extends MySqlSourceTestBase { * A implementation of {@link RecordEmitter} which only emit records in given limit number, this * class is used for test purpose. 
*/ - private static class MysqlLimitedRecordEmitter - implements RecordEmitter<SourceRecords, SourceRecord, MySqlSplitState> { + private static class MysqlLimitedRecordEmitter extends MySqlRecordEmitter<SourceRecord> { private static final Logger LOG = LoggerFactory.getLogger(MySqlRecordEmitter.class); private static final FlinkJsonTableChangeSerializer TABLE_CHANGE_SERIALIZER = @@ -741,6 +740,7 @@ class MySqlSourceReaderTest extends MySqlSourceTestBase { MySqlSourceReaderMetrics sourceReaderMetrics, boolean includeSchemaChanges, int limit) { + super(debeziumDeserializationSchema, sourceReaderMetrics, includeSchemaChanges); this.debeziumDeserializationSchema = debeziumDeserializationSchema; this.sourceReaderMetrics = sourceReaderMetrics; this.includeSchemaChanges = includeSchemaChanges; @@ -766,7 +766,7 @@ class MySqlSourceReaderTest extends MySqlSourceTestBase { } } - private void processElement( + protected void processElement( SourceRecord element, SourceOutput<SourceRecord> output, MySqlSplitState splitState) throws Exception { if (isWatermarkEvent(element)) { diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/YamlJobMigrationITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/YamlJobMigrationITCase.java index ca349dfd2..6c929ee28 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/YamlJobMigrationITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/YamlJobMigrationITCase.java @@ -187,6 +187,95 @@ class YamlJobMigrationITCase extends PipelineTestEnvironment { cancelJob(newJobID); } + @ParameterizedTest(name = "{0} -> SNAPSHOT") + @EnumSource(names = {"SNAPSHOT"}) + void testStartingJobFromSavepointWithSchemaChange(TarballFetcher.CdcVersion migrateFromVersion) + throws Exception { + TarballFetcher.fetch(jobManager, migrateFromVersion); + runInContainerAsRoot(jobManager, "chmod", "0777", "-R", "/tmp/cdc/"); + + LOG.info("Successfully fetched CDC {}.", migrateFromVersion); + + String content = + String.format( + "source:\n" + + " type: mysql\n" + + " hostname: %s\n" + + " port: %d\n" + + " username: %s\n" + + " password: %s\n" + + " tables: %s.\\.*\n" + + " server-id: 5400-5404\n" + + " server-time-zone: UTC\n" + + "\n" + + "sink:\n" + + " type: values\n" + + " materialized.in.memory: false" + + "\n" + + "pipeline:\n" + + " parallelism: %d\n" + + "use.legacy.json.format: true\n", + INTER_CONTAINER_MYSQL_ALIAS, + MySqlContainer.MYSQL_PORT, + MYSQL_TEST_USER, + MYSQL_TEST_PASSWORD, + mysqlInventoryDatabase.getDatabaseName(), + 4); + JobID jobID = submitPipelineJob(migrateFromVersion, content); + Assertions.assertThat(jobID).isNotNull(); + LOG.info("Submitted Job ID is {} ", jobID); + + validateResult( + dbNameFormatter, + "CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=%s.products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description` VARCHAR(512),`weight` FLOAT,`enum_c` STRING 'red',`json_c` STRING,`point_c` STRING}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=%s.customers, before=[], after=[101, user_1, Shanghai, 123567891234], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.customers, 
before=[], after=[102, user_2, Shanghai, 123567891234], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.customers, before=[], after=[103, user_3, Shanghai, 123567891234], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[101, scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[102, car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[104, hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[105, hammer, 14oz carpenter's hammer, 0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"}, {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[107, rocks, box of assorted rocks, 5.3, null, null, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[108, jacket, water resistent black wind breaker, 0.1, null, null, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=()}"); + LOG.info("Snapshot stage finished successfully."); + + generateIncrementalEventsPhaseOne(); + validateResult( + dbNameFormatter, + "DataChangeEvent{tableId=%s.products, before=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], after=[106, hammer, 18oz carpenter hammer, 1.0, null, null, null], op=UPDATE, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[107, rocks, box of assorted rocks, 5.3, null, null, null], after=[107, rocks, box of assorted rocks, 5.1, null, null, null], op=UPDATE, meta=()}"); + LOG.info("Incremental stage 1 finished successfully."); + + String savepointPath = stopJobWithSavepoint(jobID); + LOG.info("Stopped Job {} and created a savepoint at {}.", jobID, savepointPath); + // Modify schema and make some data changes. 
+ generateIncrementalEventsPhaseTwo(); + JobID newJobID = submitPipelineJob(content, savepointPath, true); + LOG.info("Reincarnated Job {} has been submitted successfully.", newJobID); + validateResult( + dbNameFormatter, + "CreateTableEvent{tableId=%s.products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description` VARCHAR(512),`weight` FLOAT,`enum_c` STRING 'red',`json_c` STRING,`point_c` STRING}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=%s.products, before=[107, rocks, box of assorted rocks, 5.3, null, null, null], after=[107, rocks, box of assorted rocks, 5.1, null, null, null], op=UPDATE, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], after=[106, hammer, 18oz carpenter hammer, 1.0, null, null, null], op=UPDATE, meta=()}", + "AddColumnEvent{tableId=%s.products, addedColumns=[ColumnWithPosition{column=`new_col` INT, position=LAST, existedColumnName=null}]}", + "DataChangeEvent{tableId=%s.products, before=[], after=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1], op=INSERT, meta=()}"); + LOG.info("Incremental stage 2 finished successfully."); + + generateIncrementalEventsPhaseThree(); + validateResult( + dbNameFormatter, + "DataChangeEvent{tableId=%s.products, before=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1], after=[110, jacket, new water resistent white wind breaker, 0.5, null, null, null, 1], op=UPDATE, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1], after=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], op=UPDATE, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], after=[], op=DELETE, meta=()}"); + cancelJob(newJobID); + } + private void generateIncrementalEventsPhaseOne() { executeMySqlStatements( mysqlInventoryDatabase, diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java index 97293ab5e..61af9e370 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java @@ -17,9 +17,6 @@ package org.apache.flink.cdc.runtime.operators.transform; -import org.apache.flink.api.common.state.ListState; -import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.cdc.common.data.binary.BinaryRecordData; @@ -50,15 +47,13 @@ import org.apache.flink.streaming.runtime.tasks.StreamTask; import javax.annotation.Nullable; -import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; -import java.util.HashSet; +import java.util.HashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -70,16 +65,11 @@ public class 
PreTransformOperator extends AbstractStreamOperator<Event> private static final long serialVersionUID = 1L; - /** All tables which have been sent {@link CreateTableEvent} to downstream. */ - private final Set<TableId> alreadySentCreateTableEvents; - private final List<TransformRule> transformRules; private final Map<TableId, PreTransformChangeInfo> preTransformChangeInfoMap; private final List<Tuple2<Selectors, SchemaMetadataTransform>> schemaMetadataTransformers; private final List<Tuple3<String, String, Map<String, String>>> udfFunctions; - private final boolean shouldStoreSchemasInState; - private transient ListState<byte[]> state; private transient List<PreTransformer> transforms; private transient List<UserDefinedFunctionDescriptor> udfDescriptors; private transient Map<TableId, PreTransformProcessor> preTransformProcessorMap; @@ -91,17 +81,14 @@ public class PreTransformOperator extends AbstractStreamOperator<Event> PreTransformOperator( List<TransformRule> transformRules, - List<Tuple3<String, String, Map<String, String>>> udfFunctions, - boolean shouldStoreSchemasInState) { - this.preTransformChangeInfoMap = new ConcurrentHashMap<>(); - this.alreadySentCreateTableEvents = new HashSet<>(); - this.preTransformProcessorMap = new ConcurrentHashMap<>(); + List<Tuple3<String, String, Map<String, String>>> udfFunctions) { + this.preTransformChangeInfoMap = new HashMap<>(); + this.preTransformProcessorMap = new HashMap<>(); this.schemaMetadataTransformers = new ArrayList<>(); this.chainingStrategy = ChainingStrategy.ALWAYS; this.transformRules = transformRules; this.udfFunctions = udfFunctions; - this.shouldStoreSchemasInState = shouldStoreSchemasInState; } @Override @@ -115,8 +102,6 @@ public class PreTransformOperator extends AbstractStreamOperator<Event> .map(udf -> new UserDefinedFunctionDescriptor(udf.f0, udf.f1, udf.f2)) .collect(Collectors.toList()); - // Initialize data fields in advance because they might be accessed in - // `::initializeState` function when restoring from a previous state. this.transforms = new ArrayList<>(); for (TransformRule transformRule : transformRules) { String tableInclusions = transformRule.getTableInclusions(); @@ -144,54 +129,17 @@ public class PreTransformOperator extends AbstractStreamOperator<Event> @Override public void initializeState(StateInitializationContext context) throws Exception { super.initializeState(context); - if (!shouldStoreSchemasInState) { - // Skip schema persistency if we're in the distributed schema mode or the batch - // execution mode. - return; - } - OperatorStateStore stateStore = context.getOperatorStateStore(); - ListStateDescriptor<byte[]> descriptor = - new ListStateDescriptor<>("originalSchemaState", byte[].class); - state = stateStore.getUnionListState(descriptor); - if (context.isRestored()) { - for (byte[] serializedTableInfo : state.get()) { - PreTransformChangeInfo stateTableChangeInfo = - PreTransformChangeInfo.SERIALIZER.deserialize( - PreTransformChangeInfo.SERIALIZER.getVersion(), - serializedTableInfo); - preTransformChangeInfoMap.put( - stateTableChangeInfo.getTableId(), stateTableChangeInfo); - - CreateTableEvent restoredCreateTableEvent = - new CreateTableEvent( - stateTableChangeInfo.getTableId(), - stateTableChangeInfo.getPreTransformedSchema()); - // hasAsteriskMap needs to be recalculated after restoring from a checkpoint. - cacheTransformRuleInfo(restoredCreateTableEvent); - } - } + // Historically, transform operator maintains internal state to help transforming schemas + // correctly after restart. 
+ // However, it is not required after + // [FLINK-38045] got merged, since + // MySQL source will emit correct CreateTableEvents from state. } @Override public void snapshotState(StateSnapshotContext context) throws Exception { super.snapshotState(context); - if (!shouldStoreSchemasInState) { - // Same reason in this#initializeState. - return; - } - state.update( - new ArrayList<>( - preTransformChangeInfoMap.values().stream() - .map( - tableChangeInfo -> { - try { - return PreTransformChangeInfo.SERIALIZER.serialize( - tableChangeInfo); - } catch (IOException e) { - throw new RuntimeException(e); - } - }) - .collect(Collectors.toList()))); + // Do nothing, the reason is the same as this#initializeState. } @Override @@ -204,7 +152,6 @@ public class PreTransformOperator extends AbstractStreamOperator<Event> public void close() throws Exception { super.close(); clearOperator(); - this.state = null; } @Override @@ -239,7 +186,6 @@ public class PreTransformOperator extends AbstractStreamOperator<Event> // which may be different with the schema currently being processed. if (!preTransformProcessorMap.containsKey(createTableEvent.tableId())) { output.collect(new StreamRecord<>(cacheCreateTable(createTableEvent))); - alreadySentCreateTableEvents.add(createTableEvent.tableId()); } } else if (event instanceof DropTableEvent) { preTransformProcessorMap.remove(((DropTableEvent) event).tableId()); @@ -247,33 +193,16 @@ public class PreTransformOperator extends AbstractStreamOperator<Event> } else if (event instanceof TruncateTableEvent) { output.collect(new StreamRecord<>(event)); } else if (event instanceof SchemaChangeEvent) { - lazilyEmitCreateTableEvent(event); SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event; preTransformProcessorMap.remove(schemaChangeEvent.tableId()); cacheChangeSchema(schemaChangeEvent) .ifPresent(e -> output.collect(new StreamRecord<>(e))); } else if (event instanceof DataChangeEvent) { - lazilyEmitCreateTableEvent(event); output.collect(new StreamRecord<>(processDataChangeEvent(((DataChangeEvent) event)))); } } - /** Emit related CreateTableEvent for the first time when meeting ChangeEvent. 
*/ - private void lazilyEmitCreateTableEvent(Event event) { - ChangeEvent changeEvent = (ChangeEvent) event; - if (!alreadySentCreateTableEvents.contains(changeEvent.tableId())) { - PreTransformChangeInfo stateTableChangeInfo = - preTransformChangeInfoMap.get(changeEvent.tableId()); - CreateTableEvent createTableEvent = - new CreateTableEvent( - stateTableChangeInfo.getTableId(), - stateTableChangeInfo.getPreTransformedSchema()); - output.collect(new StreamRecord<>(createTableEvent)); - alreadySentCreateTableEvents.add(changeEvent.tableId()); - } - } - - private SchemaChangeEvent cacheCreateTable(CreateTableEvent event) { + private CreateTableEvent cacheCreateTable(CreateTableEvent event) { TableId tableId = event.tableId(); Schema originalSchema = event.getSchema(); event = transformCreateTableEvent(event); @@ -354,6 +283,7 @@ public class PreTransformOperator extends AbstractStreamOperator<Event> tableId, transformSchemaMetaData( createTableEvent.getSchema(), transform.f1)); + break; } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorBuilder.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorBuilder.java index e458b6416..bb4df65a5 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorBuilder.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorBuilder.java @@ -29,7 +29,6 @@ import java.util.Map; /** Builder of {@link PreTransformOperator}. */ public class PreTransformOperatorBuilder { private final List<TransformRule> transformRules = new ArrayList<>(); - private boolean shouldStoreSchemasInState; private final List<Tuple3<String, String, Map<String, String>>> udfFunctions = new ArrayList<>(); @@ -77,13 +76,7 @@ public class PreTransformOperatorBuilder { return this; } - public PreTransformOperatorBuilder shouldStoreSchemasInState( - boolean shouldStoreSchemasInState) { - this.shouldStoreSchemasInState = shouldStoreSchemasInState; - return this; - } - public PreTransformOperator build() { - return new PreTransformOperator(transformRules, udfFunctions, shouldStoreSchemasInState); + return new PreTransformOperator(transformRules, udfFunctions); } }