This is an automated email from the ASF dual-hosted git repository.

gongzhongqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
     new 39fd00ce4 [FLINK-36610] MySQL CDC supports parsing gh-ost / pt-osc generated schema changes (#3668)
39fd00ce4 is described below

commit 39fd00ce49dd6683a1c57d81acd06b1611e5b579
Author:     yuxiqian <34335406+yuxiq...@users.noreply.github.com>
AuthorDate: Mon Jan 13 16:16:56 2025 +0800

    [FLINK-36610] MySQL CDC supports parsing gh-ost / pt-osc generated schema changes (#3668)

    Co-authored-by: MOBIN-F <18814118...@163.com>
---
 .../docs/connectors/flink-sources/mysql-cdc.md     |  12 +
 .../docs/connectors/pipeline-connectors/mysql.md   |  12 +
 .../docs/connectors/flink-sources/mysql-cdc.md     |  12 +
 .../docs/connectors/pipeline-connectors/mysql.md   |  12 +
 .../flink/cdc/common/utils/TestCaseUtils.java      |  94 ++++
 .../mysql/factory/MySqlDataSourceFactory.java      |   5 +-
 .../mysql/source/MySqlDataSourceOptions.java       |   8 +
 .../source/MySqlOnLineSchemaMigrationITCase.java   | 591 ++++++++++++++++++++
 .../mysql/debezium/reader/BinlogSplitReader.java   |  26 +
 .../mysql/source/MySqlSourceBuilder.java           |   6 +
 .../mysql/source/config/MySqlSourceConfig.java     |   9 +-
 .../source/config/MySqlSourceConfigFactory.java    |  10 +-
 .../mysql/source/config/MySqlSourceOptions.java    |   8 +
 .../connectors/mysql/source/utils/RecordUtils.java | 111 ++++
 .../connectors/mysql/table/MySqlTableSource.java   |   9 +-
 .../mysql/table/MySqlTableSourceFactory.java       |   6 +-
 .../MySqlOnLineSchemaMigrationSourceITCase.java    | 584 ++++++++++++++++++++
 .../mysql/source/MySqlSourceTestBase.java          |   6 +
 .../MySqlOnLineSchemaMigrationTableITCase.java     | 598 +++++++++++++++++++++
 .../mysql/table/MySqlTableSourceFactoryTest.java   |  34 +-
 20 files changed, 2136 insertions(+), 17 deletions(-)

diff --git a/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md b/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
index aa14fc440..a2a6f6ad7 100644
--- a/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
+++ b/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
@@ -377,6 +377,18 @@ Flink SQL> SELECT * FROM orders;
           <td>Boolean</td>
           <td>是否在快照结束后关闭空闲的 Reader。 此特性需要 flink 版本大于等于 1.14 并且 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 需要设置为 true。</td>
     </tr>
+    <tr>
+          <td>scan.parse.online.schema.changes.enabled</td>
+          <td>optional</td>
+          <td style="word-wrap: break-word;">false</td>
+          <td>Boolean</td>
+          <td>
+            是否尝试解析由 <a href="https://github.com/github/gh-ost">gh-ost</a> 或 <a href="https://docs.percona.com/percona-toolkit/pt-online-schema-change.html">pt-osc</a> 工具生成的表结构变更事件。
+            这些工具会在变更表结构时，将变更语句应用到“影子表”之上，并稍后将其与主表进行交换，以达到表结构变更的目的。
+            <br>
+            这是一项实验性功能。
+          </td>
+    </tr>
   </tbody>
 </table>
 </div>
diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
index a5f0924c3..2ff209442 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
@@ -286,6 +286,18 @@ pipeline:
           scan.binlog.newly-added-table.enabled: 只在 binlog 读取阶段读取新增表的增量数据。
           </td>
     </tr>
+    <tr>
+          <td>scan.parse.online.schema.changes.enabled</td>
+          <td>optional</td>
+          <td style="word-wrap: break-word;">false</td>
+          <td>Boolean</td>
+          <td>
+            是否尝试解析由 <a href="https://github.com/github/gh-ost">gh-ost</a> 或 <a href="https://docs.percona.com/percona-toolkit/pt-online-schema-change.html">pt-osc</a> 工具生成的表结构变更事件。
+            这些工具会在变更表结构时，将变更语句应用到“影子表”之上，并稍后将其与主表进行交换，以达到表结构变更的目的。
+            <br>
+            这是一项实验性功能。
+          </td>
+    </tr>
   </tbody>
 </table>
 </div>
diff --git a/docs/content/docs/connectors/flink-sources/mysql-cdc.md b/docs/content/docs/connectors/flink-sources/mysql-cdc.md
index f335f047a..60d81baa2 100644
--- a/docs/content/docs/connectors/flink-sources/mysql-cdc.md
+++ b/docs/content/docs/connectors/flink-sources/mysql-cdc.md
@@ -391,6 +391,18 @@ During a snapshot operation, the connector will query each included table to pro
           hex: The binary data type is converted to a hexadecimal string and transmitted. The default value is none. Depending on your requirements and data types, you can choose the appropriate processing mode. If your database contains a large number of binary data types, it is recommended to use base64 or hex mode to make it easier to handle during transmission.</td>
     </tr>
+    <tr>
+          <td>scan.parse.online.schema.changes.enabled</td>
+          <td>optional</td>
+          <td style="word-wrap: break-word;">false</td>
+          <td>Boolean</td>
+          <td>
+            Whether to parse "online" schema changes generated by <a href="https://github.com/github/gh-ost">gh-ost</a> or <a href="https://docs.percona.com/percona-toolkit/pt-online-schema-change.html">pt-osc</a>.
+            Schema change events are applied to a "shadow" table and then swapped with the original table later.
+            <br>
+            This is an experimental feature, and subject to change in the future.
+          </td>
+    </tr>
   </tbody>
 </table>
 </div>
diff --git a/docs/content/docs/connectors/pipeline-connectors/mysql.md b/docs/content/docs/connectors/pipeline-connectors/mysql.md
index 3cc5c17a1..29c6549e5 100644
--- a/docs/content/docs/connectors/pipeline-connectors/mysql.md
+++ b/docs/content/docs/connectors/pipeline-connectors/mysql.md
@@ -293,6 +293,18 @@ pipeline:
           scan.binlog.newly-added-table.enabled: only do binlog-reading for newly added table during binlog reading phase.
           </td>
     </tr>
+    <tr>
+          <td>scan.parse.online.schema.changes.enabled</td>
+          <td>optional</td>
+          <td style="word-wrap: break-word;">false</td>
+          <td>Boolean</td>
+          <td>
+            Whether to parse "online" schema changes generated by <a href="https://github.com/github/gh-ost">gh-ost</a> or <a href="https://docs.percona.com/percona-toolkit/pt-online-schema-change.html">pt-osc</a>.
+            Schema change events are applied to a "shadow" table and then swapped with the original table later.
+            <br>
+            This is an experimental feature, and subject to change in the future.
+          </td>
+    </tr>
   </tbody>
 </table>
 </div>
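The four documentation changes above all describe the same new switch. For orientation, the option can also be set programmatically through the DataStream API builder that this commit extends; the following is a minimal sketch (not part of the commit), with placeholder connection settings:

    import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
    import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;

    public class ParseOnlineSchemaChangesExample {
        public static void main(String[] args) {
            MySqlSource<String> source =
                    MySqlSource.<String>builder()
                            .hostname("localhost") // placeholder
                            .port(3306)
                            .databaseList("db")
                            .tableList("db.customers")
                            .username("mysqluser") // placeholder
                            .password("mysqlpw") // placeholder
                            .deserializer(new JsonDebeziumDeserializationSchema())
                            // New in this commit: recognize gh-ost/pt-osc shadow-table DDL
                            .parseOnLineSchemaChanges(true)
                            .build();
        }
    }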
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/TestCaseUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/TestCaseUtils.java
new file mode 100644
index 000000000..f179e779c
--- /dev/null
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/TestCaseUtils.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.utils;
+
+import org.apache.flink.util.function.SupplierWithException;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+/** Some utility methods for creating repeated-checking test cases. */
+public class TestCaseUtils {
+
+    public static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(30);
+    public static final Duration DEFAULT_INTERVAL = Duration.ofSeconds(1);
+
+    /** Fetch with a ({@code timeout}, {@code interval}) duration. */
+    public static void repeatedCheck(Supplier<Boolean> fetcher) {
+        repeatedCheck(fetcher, DEFAULT_TIMEOUT);
+    }
+
+    /** Fetch with a ({@code timeout}, {@code interval}) duration. */
+    public static void repeatedCheck(Supplier<Boolean> fetcher, Duration timeout) {
+        repeatedCheck(fetcher, timeout, DEFAULT_INTERVAL);
+    }
+
+    /** Fetch with a ({@code timeout}, {@code interval}) duration. */
+    public static void repeatedCheck(
+            Supplier<Boolean> fetcher, Duration timeout, Duration interval) {
+        repeatedCheck(fetcher::get, timeout, interval, Collections.emptyList());
+    }
+
+    /** Fetch and wait with a ({@code timeout}, {@code interval}) duration. */
+    public static <T> void repeatedCheck(
+            Supplier<T> fetcher, Predicate<T> validator, Duration timeout, Duration interval) {
+        repeatedCheckAndValidate(
+                fetcher::get, validator, timeout, interval, Collections.emptyList());
+    }
+
+    /** Waiting for fetching values with a ({@code timeout}, {@code interval}) duration. */
+    public static void repeatedCheck(
+            SupplierWithException<Boolean, Throwable> fetcher,
+            Duration timeout,
+            Duration interval,
+            List<Class<? extends Throwable>> allowedThrowsList) {
+        repeatedCheckAndValidate(fetcher, b -> b, timeout, interval, allowedThrowsList);
+    }
+
+    /** Fetch and validate, with a ({@code timeout}, {@code interval}) duration. */
+    public static <T> void repeatedCheckAndValidate(
+            SupplierWithException<T, Throwable> fetcher,
+            Predicate<T> validator,
+            Duration timeout,
+            Duration interval,
+            List<Class<? extends Throwable>> allowedThrowsList) {
+
+        long start = System.currentTimeMillis();
+        while (System.currentTimeMillis() - start < timeout.toMillis()) {
+            try {
+                if (validator.test(fetcher.get())) {
+                    return;
+                }
+            } catch (Throwable t) {
+                if (allowedThrowsList.stream()
+                        .noneMatch(clazz -> clazz.isAssignableFrom(t.getClass()))) {
+                    throw new RuntimeException("Fetcher has thrown an unexpected exception: ", t);
+                }
+            }
+            try {
+                Thread.sleep(interval.toMillis());
+            } catch (InterruptedException ignored) {
+                // ignored
+            }
+        }
+        throw new RuntimeException("Timeout when waiting for state to be ready.");
+    }
+}
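The new helper above polls a supplier until a predicate holds or the timeout budget runs out. A short, hypothetical usage sketch (not from the commit) showing the three-argument overload, polling every 100 ms within a 10-second budget:

    import java.time.Duration;
    import java.util.concurrent.atomic.AtomicInteger;
    import org.apache.flink.cdc.common.utils.TestCaseUtils;

    public class RepeatedCheckExample {
        public static void main(String[] args) {
            AtomicInteger attempts = new AtomicInteger();
            // Succeeds once the condition has been polled three times;
            // throws a RuntimeException if 10 seconds elapse first.
            TestCaseUtils.repeatedCheck(
                    () -> attempts.incrementAndGet() >= 3,
                    Duration.ofSeconds(10),
                    Duration.ofMillis(100));
            System.out.println("Condition met after " + attempts.get() + " polls.");
        }
    }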
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
index 3d89f18c0..408be27e9 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
@@ -67,6 +67,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOption
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HEARTBEAT_INTERVAL;
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.METADATA_LIST;
+import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES;
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
@@ -139,6 +140,7 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
         boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
         boolean scanBinlogNewlyAddedTableEnabled =
                 config.get(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED);
+        boolean isParsingOnLineSchemaChanges = config.get(PARSE_ONLINE_SCHEMA_CHANGES);
 
         validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
         validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@@ -175,7 +177,8 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
                         .includeSchemaChanges(includeSchemaChanges)
                         .debeziumProperties(getDebeziumProperties(configMap))
                         .jdbcProperties(getJdbcProperties(configMap))
-                        .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled);
+                        .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
+                        .parseOnLineSchemaChanges(isParsingOnLineSchemaChanges);
 
         List<TableId> tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null);
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
index b2284ed1c..aa682c211 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
@@ -281,4 +281,12 @@ public class MySqlDataSourceOptions {
                     .withDescription(
                             "List of readable metadata from SourceRecord to be passed to downstream, split by `,`. "
                                     + "Available readable metadata are: op_ts.");
+
+    @Experimental
+    public static final ConfigOption<Boolean> PARSE_ONLINE_SCHEMA_CHANGES =
+            ConfigOptions.key("scan.parse.online.schema.changes.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to parse schema change events generated by gh-ost/pt-osc utilities. Defaults to false.");
 }
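The option above is wired into MySqlSourceConfigFactory, which the integration test below also exercises. A short sketch (mirroring the test, not part of the commit; host and credentials are placeholders, and a real setup may need further connection options such as a server id):

    import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
    import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;

    public class ConfigFactoryExample {
        public static void main(String[] args) {
            MySqlSourceConfig config =
                    new MySqlSourceConfigFactory()
                            .hostname("localhost") // placeholder
                            .port(3306)
                            .username("mysqluser") // placeholder
                            .password("mysqlpw") // placeholder
                            .databaseList("db")
                            .tableList("db\\.customers")
                            .parseOnLineSchemaChanges(true)
                            .createConfig(0);
            // The flag is carried through to the runtime readers.
            System.out.println(config.isParseOnLineSchemaChanges()); // true
        }
    }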
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationITCase.java
new file mode 100644
index 000000000..544fc969e
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationITCase.java
@@ -0,0 +1,591 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.AddColumnEvent;
+import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.DropColumnEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.PhysicalColumn;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.source.FlinkSourceProvider;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory;
+import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
+import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.testcontainers.DockerClientFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCHEMA_CHANGE_ENABLED;
+import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD;
+import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER;
+import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.fetchResults;
+import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.getServerId;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * IT case for Evolving MySQL schema with gh-ost/pt-osc utility. See <a
+ * href="https://github.com/github/gh-ost">github/gh-ost</a>/<a
+ * href="https://docs.percona.com/percona-toolkit/pt-online-schema-change.html">doc/pt-osc</a> for
+ * more details.
+ */ +public class MySqlOnLineSchemaMigrationITCase extends MySqlSourceTestBase { + private static final MySqlContainer MYSQL8_CONTAINER = + createMySqlContainer(MySqlVersion.V8_0, "docker/server-gtids/expire-seconds/my.cnf"); + + private static final String PERCONA_TOOLKIT = "perconalab/percona-toolkit:3.5.7"; + + protected static final GenericContainer<?> PERCONA_TOOLKIT_CONTAINER = + createPerconaToolkitContainer(); + + private final UniqueDatabase customerDatabase = + new UniqueDatabase(MYSQL8_CONTAINER, "customer", TEST_USER, TEST_PASSWORD); + + private final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + + private static final String GH_OST_DOWNLOAD_LINK = + DockerClientFactory.instance().client().versionCmd().exec().getArch().equals("amd64") + ? "https://github.com/github/gh-ost/releases/download/v1.1.6/gh-ost-binary-linux-amd64-20231207144046.tar.gz" + : "https://github.com/github/gh-ost/releases/download/v1.1.6/gh-ost-binary-linux-arm64-20231207144046.tar.gz"; + + @BeforeClass + public static void beforeClass() { + LOG.info("Starting MySql8 containers..."); + Startables.deepStart(Stream.of(MYSQL8_CONTAINER)).join(); + Startables.deepStart(Stream.of(PERCONA_TOOLKIT_CONTAINER)).join(); + LOG.info("Container MySql8 is started."); + } + + @AfterClass + public static void afterClass() { + LOG.info("Stopping MySql8 containers..."); + MYSQL8_CONTAINER.stop(); + PERCONA_TOOLKIT_CONTAINER.stop(); + LOG.info("Container MySql8 is stopped."); + } + + @Before + public void before() { + customerDatabase.createAndInitialize(); + TestValuesTableFactory.clearAllData(); + env.setParallelism(4); + env.enableCheckpointing(200); + env.setRestartStrategy(RestartStrategies.noRestart()); + } + + @After + public void after() { + customerDatabase.dropDatabase(); + } + + private static void installGhOstCli(Container<?> container) { + try { + execInContainer( + container, + "download gh-ost tarball", + "curl", + "-L", + "-o", + "/tmp/gh-ost.tar.gz", + GH_OST_DOWNLOAD_LINK); + execInContainer( + container, "unzip binary", "tar", "-xzvf", "/tmp/gh-ost.tar.gz", "-C", "/bin"); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + private static GenericContainer<?> createPerconaToolkitContainer() { + GenericContainer<?> perconaToolkit = + new GenericContainer<>(PERCONA_TOOLKIT) + // keep container alive + .withCommand("tail", "-f", "/dev/null") + .withNetwork(NETWORK) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + return perconaToolkit; + } + + @Test + public void testGhOstSchemaMigrationFromScratch() throws Exception { + LOG.info("Step 1: Install gh-ost command line utility"); + installGhOstCli(MYSQL8_CONTAINER); + + LOG.info("Step 2: Start pipeline job"); + env.setParallelism(1); + TableId tableId = TableId.tableId(customerDatabase.getDatabaseName(), "customers"); + MySqlSourceConfigFactory configFactory = + new MySqlSourceConfigFactory() + .hostname(MYSQL8_CONTAINER.getHost()) + .port(MYSQL8_CONTAINER.getDatabasePort()) + .username(TEST_USER) + .password(TEST_PASSWORD) + .databaseList(customerDatabase.getDatabaseName()) + .tableList(customerDatabase.getDatabaseName() + "\\.customers") + .startupOptions(StartupOptions.initial()) + .serverId(getServerId(env.getParallelism())) + .serverTimeZone("UTC") + .includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue()) + .parseOnLineSchemaChanges(true); + + FlinkSourceProvider sourceProvider = + (FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider(); + 
CloseableIterator<Event> events = + env.fromSource( + sourceProvider.getSource(), + WatermarkStrategy.noWatermarks(), + MySqlDataSourceFactory.IDENTIFIER, + new EventTypeInfo()) + .executeAndCollect(); + Thread.sleep(5_000); + + List<Event> expected = new ArrayList<>(); + Schema schemaV1 = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT().notNull()) + .physicalColumn("name", DataTypes.VARCHAR(255).notNull(), null, "flink") + .physicalColumn("address", DataTypes.VARCHAR(1024)) + .physicalColumn("phone_number", DataTypes.VARCHAR(512)) + .primaryKey(Collections.singletonList("id")) + .build(); + expected.add(new CreateTableEvent(tableId, schemaV1)); + expected.addAll(getSnapshotExpected(tableId, schemaV1)); + List<Event> actual = fetchResults(events, expected.size()); + assertEqualsInAnyOrder( + expected.stream().map(Object::toString).collect(Collectors.toList()), + actual.stream().map(Object::toString).collect(Collectors.toList())); + + // Wait for a little while until we're in Binlog streaming mode. + Thread.sleep(5_000); + + LOG.info("Step 3: Evolve schema with gh-ost - ADD COLUMN"); + execInContainer( + MYSQL8_CONTAINER, + "evolve schema", + "gh-ost", + "--user=" + TEST_USER, + "--password=" + TEST_PASSWORD, + "--database=" + customerDatabase.getDatabaseName(), + "--table=customers", + "--alter=add column ext int", + "--allow-on-master", // because we don't have a replica + "--initially-drop-old-table", // drop previously generated temporary tables + "--execute"); + + try (Connection connection = customerDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + // The new column `ext` has been inserted now + statement.execute( + String.format( + "INSERT INTO `%s`.`customers` VALUES (10000, 'Alice', 'Beijing', '123567891234', 17);", + customerDatabase.getDatabaseName())); + } + + Schema schemaV2 = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT().notNull()) + .physicalColumn("name", DataTypes.VARCHAR(255).notNull(), null, "flink") + .physicalColumn("address", DataTypes.VARCHAR(1024)) + .physicalColumn("phone_number", DataTypes.VARCHAR(512)) + .physicalColumn("ext", DataTypes.INT()) + .primaryKey(Collections.singletonList("id")) + .build(); + + assertEquals( + Arrays.asList( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + new PhysicalColumn("ext", DataTypes.INT(), null)))), + DataChangeEvent.insertEvent( + tableId, + generate(schemaV2, 10000, "Alice", "Beijing", "123567891234", 17))), + fetchResults(events, 2)); + + LOG.info("Step 4: Evolve schema with gh-ost - MODIFY COLUMN"); + execInContainer( + MYSQL8_CONTAINER, + "evolve schema", + "gh-ost", + "--user=" + TEST_USER, + "--password=" + TEST_PASSWORD, + "--database=" + customerDatabase.getDatabaseName(), + "--table=customers", + "--alter=modify column ext double", + "--allow-on-master", + "--initially-drop-old-table", + "--execute"); + + try (Connection connection = customerDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "INSERT INTO `%s`.`customers` VALUES (10001, 'Bob', 'Chongqing', '123567891234', 2.718281828);", + customerDatabase.getDatabaseName())); + } + + Schema schemaV3 = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT().notNull()) + .physicalColumn("name", DataTypes.VARCHAR(255).notNull(), null, "flink") + .physicalColumn("address", DataTypes.VARCHAR(1024)) + .physicalColumn("phone_number", DataTypes.VARCHAR(512)) + 
.physicalColumn("ext", DataTypes.DOUBLE()) + .primaryKey(Collections.singletonList("id")) + .build(); + + assertEquals( + Arrays.asList( + new AlterColumnTypeEvent( + tableId, Collections.singletonMap("ext", DataTypes.DOUBLE())), + DataChangeEvent.insertEvent( + tableId, + generate( + schemaV3, + 10001, + "Bob", + "Chongqing", + "123567891234", + 2.718281828))), + fetchResults(events, 2)); + + LOG.info("Step 5: Evolve schema with gh-ost - DROP COLUMN"); + execInContainer( + MYSQL8_CONTAINER, + "evolve schema", + "gh-ost", + "--user=" + TEST_USER, + "--password=" + TEST_PASSWORD, + "--database=" + customerDatabase.getDatabaseName(), + "--table=customers", + "--alter=drop column ext", + "--allow-on-master", + "--initially-drop-old-table", + "--execute"); + + try (Connection connection = customerDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "INSERT INTO `%s`.`customers` VALUES (10002, 'Cicada', 'Urumqi', '123567891234');", + customerDatabase.getDatabaseName())); + } + + Schema schemaV4 = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT().notNull()) + .physicalColumn("name", DataTypes.VARCHAR(255).notNull(), null, "flink") + .physicalColumn("address", DataTypes.VARCHAR(1024)) + .physicalColumn("phone_number", DataTypes.VARCHAR(512)) + .primaryKey(Collections.singletonList("id")) + .build(); + + assertEquals( + Arrays.asList( + new DropColumnEvent(tableId, Collections.singletonList("ext")), + DataChangeEvent.insertEvent( + tableId, + generate(schemaV4, 10002, "Cicada", "Urumqi", "123567891234"))), + fetchResults(events, 2)); + } + + @Test + public void testPtOscSchemaMigrationFromScratch() throws Exception { + LOG.info("Step 1: Start pipeline job"); + + env.setParallelism(1); + TableId tableId = TableId.tableId(customerDatabase.getDatabaseName(), "customers"); + MySqlSourceConfigFactory configFactory = + new MySqlSourceConfigFactory() + .hostname(MYSQL8_CONTAINER.getHost()) + .port(MYSQL8_CONTAINER.getDatabasePort()) + .username(TEST_USER) + .password(TEST_PASSWORD) + .databaseList(customerDatabase.getDatabaseName()) + .tableList(customerDatabase.getDatabaseName() + "\\.customers") + .startupOptions(StartupOptions.initial()) + .serverId(getServerId(env.getParallelism())) + .serverTimeZone("UTC") + .includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue()) + .parseOnLineSchemaChanges(true); + + FlinkSourceProvider sourceProvider = + (FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider(); + CloseableIterator<Event> events = + env.fromSource( + sourceProvider.getSource(), + WatermarkStrategy.noWatermarks(), + MySqlDataSourceFactory.IDENTIFIER, + new EventTypeInfo()) + .executeAndCollect(); + Thread.sleep(5_000); + + List<Event> expected = new ArrayList<>(); + Schema schemaV1 = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT().notNull()) + .physicalColumn("name", DataTypes.VARCHAR(255).notNull(), null, "flink") + .physicalColumn("address", DataTypes.VARCHAR(1024)) + .physicalColumn("phone_number", DataTypes.VARCHAR(512)) + .primaryKey(Collections.singletonList("id")) + .build(); + expected.add(new CreateTableEvent(tableId, schemaV1)); + expected.addAll(getSnapshotExpected(tableId, schemaV1)); + List<Event> actual = fetchResults(events, expected.size()); + assertEqualsInAnyOrder( + expected.stream().map(Object::toString).collect(Collectors.toList()), + actual.stream().map(Object::toString).collect(Collectors.toList())); + + // Wait for a little while until we're in 
Binlog streaming mode. + Thread.sleep(5_000); + + LOG.info("Step 2: Evolve schema with pt-osc - ADD COLUMN"); + execInContainer( + PERCONA_TOOLKIT_CONTAINER, + "evolve schema", + "pt-online-schema-change", + "--user=" + TEST_USER, + "--host=" + INTER_CONTAINER_MYSQL_ALIAS, + "--password=" + TEST_PASSWORD, + "P=3306,t=customers,D=" + customerDatabase.getDatabaseName(), + "--alter", + "add column ext int", + "--charset=utf8", + "--recursion-method=NONE", // Do not look for slave nodes + "--print", + "--execute"); + + try (Connection connection = customerDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + // The new column `ext` has been inserted now + statement.execute( + String.format( + "INSERT INTO `%s`.`customers` VALUES (10000, 'Alice', 'Beijing', '123567891234', 17);", + customerDatabase.getDatabaseName())); + } + + Schema schemaV2 = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT().notNull()) + .physicalColumn("name", DataTypes.VARCHAR(255).notNull(), null, "flink") + .physicalColumn("address", DataTypes.VARCHAR(1024)) + .physicalColumn("phone_number", DataTypes.VARCHAR(512)) + .physicalColumn("ext", DataTypes.INT()) + .primaryKey(Collections.singletonList("id")) + .build(); + + assertEquals( + Arrays.asList( + new AddColumnEvent( + tableId, + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + new PhysicalColumn("ext", DataTypes.INT(), null)))), + DataChangeEvent.insertEvent( + tableId, + generate(schemaV2, 10000, "Alice", "Beijing", "123567891234", 17))), + fetchResults(events, 2)); + + LOG.info("Step 3: Evolve schema with pt-osc - MODIFY COLUMN"); + execInContainer( + PERCONA_TOOLKIT_CONTAINER, + "evolve schema", + "pt-online-schema-change", + "--user=" + TEST_USER, + "--host=" + INTER_CONTAINER_MYSQL_ALIAS, + "--password=" + TEST_PASSWORD, + "P=3306,t=customers,D=" + customerDatabase.getDatabaseName(), + "--alter", + "modify column ext double", + "--charset=utf8", + "--recursion-method=NONE", + "--print", + "--execute"); + + try (Connection connection = customerDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "INSERT INTO `%s`.`customers` VALUES (10001, 'Bob', 'Chongqing', '123567891234', 2.718281828);", + customerDatabase.getDatabaseName())); + } + + Schema schemaV3 = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT().notNull()) + .physicalColumn("name", DataTypes.VARCHAR(255).notNull(), null, "flink") + .physicalColumn("address", DataTypes.VARCHAR(1024)) + .physicalColumn("phone_number", DataTypes.VARCHAR(512)) + .physicalColumn("ext", DataTypes.DOUBLE()) + .primaryKey(Collections.singletonList("id")) + .build(); + + assertEquals( + Arrays.asList( + new AlterColumnTypeEvent( + tableId, Collections.singletonMap("ext", DataTypes.DOUBLE())), + DataChangeEvent.insertEvent( + tableId, + generate( + schemaV3, + 10001, + "Bob", + "Chongqing", + "123567891234", + 2.718281828))), + fetchResults(events, 2)); + + LOG.info("Step 4: Evolve schema with pt-osc - DROP COLUMN"); + execInContainer( + PERCONA_TOOLKIT_CONTAINER, + "evolve schema", + "pt-online-schema-change", + "--user=" + TEST_USER, + "--host=" + INTER_CONTAINER_MYSQL_ALIAS, + "--password=" + TEST_PASSWORD, + "P=3306,t=customers,D=" + customerDatabase.getDatabaseName(), + "--alter", + "drop column ext", + "--charset=utf8", + "--recursion-method=NONE", + "--print", + "--execute"); + + try (Connection connection = customerDatabase.getJdbcConnection(); + Statement statement = 
connection.createStatement()) { + statement.execute( + String.format( + "INSERT INTO `%s`.`customers` VALUES (10002, 'Cicada', 'Urumqi', '123567891234');", + customerDatabase.getDatabaseName())); + } + + Schema schemaV4 = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT().notNull()) + .physicalColumn("name", DataTypes.VARCHAR(255).notNull(), null, "flink") + .physicalColumn("address", DataTypes.VARCHAR(1024)) + .physicalColumn("phone_number", DataTypes.VARCHAR(512)) + .primaryKey(Collections.singletonList("id")) + .build(); + + assertEquals( + Arrays.asList( + new DropColumnEvent(tableId, Collections.singletonList("ext")), + DataChangeEvent.insertEvent( + tableId, + generate(schemaV4, 10002, "Cicada", "Urumqi", "123567891234"))), + fetchResults(events, 2)); + } + + private static void execInContainer(Container<?> container, String prompt, String... commands) + throws IOException, InterruptedException { + { + LOG.info( + "Starting to {} with the following command: `{}`", + prompt, + String.join(" ", commands)); + Container.ExecResult execResult = container.execInContainer(commands); + if (execResult.getExitCode() == 0) { + LOG.info("Successfully {}. Stdout: {}", prompt, execResult.getStdout()); + } else { + LOG.error( + "Failed to {}. Exit code: {}, Stdout: {}, Stderr: {}", + prompt, + execResult.getExitCode(), + execResult.getStdout(), + execResult.getStderr()); + throw new IOException("Failed to execute commands: " + String.join(" ", commands)); + } + } + } + + private List<Event> getSnapshotExpected(TableId tableId, Schema schema) { + return Stream.of( + generate(schema, 101, "user_1", "Shanghai", "123567891234"), + generate(schema, 102, "user_2", "Shanghai", "123567891234"), + generate(schema, 103, "user_3", "Shanghai", "123567891234"), + generate(schema, 109, "user_4", "Shanghai", "123567891234"), + generate(schema, 110, "user_5", "Shanghai", "123567891234"), + generate(schema, 111, "user_6", "Shanghai", "123567891234"), + generate(schema, 118, "user_7", "Shanghai", "123567891234"), + generate(schema, 121, "user_8", "Shanghai", "123567891234"), + generate(schema, 123, "user_9", "Shanghai", "123567891234"), + generate(schema, 1009, "user_10", "Shanghai", "123567891234"), + generate(schema, 1010, "user_11", "Shanghai", "123567891234"), + generate(schema, 1011, "user_12", "Shanghai", "123567891234"), + generate(schema, 1012, "user_13", "Shanghai", "123567891234"), + generate(schema, 1013, "user_14", "Shanghai", "123567891234"), + generate(schema, 1014, "user_15", "Shanghai", "123567891234"), + generate(schema, 1015, "user_16", "Shanghai", "123567891234"), + generate(schema, 1016, "user_17", "Shanghai", "123567891234"), + generate(schema, 1017, "user_18", "Shanghai", "123567891234"), + generate(schema, 1018, "user_19", "Shanghai", "123567891234"), + generate(schema, 1019, "user_20", "Shanghai", "123567891234"), + generate(schema, 2000, "user_21", "Shanghai", "123567891234")) + .map(record -> DataChangeEvent.insertEvent(tableId, record)) + .collect(Collectors.toList()); + } + + private BinaryRecordData generate(Schema schema, Object... fields) { + return (new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]))) + .generate( + Arrays.stream(fields) + .map( + e -> + (e instanceof String) + ? 
BinaryStringData.fromString((String) e)
+                                                        : e)
+                        .toArray());
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
index 31173469b..447fda96a 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
@@ -53,6 +53,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -83,6 +84,7 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
     private Predicate<TableId> capturedTableFilter;
     private final StoppableChangeEventSourceContext changeEventSourceContext =
             new StoppableChangeEventSourceContext();
+    private final boolean isParsingOnLineSchemaChanges;
 
     private static final long READER_CLOSE_TIMEOUT = 30L;
 
@@ -93,6 +95,8 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
         this.executorService = Executors.newSingleThreadExecutor(threadFactory);
         this.currentTaskRunning = true;
         this.pureBinlogPhaseTables = new HashSet<>();
+        this.isParsingOnLineSchemaChanges =
+                statefulTaskContext.getSourceConfig().isParseOnLineSchemaChanges();
     }
 
     public void submitSplit(MySqlSplit mySqlSplit) {
@@ -148,6 +152,14 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
         if (currentTaskRunning) {
             List<DataChangeEvent> batch = queue.poll();
             for (DataChangeEvent event : batch) {
+                if (isParsingOnLineSchemaChanges) {
+                    Optional<SourceRecord> oscRecord =
+                            parseOnLineSchemaChangeEvent(event.getRecord());
+                    if (oscRecord.isPresent()) {
+                        sourceRecords.add(oscRecord.get());
+                        continue;
+                    }
+                }
                 if (shouldEmit(event.getRecord())) {
                     sourceRecords.add(event.getRecord());
                 }
@@ -195,6 +207,20 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
         }
     }
 
+    private Optional<SourceRecord> parseOnLineSchemaChangeEvent(SourceRecord sourceRecord) {
+        if (RecordUtils.isOnLineSchemaChangeEvent(sourceRecord)) {
+            // This is a gh-ost initialized schema change event and should be emitted if the
+            // peeled tableId matches the predicate.
+            TableId originalTableId = RecordUtils.getTableId(sourceRecord);
+            TableId peeledTableId = RecordUtils.peelTableId(originalTableId);
+            if (capturedTableFilter.test(peeledTableId)) {
+                return Optional.of(
+                        RecordUtils.setTableId(sourceRecord, originalTableId, peeledTableId));
+            }
+        }
+        return Optional.empty();
+    }
+
     /**
      * Returns the record should emit or not.
     *
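The decision added to BinlogSplitReader above can be modeled in a few lines: a shadow-table DDL event is forwarded iff its peeled table id matches the captured-table predicate. A simplified, hypothetical illustration (only RecordUtils and Debezium's TableId come from the codebase; peelTableId is defined in the RecordUtils diff below):

    import java.util.function.Predicate;
    import io.debezium.relational.TableId;
    import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;

    public class ShadowTableFilterExample {
        public static void main(String[] args) {
            // Stand-in for the reader's capturedTableFilter predicate.
            Predicate<TableId> capturedTableFilter =
                    id -> "db".equals(id.catalog()) && "customers".equals(id.table());

            TableId shadow = new TableId("db", null, "_customers_gho");
            TableId peeled = RecordUtils.peelTableId(shadow);

            // true: the gh-ost shadow table maps back to a captured table,
            // so its ALTER statement would be emitted downstream.
            System.out.println(capturedTableFilter.test(peeled));
        }
    }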
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java
index c03aa36b0..3fce1eb61 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java
@@ -268,6 +268,12 @@ public class MySqlSourceBuilder<T> {
         return this;
     }
 
+    /** Whether to parse gh-ost utility generated schema change events. Defaults to false. */
+    public MySqlSourceBuilder<T> parseOnLineSchemaChanges(boolean parseOnLineSchemaChanges) {
+        this.configFactory.parseOnLineSchemaChanges(parseOnLineSchemaChanges);
+        return this;
+    }
+
     /**
      * Build the {@link MySqlSource}.
      *
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
index dd0ac7896..85079f28c 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
@@ -66,6 +66,7 @@ public class MySqlSourceConfig implements Serializable {
     private final Properties jdbcProperties;
     private final Map<ObjectPath, String> chunkKeyColumns;
     private final boolean skipSnapshotBackfill;
+    private final boolean parseOnLineSchemaChanges;
 
     // --------------------------------------------------------------------------------------------
     // Debezium Configurations
@@ -99,7 +100,8 @@ public class MySqlSourceConfig implements Serializable {
             Properties dbzProperties,
             Properties jdbcProperties,
             Map<ObjectPath, String> chunkKeyColumns,
-            boolean skipSnapshotBackfill) {
+            boolean skipSnapshotBackfill,
+            boolean parseOnLineSchemaChanges) {
         this.hostname = checkNotNull(hostname);
         this.port = port;
         this.username = checkNotNull(username);
@@ -127,6 +129,7 @@ public class MySqlSourceConfig implements Serializable {
         this.jdbcProperties = jdbcProperties;
         this.chunkKeyColumns = chunkKeyColumns;
         this.skipSnapshotBackfill = skipSnapshotBackfill;
+        this.parseOnLineSchemaChanges = parseOnLineSchemaChanges;
     }
 
     public String getHostname() {
@@ -210,6 +213,10 @@ public class MySqlSourceConfig implements Serializable {
         return closeIdleReaders;
     }
 
+    public boolean isParseOnLineSchemaChanges() {
+        return parseOnLineSchemaChanges;
+    }
+
     public Properties getDbzProperties() {
         return dbzProperties;
     }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
index 8b65055ca..f0ca4cc96 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
@@ -70,6 +70,7 @@ public class MySqlSourceConfigFactory implements Serializable {
     private Properties dbzProperties;
     private Map<ObjectPath, String> chunkKeyColumns = new HashMap<>();
     private boolean skipSnapshotBackfill = false;
+    private boolean parseOnLineSchemaChanges = false;
 
     public MySqlSourceConfigFactory hostname(String hostname) {
         this.hostname = hostname;
@@ -291,6 +292,12 @@ public class MySqlSourceConfigFactory implements Serializable {
         return this;
     }
 
+    /** Whether to parse gh-ost/pt-osc utility generated schema change events. Defaults to false. */
+    public MySqlSourceConfigFactory parseOnLineSchemaChanges(boolean parseOnLineSchemaChanges) {
+        this.parseOnLineSchemaChanges = parseOnLineSchemaChanges;
+        return this;
+    }
+
     /** Creates a new {@link MySqlSourceConfig} for the given subtask {@code subtaskId}. */
     public MySqlSourceConfig createConfig(int subtaskId) {
         // hard code server name, because we don't need to distinguish it, docs:
@@ -384,6 +391,7 @@ public class MySqlSourceConfigFactory implements Serializable {
                 props,
                 jdbcProperties,
                 chunkKeyColumns,
-                skipSnapshotBackfill);
+                skipSnapshotBackfill,
+                parseOnLineSchemaChanges);
     }
 }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
index f3424c8df..87de37e05 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
@@ -262,4 +262,12 @@ public class MySqlSourceOptions {
                     .defaultValue(false)
                     .withDescription(
                             "Whether to skip backfill in snapshot reading phase. If backfill is skipped, changes on captured tables during snapshot phase will be consumed later in binlog reading phase instead of being merged into the snapshot. WARNING: Skipping backfill might lead to data inconsistency because some binlog events happened within the snapshot phase might be replayed (only at-least-once semantic is promised). For example updating an already updated value in snapshot, or del [...]
+
+    @Experimental
+    public static final ConfigOption<Boolean> PARSE_ONLINE_SCHEMA_CHANGES =
+            ConfigOptions.key("scan.parse.online.schema.changes.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to parse schema change events generated by gh-ost/pt-osc utilities. Defaults to false.");
 }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java
index d85944adb..e6848f1c4 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java
@@ -25,7 +25,11 @@ import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitI
 import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
 import org.apache.flink.table.types.logical.RowType;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
 import io.debezium.data.Envelope;
+import io.debezium.document.Document;
 import io.debezium.document.DocumentReader;
 import io.debezium.relational.TableId;
 import io.debezium.relational.history.HistoryRecord;
@@ -50,6 +54,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import static io.debezium.connector.AbstractSourceInfo.DATABASE_NAME_KEY;
@@ -384,6 +390,40 @@ public class RecordUtils {
         return new TableId(dbName, null, tableName);
     }
 
+    public static SourceRecord setTableId(
+            SourceRecord dataRecord, TableId originalTableId, TableId tableId) {
+        Struct value = (Struct) dataRecord.value();
+        Document historyRecordDocument;
+        try {
+            historyRecordDocument = getHistoryRecord(dataRecord).document();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        HistoryRecord newHistoryRecord =
+                new HistoryRecord(
+                        historyRecordDocument.set(
+                                HistoryRecord.Fields.DDL_STATEMENTS,
+                                historyRecordDocument
+                                        .get(HistoryRecord.Fields.DDL_STATEMENTS)
+                                        .asString()
+                                        .replace(originalTableId.table(), tableId.table())));
+
+        Struct newSource =
+                value.getStruct(Envelope.FieldName.SOURCE)
+                        .put(DATABASE_NAME_KEY, tableId.catalog())
+                        .put(TABLE_NAME_KEY, tableId.table());
+        return dataRecord.newRecord(
+                dataRecord.topic(),
+                dataRecord.kafkaPartition(),
+                dataRecord.keySchema(),
+                dataRecord.key(),
+                dataRecord.valueSchema(),
+                value.put(Envelope.FieldName.SOURCE, newSource)
+                        .put(HISTORY_RECORD_FIELD, newHistoryRecord.toString()),
+                dataRecord.timestamp(),
+                dataRecord.headers());
+    }
+
     public static boolean isTableChangeRecord(SourceRecord dataRecord) {
         Struct value = (Struct) dataRecord.value();
         Struct source = value.getStruct(Envelope.FieldName.SOURCE);
@@ -489,4 +529,75 @@ public class RecordUtils {
         }
         return Optional.empty();
     }
+
+    /**
+     * This utility method checks if given source record is a gh-ost/pt-osc initiated schema change
+     * event by checking the "alter" ddl.
+     */
+    public static boolean isOnLineSchemaChangeEvent(SourceRecord record) {
+        if (!isSchemaChangeEvent(record)) {
+            return false;
+        }
+        Struct value = (Struct) record.value();
+        ObjectMapper mapper = new ObjectMapper();
+        try {
+            // There will be these schema change events generated in total during one transaction.
+            //
+            // gh-ost:
+            // DROP TABLE IF EXISTS `db`.`_tb1_gho`
+            // DROP TABLE IF EXISTS `db`.`_tb1_del`
+            // DROP TABLE IF EXISTS `db`.`_tb1_ghc`
+            // create /* gh-ost */ table `db`.`_tb1_ghc` ...
+            // create /* gh-ost */ table `db`.`_tb1_gho` like `db`.`tb1`
+            // alter /* gh-ost */ table `db`.`_tb1_gho` add column c varchar(255)
+            // create /* gh-ost */ table `db`.`_tb1_del` ...
+            // DROP TABLE IF EXISTS `db`.`_tb1_del`
+            // rename /* gh-ost */ table `db`.`tb1` to `db`.`_tb1_del`
+            // rename /* gh-ost */ table `db`.`_tb1_gho` to `db`.`tb1`
+            // DROP TABLE IF EXISTS `db`.`_tb1_ghc`
+            // DROP TABLE IF EXISTS `db`.`_tb1_del`
+            //
+            // pt-osc:
+            // CREATE TABLE `db`.`_test_tb1_new`
+            // ALTER TABLE `db`.`_test_tb1_new` add column c varchar(50)
+            // CREATE TRIGGER `pt_osc_db_test_tb1_del`...
+            // CREATE TRIGGER `pt_osc_db_test_tb1_upd`...
+            // CREATE TRIGGER `pt_osc_db_test_tb1_ins`...
+            // ANALYZE TABLE `db`.`_test_tb1_new` /* pt-online-schema-change */
+            // RENAME TABLE `db`.`test_tb1` TO `db`.`_test_tb1_old`, `db`.`_test_tb1_new` TO
+            // `db`.`test_tb1`
+            // DROP TABLE IF EXISTS `_test_tb1_old` /* generated by server */
+            // DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_del`
+            // DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_upd`
+            // DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_ins`
+            //
+            // Among all these, we only need the "ALTER" one that happens on the `_gho`/`_new`
+            // table.
+            String ddl =
+                    mapper.readTree(value.getString(HISTORY_RECORD_FIELD))
+                            .get(HistoryRecord.Fields.DDL_STATEMENTS)
+                            .asText()
+                            .toLowerCase();
+            if (ddl.startsWith("alter")) {
+                String tableName =
+                        value.getStruct(Envelope.FieldName.SOURCE).getString(TABLE_NAME_KEY);
+                return OSC_TABLE_ID_PATTERN.matcher(tableName).matches();
+            }
+
+            return false;
+        } catch (JsonProcessingException e) {
+            return false;
+        }
+    }
+
+    private static final Pattern OSC_TABLE_ID_PATTERN = Pattern.compile("^_(.*)_(gho|new)$");
+
+    /** This utility method peels out gh-ost/pt-osc mangled tableId to the original one. */
+    public static TableId peelTableId(TableId tableId) {
+        Matcher matchingResult = OSC_TABLE_ID_PATTERN.matcher(tableId.table());
+        if (matchingResult.matches()) {
+            return new TableId(tableId.catalog(), tableId.schema(), matchingResult.group(1));
+        }
+        return tableId;
+    }
 }
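The pattern above is what distinguishes shadow tables from gh-ost's other housekeeping tables (`_ghc`, `_del`, `_old`), which never carry the interesting ALTER. A small, hypothetical demonstration of the same regex in isolation:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class OscTableNamePatternExample {
        // Same expression as OSC_TABLE_ID_PATTERN above.
        private static final Pattern OSC = Pattern.compile("^_(.*)_(gho|new)$");

        public static void main(String[] args) {
            String[] names = {"_customers_gho", "_customers_new", "_customers_del", "customers"};
            for (String name : names) {
                Matcher m = OSC.matcher(name);
                // Prints: customers, customers, (not a shadow table), (not a shadow table)
                System.out.println(
                        name + " -> " + (m.matches() ? m.group(1) : "(not a shadow table)"));
            }
        }
    }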
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
index 242e02da6..8d023ad30 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
@@ -98,6 +98,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
     private final Duration heartbeatInterval;
     private final String chunkKeyColumn;
     final boolean skipSnapshotBackFill;
+    final boolean parseOnlineSchemaChanges;
 
     // --------------------------------------------------------------------------------------------
     // Mutable attributes
@@ -135,7 +136,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
             Properties jdbcProperties,
             Duration heartbeatInterval,
             @Nullable String chunkKeyColumn,
-            boolean skipSnapshotBackFill) {
+            boolean skipSnapshotBackFill,
+            boolean parseOnlineSchemaChanges) {
         this.physicalSchema = physicalSchema;
         this.port = port;
         this.hostname = checkNotNull(hostname);
@@ -159,6 +161,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
         this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
         this.closeIdleReaders = closeIdleReaders;
         this.jdbcProperties = jdbcProperties;
+        this.parseOnlineSchemaChanges = parseOnlineSchemaChanges;
         // Mutable attributes
         this.producedDataType = physicalSchema.toPhysicalRowDataType();
         this.metadataKeys = Collections.emptyList();
@@ -220,6 +223,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
                             .heartbeatInterval(heartbeatInterval)
                             .chunkKeyColumn(new ObjectPath(database, tableName), chunkKeyColumn)
                             .skipSnapshotBackfill(skipSnapshotBackFill)
+                            .parseOnLineSchemaChanges(parseOnlineSchemaChanges)
                             .build();
             return SourceProvider.of(parallelSource);
         } else {
@@ -305,7 +309,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
                         jdbcProperties,
                         heartbeatInterval,
                         chunkKeyColumn,
-                        skipSnapshotBackFill);
+                        skipSnapshotBackFill,
+                        parseOnlineSchemaChanges);
         source.metadataKeys = metadataKeys;
         source.producedDataType = producedDataType;
         return source;
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
index e435f946a..c3404db42 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
@@ -102,6 +102,8 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
                 config.get(MySqlSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
         boolean skipSnapshotBackFill =
                 config.get(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
+        boolean parseOnLineSchemaChanges =
+                config.get(MySqlSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES);
 
         if (enableParallelRead) {
             validatePrimaryKeyIfEnableParallel(physicalSchema, chunkKeyColumn);
@@ -145,7 +147,8 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
                 JdbcUrlUtils.getJdbcProperties(context.getCatalogTable().getOptions()),
                 heartbeatInterval,
                 chunkKeyColumn,
-                skipSnapshotBackFill);
+                skipSnapshotBackFill,
+                parseOnLineSchemaChanges);
     }
 
     @Override
@@ -191,6 +194,7 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
         options.add(MySqlSourceOptions.HEARTBEAT_INTERVAL);
         options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
         options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
+        options.add(MySqlSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES);
         return options;
     }
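With the factory change above, the switch also becomes available as a table option in Flink SQL. A minimal sketch (not part of the commit; connection settings are placeholders) of a DDL that enables it:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class OnlineSchemaChangeSqlExample {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(
                            EnvironmentSettings.newInstance().inStreamingMode().build());
            tEnv.executeSql(
                    "CREATE TABLE customers ("
                            + " id INT NOT NULL,"
                            + " name STRING,"
                            + " PRIMARY KEY (id) NOT ENFORCED"
                            + ") WITH ("
                            + " 'connector' = 'mysql-cdc',"
                            + " 'hostname' = 'localhost'," // placeholder
                            + " 'port' = '3306',"
                            + " 'username' = 'mysqluser'," // placeholder
                            + " 'password' = 'mysqlpw'," // placeholder
                            + " 'database-name' = 'db',"
                            + " 'table-name' = 'customers',"
                            + " 'scan.parse.online.schema.changes.enabled' = 'true'"
                            + ")");
        }
    }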
+ */ + +package org.apache.flink.cdc.connectors.mysql.source; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.cdc.common.data.binary.BinaryRecordData; +import org.apache.flink.cdc.common.data.binary.BinaryStringData; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.utils.TestCaseUtils; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema; +import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.types.Row; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.testcontainers.DockerClientFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.sql.Connection; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Random; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.flink.api.common.JobStatus.RUNNING; +import static org.apache.flink.cdc.common.utils.TestCaseUtils.DEFAULT_INTERVAL; +import static org.apache.flink.cdc.common.utils.TestCaseUtils.DEFAULT_TIMEOUT; + +/** + * IT case for evolving MySQL schema with the gh-ost/pt-osc utilities. See <a + * href="https://github.com/github/gh-ost">gh-ost</a> and <a + * href="https://docs.percona.com/percona-toolkit/pt-online-schema-change.html">pt-osc</a> for + * more details.
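+ * + * <p>Both tools apply the requested DDL to a shadow copy of the table (conventionally named along the lines of {@code _customers_gho} by gh-ost and {@code _customers_new} by pt-osc; exact names may vary by tool version) and swap it with the original table at cut-over, so schema changes observed on the shadow table must be mapped back to the original table id.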
+ */ +public class MySqlOnLineSchemaMigrationSourceITCase extends MySqlSourceTestBase { + private static final MySqlContainer MYSQL8_CONTAINER = + createMySqlContainer(MySqlVersion.V8_0, "docker/server-gtids/expire-seconds/my.cnf"); + + private static final String TEST_USER = "mysqluser"; + private static final String TEST_PASSWORD = "mysqlpw"; + + private static final String PERCONA_TOOLKIT = "perconalab/percona-toolkit:3.5.7"; + + protected static final GenericContainer<?> PERCONA_TOOLKIT_CONTAINER = + createPerconaToolkitContainer(); + + private final UniqueDatabase customerDatabase = + new UniqueDatabase(MYSQL8_CONTAINER, "customer", TEST_USER, TEST_PASSWORD); + + private final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + private final StreamTableEnvironment tEnv = + StreamTableEnvironment.create( + env, EnvironmentSettings.newInstance().inStreamingMode().build()); + + private static final String GH_OST_DOWNLOAD_LINK = + DockerClientFactory.instance().client().versionCmd().exec().getArch().equals("amd64") + ? "https://github.com/github/gh-ost/releases/download/v1.1.6/gh-ost-binary-linux-amd64-20231207144046.tar.gz" + : "https://github.com/github/gh-ost/releases/download/v1.1.6/gh-ost-binary-linux-arm64-20231207144046.tar.gz"; + + @BeforeClass + public static void beforeClass() { + LOG.info("Starting containers..."); + Startables.deepStart(Stream.of(MYSQL8_CONTAINER)).join(); + Startables.deepStart(Stream.of(PERCONA_TOOLKIT_CONTAINER)).join(); + LOG.info("Containers are started."); + } + + @AfterClass + public static void afterClass() { + LOG.info("Stopping containers..."); + MYSQL8_CONTAINER.stop(); + PERCONA_TOOLKIT_CONTAINER.close(); + LOG.info("Containers are stopped."); + } + + @Before + public void before() { + TestValuesTableFactory.clearAllData(); + env.setParallelism(DEFAULT_PARALLELISM); + env.enableCheckpointing(200); + customerDatabase.createAndInitialize(); + System.setOut(new PrintStream(outCaptor)); + } + + @After + public void after() { + customerDatabase.dropDatabase(); + System.setOut(sysOut); + } + + private static void installGhOstCli(Container<?> container) { + try { + execInContainer( + container, + "download gh-ost tarball", + "curl", + "-L", + "-o", + "/tmp/gh-ost.tar.gz", + GH_OST_DOWNLOAD_LINK); + execInContainer( + container, "unzip binary", "tar", "-xzvf", "/tmp/gh-ost.tar.gz", "-C", "/bin"); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + private static GenericContainer<?> createPerconaToolkitContainer() { + GenericContainer<?> perconaToolkit = + new GenericContainer<>(PERCONA_TOOLKIT) + // keep container alive + .withCommand("tail", "-f", "/dev/null") + .withNetwork(NETWORK) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + return perconaToolkit; + } + + private final PrintStream sysOut = System.out; + private final ByteArrayOutputStream outCaptor = new ByteArrayOutputStream(); + + @Test + public void testGhOstSchemaMigrationFromScratch() throws Exception { + LOG.info("Step 1: Install gh-ost command line utility"); + installGhOstCli(MYSQL8_CONTAINER); + + LOG.info("Step 2: Start pipeline job"); + MySqlSource<String> mySqlSource = + MySqlSource.<String>builder() + .hostname(MYSQL8_CONTAINER.getHost()) + .port(MYSQL8_CONTAINER.getDatabasePort()) + .databaseList(customerDatabase.getDatabaseName()) + .tableList(customerDatabase.getDatabaseName() + ".customers") + .username(customerDatabase.getUsername()) + .password(customerDatabase.getPassword()) + 
.serverId("5401-5404") + .deserializer(new JsonDebeziumDeserializationSchema()) + .serverTimeZone("UTC") + .includeSchemaChanges(true) // output the schema changes as well + .build(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(3000); + env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySqlParallelSource") + .setParallelism(4) + .print() + .setParallelism(1); + + JobClient resultClient = env.executeAsync(); + + TestCaseUtils.repeatedCheck( + () -> resultClient.getJobStatus().get().equals(RUNNING), + DEFAULT_TIMEOUT, + DEFAULT_INTERVAL, + Arrays.asList(InterruptedException.class, NoSuchElementException.class)); + + { + String[] expected = + new String[] { + "{\"id\":101,\"name\":\"user_1\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":102,\"name\":\"user_2\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":103,\"name\":\"user_3\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":109,\"name\":\"user_4\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":110,\"name\":\"user_5\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":111,\"name\":\"user_6\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":118,\"name\":\"user_7\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":121,\"name\":\"user_8\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":123,\"name\":\"user_9\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":1009,\"name\":\"user_10\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":1010,\"name\":\"user_11\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":1011,\"name\":\"user_12\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":1012,\"name\":\"user_13\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":1013,\"name\":\"user_14\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":1014,\"name\":\"user_15\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":1015,\"name\":\"user_16\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":1016,\"name\":\"user_17\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":1017,\"name\":\"user_18\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":1018,\"name\":\"user_19\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":1019,\"name\":\"user_20\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":2000,\"name\":\"user_21\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + }; + TestCaseUtils.repeatedCheck( + () -> Arrays.stream(expected).allMatch(outCaptor.toString()::contains)); + } + + // Wait for a little while until we're in Binlog streaming mode. 
+ Thread.sleep(5_000); + + { + LOG.info("Step 3: Evolve schema with gh-ost - ADD COLUMN"); + execInContainer( + MYSQL8_CONTAINER, + "evolve schema", + "gh-ost", + "--user=" + TEST_USER, + "--password=" + TEST_PASSWORD, + "--database=" + customerDatabase.getDatabaseName(), + "--table=customers", + "--alter=add column ext int first", + "--allow-on-master", // because we don't have a replica + "--initially-drop-old-table", // drop previously generated temporary tables + "--execute"); + + try (Connection connection = customerDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + // The new column `ext` has been inserted now + statement.execute( + String.format( + "INSERT INTO `%s`.`customers` VALUES (17, 10000, 'Alice', 'Beijing', '123567891234');", + customerDatabase.getDatabaseName())); + } + + TestCaseUtils.repeatedCheck( + () -> + outCaptor + .toString() + .contains( + "{\"ext\":17,\"id\":10000,\"name\":\"Alice\",\"address\":\"Beijing\",\"phone_number\":\"123567891234\"}")); + } + + { + LOG.info("Step 4: Evolve schema with gh-ost - MODIFY COLUMN"); + execInContainer( + MYSQL8_CONTAINER, + "evolve schema", + "gh-ost", + "--user=" + TEST_USER, + "--password=" + TEST_PASSWORD, + "--database=" + customerDatabase.getDatabaseName(), + "--table=customers", + "--alter=modify column ext double", + "--allow-on-master", + "--initially-drop-old-table", + "--execute"); + + try (Connection connection = customerDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "INSERT INTO `%s`.`customers` VALUES (2.718281828, 10001, 'Bob', 'Chongqing', '123567891234');", + customerDatabase.getDatabaseName())); + } + + TestCaseUtils.repeatedCheck( + () -> + outCaptor + .toString() + .contains( + "{\"ext\":2.718281828,\"id\":10001,\"name\":\"Bob\",\"address\":\"Chongqing\",\"phone_number\":\"123567891234\"}")); + } + + { + LOG.info("Step 5: Evolve schema with gh-ost - DROP COLUMN"); + execInContainer( + MYSQL8_CONTAINER, + "evolve schema", + "gh-ost", + "--user=" + TEST_USER, + "--password=" + TEST_PASSWORD, + "--database=" + customerDatabase.getDatabaseName(), + "--table=customers", + "--alter=drop column ext", + "--allow-on-master", + "--initially-drop-old-table", + "--execute"); + + try (Connection connection = customerDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "INSERT INTO `%s`.`customers` VALUES (10002, 'Cicada', 'Urumqi', '123567891234');", + customerDatabase.getDatabaseName())); + } + + TestCaseUtils.repeatedCheck( + () -> + outCaptor + .toString() + .contains( + "{\"id\":10002,\"name\":\"Cicada\",\"address\":\"Urumqi\",\"phone_number\":\"123567891234\"}")); + } + resultClient.cancel(); + } + + @Test + public void testPtOscSchemaMigrationFromScratch() throws Exception { + LOG.info("Step 1: Start pipeline job"); + MySqlSource<String> mySqlSource = + MySqlSource.<String>builder() + .hostname(MYSQL8_CONTAINER.getHost()) + .port(MYSQL8_CONTAINER.getDatabasePort()) + .databaseList(customerDatabase.getDatabaseName()) + .tableList(customerDatabase.getDatabaseName() + ".customers") + .username(customerDatabase.getUsername()) + .password(customerDatabase.getPassword()) + .serverId("5401-5404") + .deserializer(new JsonDebeziumDeserializationSchema()) + .serverTimeZone("UTC") + .includeSchemaChanges(true) // output the schema changes as well + .build(); + + StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(3000); + env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySqlParallelSource") + .setParallelism(4) + .print() + .setParallelism(1); + + JobClient resultClient = env.executeAsync(); + + TestCaseUtils.repeatedCheck( + () -> resultClient.getJobStatus().get().equals(RUNNING), + DEFAULT_TIMEOUT, + DEFAULT_INTERVAL, + Arrays.asList(InterruptedException.class, NoSuchElementException.class)); + + { + String[] expected = + new String[] { + "{\"id\":101,\"name\":\"user_1\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":102,\"name\":\"user_2\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":103,\"name\":\"user_3\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":109,\"name\":\"user_4\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":110,\"name\":\"user_5\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":111,\"name\":\"user_6\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":118,\"name\":\"user_7\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":121,\"name\":\"user_8\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":123,\"name\":\"user_9\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":1009,\"name\":\"user_10\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":1010,\"name\":\"user_11\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":1011,\"name\":\"user_12\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":1012,\"name\":\"user_13\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":1013,\"name\":\"user_14\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":1014,\"name\":\"user_15\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":1015,\"name\":\"user_16\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":1016,\"name\":\"user_17\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":1017,\"name\":\"user_18\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":1018,\"name\":\"user_19\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":1019,\"name\":\"user_20\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + "{\"id\":2000,\"name\":\"user_21\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}", + }; + TestCaseUtils.repeatedCheck( + () -> Arrays.stream(expected).allMatch(outCaptor.toString()::contains)); + } + + // Wait for a little while until we're in Binlog streaming mode. 
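+        // (Unlike gh-ost, which had to be installed inside the MySQL container, pt-osc runs in + // the separate Percona Toolkit container and reaches MySQL through the shared Docker network + // alias, so no installation step is needed here.)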
+ Thread.sleep(5000L); + + LOG.info("Step 2: Evolve schema with pt-osc - ADD COLUMN"); + execInContainer( + PERCONA_TOOLKIT_CONTAINER, + "evolve schema", + "pt-online-schema-change", + "--user=" + TEST_USER, + "--host=" + INTER_CONTAINER_MYSQL_ALIAS, + "--password=" + TEST_PASSWORD, + "P=3306,t=customers,D=" + customerDatabase.getDatabaseName(), + "--alter", + "add column ext int FIRST", + "--charset=utf8", + "--recursion-method=NONE", // Do not look for slave nodes + "--print", + "--execute"); + + try (Connection connection = customerDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + // The new column `ext` has been inserted now + statement.execute( + String.format( + "INSERT INTO `%s`.`customers` VALUES (17, 10000, 'Alice', 'Beijing', '123567891234');", + customerDatabase.getDatabaseName())); + + TestCaseUtils.repeatedCheck( + () -> + outCaptor + .toString() + .contains( + "{\"ext\":17,\"id\":10000,\"name\":\"Alice\",\"address\":\"Beijing\",\"phone_number\":\"123567891234\"}")); + } + + LOG.info("Step 3: Evolve schema with pt-osc - MODIFY COLUMN"); + execInContainer( + PERCONA_TOOLKIT_CONTAINER, + "evolve schema", + "pt-online-schema-change", + "--user=" + TEST_USER, + "--host=" + INTER_CONTAINER_MYSQL_ALIAS, + "--password=" + TEST_PASSWORD, + "P=3306,t=customers,D=" + customerDatabase.getDatabaseName(), + "--alter", + "modify column ext double", + "--charset=utf8", + "--recursion-method=NONE", + "--print", + "--execute"); + + try (Connection connection = customerDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "INSERT INTO `%s`.`customers` VALUES (2.718281828, 10001, 'Bob', 'Chongqing', '123567891234');", + customerDatabase.getDatabaseName())); + TestCaseUtils.repeatedCheck( + () -> + outCaptor + .toString() + .contains( + "{\"ext\":2.718281828,\"id\":10001,\"name\":\"Bob\",\"address\":\"Chongqing\",\"phone_number\":\"123567891234\"}")); + } + + LOG.info("Step 4: Evolve schema with pt-osc - DROP COLUMN"); + execInContainer( + PERCONA_TOOLKIT_CONTAINER, + "evolve schema", + "pt-online-schema-change", + "--user=" + TEST_USER, + "--host=" + INTER_CONTAINER_MYSQL_ALIAS, + "--password=" + TEST_PASSWORD, + "P=3306,t=customers,D=" + customerDatabase.getDatabaseName(), + "--alter", + "drop column ext", + "--charset=utf8", + "--recursion-method=NONE", + "--print", + "--execute"); + + try (Connection connection = customerDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "INSERT INTO `%s`.`customers` VALUES (10002, 'Cicada', 'Urumqi', '123567891234');", + customerDatabase.getDatabaseName())); + TestCaseUtils.repeatedCheck( + () -> + outCaptor + .toString() + .contains( + "{\"id\":10002,\"name\":\"Cicada\",\"address\":\"Urumqi\",\"phone_number\":\"123567891234\"}")); + } + } + + private static void execInContainer(Container<?> container, String prompt, String... commands) + throws IOException, InterruptedException { + { + LOG.info( + "Starting to {} with the following command: `{}`", + prompt, + String.join(" ", commands)); + Container.ExecResult execResult = container.execInContainer(commands); + if (execResult.getExitCode() == 0) { + LOG.info("Successfully {}. Stdout: {}", prompt, execResult.getStdout()); + } else { + LOG.error( + "Failed to {}. 
Exit code: {}, Stdout: {}, Stderr: {}", + prompt, + execResult.getExitCode(), + execResult.getStdout(), + execResult.getStderr()); + throw new IOException("Failed to execute commands: " + String.join(" ", commands)); + } + } + } + + private List<Event> getSnapshotExpected(TableId tableId, Schema schema) { + return Stream.of( + generate(schema, 101, "user_1", "Shanghai", "123567891234"), + generate(schema, 102, "user_2", "Shanghai", "123567891234"), + generate(schema, 103, "user_3", "Shanghai", "123567891234"), + generate(schema, 109, "user_4", "Shanghai", "123567891234"), + generate(schema, 110, "user_5", "Shanghai", "123567891234"), + generate(schema, 111, "user_6", "Shanghai", "123567891234"), + generate(schema, 118, "user_7", "Shanghai", "123567891234"), + generate(schema, 121, "user_8", "Shanghai", "123567891234"), + generate(schema, 123, "user_9", "Shanghai", "123567891234"), + generate(schema, 1009, "user_10", "Shanghai", "123567891234"), + generate(schema, 1010, "user_11", "Shanghai", "123567891234"), + generate(schema, 1011, "user_12", "Shanghai", "123567891234"), + generate(schema, 1012, "user_13", "Shanghai", "123567891234"), + generate(schema, 1013, "user_14", "Shanghai", "123567891234"), + generate(schema, 1014, "user_15", "Shanghai", "123567891234"), + generate(schema, 1015, "user_16", "Shanghai", "123567891234"), + generate(schema, 1016, "user_17", "Shanghai", "123567891234"), + generate(schema, 1017, "user_18", "Shanghai", "123567891234"), + generate(schema, 1018, "user_19", "Shanghai", "123567891234"), + generate(schema, 1019, "user_20", "Shanghai", "123567891234"), + generate(schema, 2000, "user_21", "Shanghai", "123567891234")) + .map(record -> DataChangeEvent.insertEvent(tableId, record)) + .collect(Collectors.toList()); + } + + private BinaryRecordData generate(Schema schema, Object... fields) { + return (new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]))) + .generate( + Arrays.stream(fields) + .map( + e -> + (e instanceof String) + ? 
BinaryStringData.fromString((String) e) + : e) + .toArray()); + } + + private String getServerId() { + final Random random = new Random(); + int serverId = random.nextInt(100) + 5400; + return serverId + "-" + (serverId + env.getParallelism()); + } + + protected String getServerId(int base) { + return base + "-" + (base + DEFAULT_PARALLELISM); + } + + private static void waitForSnapshotStarted(String sinkName) throws InterruptedException { + while (sinkSize(sinkName) == 0) { + Thread.sleep(100); + } + } + + private static void waitForSinkSize(String sinkName, int expectedSize) { + TestCaseUtils.repeatedCheck(() -> sinkSize(sinkName) >= expectedSize); + } + + private static int sinkSize(String sinkName) { + synchronized (TestValuesTableFactory.class) { + try { + return TestValuesTableFactory.getRawResults(sinkName).size(); + } catch (IllegalArgumentException e) { + // job is not started yet + return 0; + } + } + } + + private static List<String> fetchRows(Iterator<Row> iter, int size) { + List<String> rows = new ArrayList<>(size); + while (size > 0 && iter.hasNext()) { + Row row = iter.next(); + rows.add(row.toString()); + size--; + } + return rows; + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceTestBase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceTestBase.java index 215dc3ec7..2b8e76500 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceTestBase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceTestBase.java @@ -32,9 +32,11 @@ import org.apache.flink.util.TestLogger; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.ClassRule; import org.junit.Rule; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Network; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.lifecycle.Startables; @@ -55,6 +57,8 @@ public abstract class MySqlSourceTestBase extends TestLogger { protected static final int DEFAULT_PARALLELISM = 4; protected static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V5_7); protected InMemoryReporter metricReporter = InMemoryReporter.createWithRetainedMetrics(); + public static final String INTER_CONTAINER_MYSQL_ALIAS = "mysql"; + @ClassRule public static final Network NETWORK = Network.newNetwork(); @Rule public final MiniClusterWithClientResource miniClusterResource = @@ -96,6 +100,8 @@ public abstract class MySqlSourceTestBase extends TestLogger { .withDatabaseName("flink-test") .withUsername("flinkuser") .withPassword("flinkpw") + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_MYSQL_ALIAS) .withLogConsumer(new Slf4jLogConsumer(LOG)); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlOnLineSchemaMigrationTableITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlOnLineSchemaMigrationTableITCase.java new file mode 100644 index 000000000..92e5fc566 --- /dev/null +++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlOnLineSchemaMigrationTableITCase.java @@ -0,0 +1,598 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.mysql.table; + +import org.apache.flink.cdc.common.data.binary.BinaryRecordData; +import org.apache.flink.cdc.common.data.binary.BinaryStringData; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.utils.TestCaseUtils; +import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.testcontainers.DockerClientFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Random; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.flink.api.common.JobStatus.RUNNING; +import static org.apache.flink.cdc.common.utils.TestCaseUtils.DEFAULT_INTERVAL; +import static org.apache.flink.cdc.common.utils.TestCaseUtils.DEFAULT_TIMEOUT; + +/** + * IT case for evolving MySQL schema with the gh-ost/pt-osc utilities. See <a + * href="https://github.com/github/gh-ost">gh-ost</a> and <a + * href="https://docs.percona.com/percona-toolkit/pt-online-schema-change.html">pt-osc</a> for + * more details.
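+ * + * <p>Unlike the DataStream variant, this test consumes the table through a SQL source whose physical schema is fixed by the DDL below, so columns added by the online migration are expected to be absent from the emitted rows rather than appended to them.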
+ */ +public class MySqlOnLineSchemaMigrationTableITCase extends MySqlSourceTestBase { + private static final MySqlContainer MYSQL8_CONTAINER = + createMySqlContainer(MySqlVersion.V8_0, "docker/server-gtids/expire-seconds/my.cnf"); + + private static final String TEST_USER = "mysqluser"; + private static final String TEST_PASSWORD = "mysqlpw"; + + private static final String PERCONA_TOOLKIT = "perconalab/percona-toolkit:3.5.7"; + + protected static final GenericContainer<?> PERCONA_TOOLKIT_CONTAINER = + createPerconaToolkitContainer(); + + private final UniqueDatabase customerDatabase = + new UniqueDatabase(MYSQL8_CONTAINER, "customer", TEST_USER, TEST_PASSWORD); + + private final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + private final StreamTableEnvironment tEnv = + StreamTableEnvironment.create( + env, EnvironmentSettings.newInstance().inStreamingMode().build()); + + private static final String GH_OST_DOWNLOAD_LINK = + DockerClientFactory.instance().client().versionCmd().exec().getArch().equals("amd64") + ? "https://github.com/github/gh-ost/releases/download/v1.1.6/gh-ost-binary-linux-amd64-20231207144046.tar.gz" + : "https://github.com/github/gh-ost/releases/download/v1.1.6/gh-ost-binary-linux-arm64-20231207144046.tar.gz"; + + @BeforeClass + public static void beforeClass() { + LOG.info("Starting containers..."); + Startables.deepStart(Stream.of(MYSQL8_CONTAINER)).join(); + Startables.deepStart(Stream.of(PERCONA_TOOLKIT_CONTAINER)).join(); + LOG.info("Containers are started."); + } + + @AfterClass + public static void afterClass() { + LOG.info("Stopping containers..."); + MYSQL8_CONTAINER.stop(); + PERCONA_TOOLKIT_CONTAINER.close(); + LOG.info("Containers are stopped."); + } + + @Before + public void before() { + TestValuesTableFactory.clearAllData(); + env.setParallelism(DEFAULT_PARALLELISM); + env.enableCheckpointing(200); + customerDatabase.createAndInitialize(); + } + + @After + public void after() { + customerDatabase.dropDatabase(); + } + + private static void installGhOstCli(Container<?> container) { + try { + execInContainer( + container, + "download gh-ost tarball", + "curl", + "-L", + "-o", + "/tmp/gh-ost.tar.gz", + GH_OST_DOWNLOAD_LINK); + execInContainer( + container, "unzip binary", "tar", "-xzvf", "/tmp/gh-ost.tar.gz", "-C", "/bin"); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + private static GenericContainer<?> createPerconaToolkitContainer() { + GenericContainer<?> perconaToolkit = + new GenericContainer<>(PERCONA_TOOLKIT) + // keep container alive + .withCommand("tail", "-f", "/dev/null") + .withNetwork(NETWORK) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + return perconaToolkit; + } + + @Test + public void testGhOstSchemaMigrationFromScratch() throws Exception { + LOG.info("Step 1: Install gh-ost command line utility"); + installGhOstCli(MYSQL8_CONTAINER); + + LOG.info("Step 2: Start pipeline job"); + String sourceDDL = + String.format( + "CREATE TABLE debezium_source (" + + " `id` INT NOT NULL," + + " name STRING," + + " address STRING," + + " phone_number STRING," + + " primary key (`id`) not enforced" + + ") WITH (" + + " 'connector' = 'mysql-cdc'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'table-name' = '%s'," + + " 'scan.incremental.snapshot.enabled' = '%s'," + + " 'server-time-zone' = 'UTC'," + + " 'server-id' = '%s'" + + ")", + MYSQL8_CONTAINER.getHost(), + 
MYSQL8_CONTAINER.getDatabasePort(), + TEST_USER, + TEST_PASSWORD, + customerDatabase.getDatabaseName(), + "customers", + true, + getServerId()); + + tEnv.executeSql(sourceDDL); + + // async submit job + TableResult result = tEnv.executeSql("SELECT * FROM debezium_source"); + + // wait for the source to start up; there is no readiness signal, so poll the job status until it reaches RUNNING + TestCaseUtils.repeatedCheck( + () -> result.getJobClient().get().getJobStatus().get().equals(RUNNING), + DEFAULT_TIMEOUT, + DEFAULT_INTERVAL, + Arrays.asList(InterruptedException.class, NoSuchElementException.class)); + + CloseableIterator<Row> iterator = result.collect(); + + { + String[] expected = + new String[] { + "+I[101, user_1, Shanghai, 123567891234]", + "+I[102, user_2, Shanghai, 123567891234]", + "+I[103, user_3, Shanghai, 123567891234]", + "+I[109, user_4, Shanghai, 123567891234]", + "+I[110, user_5, Shanghai, 123567891234]", + "+I[111, user_6, Shanghai, 123567891234]", + "+I[118, user_7, Shanghai, 123567891234]", + "+I[121, user_8, Shanghai, 123567891234]", + "+I[123, user_9, Shanghai, 123567891234]", + "+I[1009, user_10, Shanghai, 123567891234]", + "+I[1010, user_11, Shanghai, 123567891234]", + "+I[1011, user_12, Shanghai, 123567891234]", + "+I[1012, user_13, Shanghai, 123567891234]", + "+I[1013, user_14, Shanghai, 123567891234]", + "+I[1014, user_15, Shanghai, 123567891234]", + "+I[1015, user_16, Shanghai, 123567891234]", + "+I[1016, user_17, Shanghai, 123567891234]", + "+I[1017, user_18, Shanghai, 123567891234]", + "+I[1018, user_19, Shanghai, 123567891234]", + "+I[1019, user_20, Shanghai, 123567891234]", + "+I[2000, user_21, Shanghai, 123567891234]" + }; + assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length)); + } + + // Wait for a little while until we're in Binlog streaming mode.
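+        // (Each migration step below inserts a probe row afterwards and asserts that changes + // applied to the swapped table are still captured by the running query.)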
+ Thread.sleep(5_000); + + { + LOG.info("Step 3: Evolve schema with gh-ost - ADD COLUMN"); + execInContainer( + MYSQL8_CONTAINER, + "evolve schema", + "gh-ost", + "--user=" + TEST_USER, + "--password=" + TEST_PASSWORD, + "--database=" + customerDatabase.getDatabaseName(), + "--table=customers", + "--alter=add column ext int first", + "--allow-on-master", // because we don't have a replica + "--initially-drop-old-table", // drop previously generated temporary tables + "--execute"); + + try (Connection connection = customerDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + // The new column `ext` has been inserted now + statement.execute( + String.format( + "INSERT INTO `%s`.`customers` VALUES (17, 10000, 'Alice', 'Beijing', '123567891234');", + customerDatabase.getDatabaseName())); + } + + String[] expected = + new String[] { + "+I[10000, Alice, Beijing, 123567891234]", + }; + assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length)); + } + + { + LOG.info("Step 4: Evolve schema with gh-ost - MODIFY COLUMN"); + execInContainer( + MYSQL8_CONTAINER, + "evolve schema", + "gh-ost", + "--user=" + TEST_USER, + "--password=" + TEST_PASSWORD, + "--database=" + customerDatabase.getDatabaseName(), + "--table=customers", + "--alter=modify column ext double", + "--allow-on-master", + "--initially-drop-old-table", + "--execute"); + + try (Connection connection = customerDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "INSERT INTO `%s`.`customers` VALUES (2.718281828, 10001, 'Bob', 'Chongqing', '123567891234');", + customerDatabase.getDatabaseName())); + } + + String[] expected = + new String[] { + "+I[10001, Bob, Chongqing, 123567891234]", + }; + assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length)); + } + + { + LOG.info("Step 5: Evolve schema with gh-ost - DROP COLUMN"); + execInContainer( + MYSQL8_CONTAINER, + "evolve schema", + "gh-ost", + "--user=" + TEST_USER, + "--password=" + TEST_PASSWORD, + "--database=" + customerDatabase.getDatabaseName(), + "--table=customers", + "--alter=drop column ext", + "--allow-on-master", + "--initially-drop-old-table", + "--execute"); + + try (Connection connection = customerDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "INSERT INTO `%s`.`customers` VALUES (10002, 'Cicada', 'Urumqi', '123567891234');", + customerDatabase.getDatabaseName())); + } + + String[] expected = + new String[] { + "+I[10002, Cicada, Urumqi, 123567891234]", + }; + assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length)); + } + } + + @Test + public void testPtOscSchemaMigrationFromScratch() throws Exception { + LOG.info("Step 1: Start pipeline job"); + String sourceDDL = + String.format( + "CREATE TABLE debezium_source (" + + " `id` INT NOT NULL," + + " name STRING," + + " address STRING," + + " phone_number STRING," + + " primary key (`id`) not enforced" + + ") WITH (" + + " 'connector' = 'mysql-cdc'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'table-name' = '%s'," + + " 'scan.incremental.snapshot.enabled' = '%s'," + + " 'server-time-zone' = 'UTC'," + + " 'server-id' = '%s'" + + ")", + MYSQL8_CONTAINER.getHost(), + MYSQL8_CONTAINER.getDatabasePort(), + TEST_USER, + TEST_PASSWORD, + customerDatabase.getDatabaseName(), + 
"customers", + true, + getServerId()); + + tEnv.executeSql(sourceDDL); + + // async submit job + TableResult result = tEnv.executeSql("SELECT * FROM debezium_source"); + + // wait for the source startup, we don't have a better way to wait it, use sleep for now + TestCaseUtils.repeatedCheck( + () -> result.getJobClient().get().getJobStatus().get().equals(RUNNING), + DEFAULT_TIMEOUT, + DEFAULT_INTERVAL, + Arrays.asList(InterruptedException.class, NoSuchElementException.class)); + + CloseableIterator<Row> iterator = result.collect(); + { + String[] expected = + new String[] { + "+I[101, user_1, Shanghai, 123567891234]", + "+I[102, user_2, Shanghai, 123567891234]", + "+I[103, user_3, Shanghai, 123567891234]", + "+I[109, user_4, Shanghai, 123567891234]", + "+I[110, user_5, Shanghai, 123567891234]", + "+I[111, user_6, Shanghai, 123567891234]", + "+I[118, user_7, Shanghai, 123567891234]", + "+I[121, user_8, Shanghai, 123567891234]", + "+I[123, user_9, Shanghai, 123567891234]", + "+I[1009, user_10, Shanghai, 123567891234]", + "+I[1010, user_11, Shanghai, 123567891234]", + "+I[1011, user_12, Shanghai, 123567891234]", + "+I[1012, user_13, Shanghai, 123567891234]", + "+I[1013, user_14, Shanghai, 123567891234]", + "+I[1014, user_15, Shanghai, 123567891234]", + "+I[1015, user_16, Shanghai, 123567891234]", + "+I[1016, user_17, Shanghai, 123567891234]", + "+I[1017, user_18, Shanghai, 123567891234]", + "+I[1018, user_19, Shanghai, 123567891234]", + "+I[1019, user_20, Shanghai, 123567891234]", + "+I[2000, user_21, Shanghai, 123567891234]" + }; + assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length)); + } + + // Wait for a little while until we're in Binlog streaming mode. + Thread.sleep(5_000); + + LOG.info("Step 2: Evolve schema with pt-osc - ADD COLUMN"); + execInContainer( + PERCONA_TOOLKIT_CONTAINER, + "evolve schema", + "pt-online-schema-change", + "--user=" + TEST_USER, + "--host=" + INTER_CONTAINER_MYSQL_ALIAS, + "--password=" + TEST_PASSWORD, + "P=3306,t=customers,D=" + customerDatabase.getDatabaseName(), + "--alter", + "add column ext int FIRST", + "--charset=utf8", + "--recursion-method=NONE", // Do not look for slave nodes + "--print", + "--execute"); + + try (Connection connection = customerDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + // The new column `ext` has been inserted now + statement.execute( + String.format( + "INSERT INTO `%s`.`customers` VALUES (17, 10000, 'Alice', 'Beijing', '123567891234');", + customerDatabase.getDatabaseName())); + + String[] expected = + new String[] { + "+I[10000, Alice, Beijing, 123567891234]", + }; + assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length)); + } + + LOG.info("Step 3: Evolve schema with pt-osc - MODIFY COLUMN"); + execInContainer( + PERCONA_TOOLKIT_CONTAINER, + "evolve schema", + "pt-online-schema-change", + "--user=" + TEST_USER, + "--host=" + INTER_CONTAINER_MYSQL_ALIAS, + "--password=" + TEST_PASSWORD, + "P=3306,t=customers,D=" + customerDatabase.getDatabaseName(), + "--alter", + "modify column ext double", + "--charset=utf8", + "--recursion-method=NONE", + "--print", + "--execute"); + + try (Connection connection = customerDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "INSERT INTO `%s`.`customers` VALUES (2.718281828, 10001, 'Bob', 'Chongqing', '123567891234');", + customerDatabase.getDatabaseName())); + String[] expected = + new String[] { + "+I[10001, Bob, 
Chongqing, 123567891234]", + }; + assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length)); + } + + LOG.info("Step 4: Evolve schema with pt-osc - DROP COLUMN"); + execInContainer( + PERCONA_TOOLKIT_CONTAINER, + "evolve schema", + "pt-online-schema-change", + "--user=" + TEST_USER, + "--host=" + INTER_CONTAINER_MYSQL_ALIAS, + "--password=" + TEST_PASSWORD, + "P=3306,t=customers,D=" + customerDatabase.getDatabaseName(), + "--alter", + "drop column ext", + "--charset=utf8", + "--recursion-method=NONE", + "--print", + "--execute"); + + try (Connection connection = customerDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "INSERT INTO `%s`.`customers` VALUES (10002, 'Cicada', 'Urumqi', '123567891234');", + customerDatabase.getDatabaseName())); + String[] expected = + new String[] { + "+I[10002, Cicada, Urumqi, 123567891234]", + }; + assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length)); + } + } + + private static void execInContainer(Container<?> container, String prompt, String... commands) + throws IOException, InterruptedException { + { + LOG.info( + "Starting to {} with the following command: `{}`", + prompt, + String.join(" ", commands)); + Container.ExecResult execResult = container.execInContainer(commands); + if (execResult.getExitCode() == 0) { + LOG.info("Successfully {}. Stdout: {}", prompt, execResult.getStdout()); + } else { + LOG.error( + "Failed to {}. Exit code: {}, Stdout: {}, Stderr: {}", + prompt, + execResult.getExitCode(), + execResult.getStdout(), + execResult.getStderr()); + throw new IOException("Failed to execute commands: " + String.join(" ", commands)); + } + } + } + + private List<Event> getSnapshotExpected(TableId tableId, Schema schema) { + return Stream.of( + generate(schema, 101, "user_1", "Shanghai", "123567891234"), + generate(schema, 102, "user_2", "Shanghai", "123567891234"), + generate(schema, 103, "user_3", "Shanghai", "123567891234"), + generate(schema, 109, "user_4", "Shanghai", "123567891234"), + generate(schema, 110, "user_5", "Shanghai", "123567891234"), + generate(schema, 111, "user_6", "Shanghai", "123567891234"), + generate(schema, 118, "user_7", "Shanghai", "123567891234"), + generate(schema, 121, "user_8", "Shanghai", "123567891234"), + generate(schema, 123, "user_9", "Shanghai", "123567891234"), + generate(schema, 1009, "user_10", "Shanghai", "123567891234"), + generate(schema, 1010, "user_11", "Shanghai", "123567891234"), + generate(schema, 1011, "user_12", "Shanghai", "123567891234"), + generate(schema, 1012, "user_13", "Shanghai", "123567891234"), + generate(schema, 1013, "user_14", "Shanghai", "123567891234"), + generate(schema, 1014, "user_15", "Shanghai", "123567891234"), + generate(schema, 1015, "user_16", "Shanghai", "123567891234"), + generate(schema, 1016, "user_17", "Shanghai", "123567891234"), + generate(schema, 1017, "user_18", "Shanghai", "123567891234"), + generate(schema, 1018, "user_19", "Shanghai", "123567891234"), + generate(schema, 1019, "user_20", "Shanghai", "123567891234"), + generate(schema, 2000, "user_21", "Shanghai", "123567891234")) + .map(record -> DataChangeEvent.insertEvent(tableId, record)) + .collect(Collectors.toList()); + } + + private BinaryRecordData generate(Schema schema, Object... fields) { + return (new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]))) + .generate( + Arrays.stream(fields) + .map( + e -> + (e instanceof String) + ? 
BinaryStringData.fromString((String) e) + : e) + .toArray()); + } + + private String getServerId() { + final Random random = new Random(); + int serverId = random.nextInt(100) + 5400; + return serverId + "-" + (serverId + env.getParallelism()); + } + + protected String getServerId(int base) { + return base + "-" + (base + DEFAULT_PARALLELISM); + } + + private static void waitForSnapshotStarted(String sinkName) throws InterruptedException { + while (sinkSize(sinkName) == 0) { + Thread.sleep(100); + } + } + + private static void waitForSinkSize(String sinkName, int expectedSize) + throws InterruptedException { + while (sinkSize(sinkName) < expectedSize) { + Thread.sleep(100); + } + } + + private static int sinkSize(String sinkName) { + synchronized (TestValuesTableFactory.class) { + try { + return TestValuesTableFactory.getRawResults(sinkName).size(); + } catch (IllegalArgumentException e) { + // job is not started yet + return 0; + } + } + } + + private static List<String> fetchRows(Iterator<Row> iter, int size) { + List<String> rows = new ArrayList<>(size); + while (size > 0 && iter.hasNext()) { + Row row = iter.next(); + rows.add(row.toString()); + size--; + } + return rows; + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java index 8339a1db5..e5acc934f 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java @@ -51,6 +51,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOpt import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_MAX_RETRIES; import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_TIMEOUT; import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.HEARTBEAT_INTERVAL; +import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES; import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP; import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED; @@ -127,7 +128,8 @@ public class MySqlTableSourceFactoryTest { new Properties(), HEARTBEAT_INTERVAL.defaultValue(), null, - SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue()); + SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), + PARSE_ONLINE_SCHEMA_CHANGES.defaultValue()); assertEquals(expectedSource, actualSource); } @@ -173,7 +175,8 @@ public class MySqlTableSourceFactoryTest { new Properties(), HEARTBEAT_INTERVAL.defaultValue(), "testCol", - SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue()); + SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), + PARSE_ONLINE_SCHEMA_CHANGES.defaultValue()); assertEquals(expectedSource, actualSource); } @@ -215,7 +218,8 @@ public class MySqlTableSourceFactoryTest { new 
Properties(), HEARTBEAT_INTERVAL.defaultValue(), null, - SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue()); + SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), + PARSE_ONLINE_SCHEMA_CHANGES.defaultValue()); assertEquals(expectedSource, actualSource); } @@ -255,7 +259,8 @@ public class MySqlTableSourceFactoryTest { new Properties(), HEARTBEAT_INTERVAL.defaultValue(), null, - SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue()); + SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), + PARSE_ONLINE_SCHEMA_CHANGES.defaultValue()); assertEquals(expectedSource, actualSource); } @@ -311,7 +316,8 @@ public class MySqlTableSourceFactoryTest { jdbcProperties, Duration.ofMillis(15213), "testCol", - true); + true, + PARSE_ONLINE_SCHEMA_CHANGES.defaultValue()); assertEquals(expectedSource, actualSource); assertTrue(actualSource instanceof MySqlTableSource); MySqlTableSource actualMySqlTableSource = (MySqlTableSource) actualSource; @@ -365,7 +371,8 @@ public class MySqlTableSourceFactoryTest { new Properties(), HEARTBEAT_INTERVAL.defaultValue(), null, - SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue()); + SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), + PARSE_ONLINE_SCHEMA_CHANGES.defaultValue()); assertEquals(expectedSource, actualSource); } @@ -403,7 +410,8 @@ public class MySqlTableSourceFactoryTest { new Properties(), HEARTBEAT_INTERVAL.defaultValue(), null, - SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue()); + SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), + PARSE_ONLINE_SCHEMA_CHANGES.defaultValue()); assertEquals(expectedSource, actualSource); } @@ -442,7 +450,8 @@ public class MySqlTableSourceFactoryTest { new Properties(), HEARTBEAT_INTERVAL.defaultValue(), null, - SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue()); + SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), + PARSE_ONLINE_SCHEMA_CHANGES.defaultValue()); assertEquals(expectedSource, actualSource); } @@ -482,7 +491,8 @@ public class MySqlTableSourceFactoryTest { new Properties(), HEARTBEAT_INTERVAL.defaultValue(), null, - SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue()); + SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), + PARSE_ONLINE_SCHEMA_CHANGES.defaultValue()); assertEquals(expectedSource, actualSource); } @@ -520,7 +530,8 @@ public class MySqlTableSourceFactoryTest { new Properties(), HEARTBEAT_INTERVAL.defaultValue(), null, - SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue()); + SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), + PARSE_ONLINE_SCHEMA_CHANGES.defaultValue()); assertEquals(expectedSource, actualSource); } @@ -563,7 +574,8 @@ public class MySqlTableSourceFactoryTest { new Properties(), HEARTBEAT_INTERVAL.defaultValue(), null, - SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue()); + SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), + PARSE_ONLINE_SCHEMA_CHANGES.defaultValue()); expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType(); expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name");