This is an automated email from the ASF dual-hosted git repository.

gongzhongqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 39fd00ce4 [FLINK-36610] MySQL CDC supports parsing gh-ost / pt-osc generated schema changes (#3668)
39fd00ce4 is described below

commit 39fd00ce49dd6683a1c57d81acd06b1611e5b579
Author: yuxiqian <34335406+yuxiq...@users.noreply.github.com>
AuthorDate: Mon Jan 13 16:16:56 2025 +0800

    [FLINK-36610] MySQL CDC supports parsing gh-ost / pt-osc generated schema changes (#3668)
    
    Co-authored-by: MOBIN-F <18814118...@163.com>
---
 .../docs/connectors/flink-sources/mysql-cdc.md     |  12 +
 .../docs/connectors/pipeline-connectors/mysql.md   |  12 +
 .../docs/connectors/flink-sources/mysql-cdc.md     |  12 +
 .../docs/connectors/pipeline-connectors/mysql.md   |  12 +
 .../flink/cdc/common/utils/TestCaseUtils.java      |  94 ++++
 .../mysql/factory/MySqlDataSourceFactory.java      |   5 +-
 .../mysql/source/MySqlDataSourceOptions.java       |   8 +
 .../source/MySqlOnLineSchemaMigrationITCase.java   | 591 ++++++++++++++++++++
 .../mysql/debezium/reader/BinlogSplitReader.java   |  26 +
 .../mysql/source/MySqlSourceBuilder.java           |   6 +
 .../mysql/source/config/MySqlSourceConfig.java     |   9 +-
 .../source/config/MySqlSourceConfigFactory.java    |  10 +-
 .../mysql/source/config/MySqlSourceOptions.java    |   8 +
 .../connectors/mysql/source/utils/RecordUtils.java | 111 ++++
 .../connectors/mysql/table/MySqlTableSource.java   |   9 +-
 .../mysql/table/MySqlTableSourceFactory.java       |   6 +-
 .../MySqlOnLineSchemaMigrationSourceITCase.java    | 584 ++++++++++++++++++++
 .../mysql/source/MySqlSourceTestBase.java          |   6 +
 .../MySqlOnLineSchemaMigrationTableITCase.java     | 598 +++++++++++++++++++++
 .../mysql/table/MySqlTableSourceFactoryTest.java   |  34 +-
 20 files changed, 2136 insertions(+), 17 deletions(-)

diff --git a/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md b/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
index aa14fc440..a2a6f6ad7 100644
--- a/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
+++ b/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
@@ -377,6 +377,18 @@ Flink SQL> SELECT * FROM orders;
       <td>Boolean</td>
      <td>是否在快照结束后关闭空闲的 Reader。 此特性需要 flink 版本大于等于 1.14 并且 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 需要设置为 true。</td>
     </tr>
+    <tr>
+      <td>scan.parse.online.schema.changes.enabled</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>
+        是否尝试解析由 <a href="https://github.com/github/gh-ost">gh-ost</a> 或 <a href="https://docs.percona.com/percona-toolkit/pt-online-schema-change.html">pt-osc</a> 工具生成的表结构变更事件。
+        这些工具会在变更表结构时,将变更语句应用到“影子表”之上,并稍后将其与主表进行交换,以达到表结构变更的目的。
+        <br>
+        这是一项实验性功能。
+      </td>
+    </tr>
     </tbody>
 </table>
 </div>
diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
index a5f0924c3..2ff209442 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
@@ -286,6 +286,18 @@ pipeline:
           scan.binlog.newly-added-table.enabled: 只在 binlog 读取阶段读取新增表的增量数据。
       </td>
     </tr>
+    <tr>
+      <td>scan.parse.online.schema.changes.enabled</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>
+        是否尝试解析由 <a href="https://github.com/github/gh-ost">gh-ost</a> 或 <a href="https://docs.percona.com/percona-toolkit/pt-online-schema-change.html">pt-osc</a> 工具生成的表结构变更事件。
+        这些工具会在变更表结构时,将变更语句应用到“影子表”之上,并稍后将其与主表进行交换,以达到表结构变更的目的。
+        <br>
+        这是一项实验性功能。
+      </td>
+    </tr>
     </tbody>
 </table>
 </div>
diff --git a/docs/content/docs/connectors/flink-sources/mysql-cdc.md b/docs/content/docs/connectors/flink-sources/mysql-cdc.md
index f335f047a..60d81baa2 100644
--- a/docs/content/docs/connectors/flink-sources/mysql-cdc.md
+++ b/docs/content/docs/connectors/flink-sources/mysql-cdc.md
@@ -391,6 +391,18 @@ During a snapshot operation, the connector will query each included table to pro
           hex: The binary data type is converted to a hexadecimal string and transmitted.
       The default value is none. Depending on your requirements and data types, you can choose the appropriate processing mode. If your database contains a large number of binary data types, it is recommended to use base64 or hex mode to make it easier to handle during transmission.</td>
     </tr>
+    <tr>
+      <td>scan.parse.online.schema.changes.enabled</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>
+        Whether to parse "online" schema changes generated by <a 
href="https://github.com/github/gh-ost";>gh-ost</a> or <a 
href="https://docs.percona.com/percona-toolkit/pt-online-schema-change.html";>pt-osc</a>.
+        Schema change events are applied to a "shadow" table and then swapped 
with the original table later.
+        <br>
+        This is an experimental feature, and subject to change in the future.
+      </td> 
+    </tr>
     </tbody>
 </table>
 </div>
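
For DataStream API users, the option documented above corresponds to the builder flag added later in this commit. A minimal sketch, with placeholder connection settings:

    import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
    import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;

    public class ParseOnlineSchemaChangesExample {
        public static void main(String[] args) {
            MySqlSource<String> source =
                    MySqlSource.<String>builder()
                            .hostname("localhost") // placeholder connection settings
                            .port(3306)
                            .databaseList("mydb")
                            .tableList("mydb.customers")
                            .username("user")
                            .password("secret")
                            .deserializer(new JsonDebeziumDeserializationSchema())
                            // equivalent to 'scan.parse.online.schema.changes.enabled' = 'true'
                            .parseOnLineSchemaChanges(true)
                            .build();
        }
    }
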
diff --git a/docs/content/docs/connectors/pipeline-connectors/mysql.md b/docs/content/docs/connectors/pipeline-connectors/mysql.md
index 3cc5c17a1..29c6549e5 100644
--- a/docs/content/docs/connectors/pipeline-connectors/mysql.md
+++ b/docs/content/docs/connectors/pipeline-connectors/mysql.md
@@ -293,6 +293,18 @@ pipeline:
          scan.binlog.newly-added-table.enabled: only do binlog-reading for newly added table during binlog reading phase.
       </td>
     </tr>
+    <tr>
+      <td>scan.parse.online.schema.changes.enabled</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>
+        Whether to parse "online" schema changes generated by <a 
href="https://github.com/github/gh-ost";>gh-ost</a> or <a 
href="https://docs.percona.com/percona-toolkit/pt-online-schema-change.html";>pt-osc</a>.
+        Schema change events are applied to a "shadow" table and then swapped 
with the original table later.
+        <br>
+        This is an experimental feature, and subject to change in the future.
+      </td> 
+    </tr>
     </tbody>
 </table>
 </div>
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/TestCaseUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/TestCaseUtils.java
new file mode 100644
index 000000000..f179e779c
--- /dev/null
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/TestCaseUtils.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.utils;
+
+import org.apache.flink.util.function.SupplierWithException;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+/** Some utility methods for creating repeated-checking test cases. */
+public class TestCaseUtils {
+
+    public static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(30);
+    public static final Duration DEFAULT_INTERVAL = Duration.ofSeconds(1);
+
+    /** Fetch with a ({@code timeout}, {@code interval}) duration. */
+    public static void repeatedCheck(Supplier<Boolean> fetcher) {
+        repeatedCheck(fetcher, DEFAULT_TIMEOUT);
+    }
+
+    /** Fetch with a ({@code timeout}, {@code interval}) duration. */
+    public static void repeatedCheck(Supplier<Boolean> fetcher, Duration timeout) {
+        repeatedCheck(fetcher, timeout, DEFAULT_INTERVAL);
+    }
+
+    /** Fetch with a ({@code timeout}, {@code interval}) duration. */
+    public static void repeatedCheck(
+            Supplier<Boolean> fetcher, Duration timeout, Duration interval) {
+        repeatedCheck(fetcher::get, timeout, interval, Collections.emptyList());
+    }
+
+    /** Fetch and wait with a ({@code timeout}, {@code interval}) duration. */
+    public static <T> void repeatedCheck(
+            Supplier<T> fetcher, Predicate<T> validator, Duration timeout, Duration interval) {
+        repeatedCheckAndValidate(
+                fetcher::get, validator, timeout, interval, Collections.emptyList());
+    }
+
+    /** Waiting for fetching values with a ({@code timeout}, {@code interval}) duration. */
+    public static void repeatedCheck(
+            SupplierWithException<Boolean, Throwable> fetcher,
+            Duration timeout,
+            Duration interval,
+            List<Class<? extends Throwable>> allowedThrowsList) {
+        repeatedCheckAndValidate(fetcher, b -> b, timeout, interval, allowedThrowsList);
+    }
+
+    /** Fetch and validate, with a ({@code timeout}, {@code interval}) duration. */
+    public static <T> void repeatedCheckAndValidate(
+            SupplierWithException<T, Throwable> fetcher,
+            Predicate<T> validator,
+            Duration timeout,
+            Duration interval,
+            List<Class<? extends Throwable>> allowedThrowsList) {
+
+        long start = System.currentTimeMillis();
+        while (System.currentTimeMillis() - start < timeout.toMillis()) {
+            try {
+                if (validator.test(fetcher.get())) {
+                    return;
+                }
+            } catch (Throwable t) {
+                if (allowedThrowsList.stream()
+                        .noneMatch(clazz -> clazz.isAssignableFrom(t.getClass()))) {
+                    throw new RuntimeException("Fetcher has thrown an unexpected exception: ", t);
+                }
+            }
+            try {
+                Thread.sleep(interval.toMillis());
+            } catch (InterruptedException ignored) {
+                // ignored
+            }
+        }
+        throw new RuntimeException("Timeout when waiting for state to be 
ready.");
+    }
+}
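
As an illustration, a test could poll with these helpers until a condition holds; the queue and the threshold below are hypothetical:

    import org.apache.flink.cdc.common.utils.TestCaseUtils;

    import java.time.Duration;
    import java.util.concurrent.ConcurrentLinkedQueue;

    class RepeatedCheckExample {
        private final ConcurrentLinkedQueue<String> received = new ConcurrentLinkedQueue<>();

        void waitForTenRecords() {
            // Polls once per second; throws a RuntimeException after 60 seconds without success.
            TestCaseUtils.repeatedCheck(
                    () -> received.size() >= 10, Duration.ofSeconds(60), Duration.ofSeconds(1));
        }
    }
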
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
index 3d89f18c0..408be27e9 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
@@ -67,6 +67,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOption
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HEARTBEAT_INTERVAL;
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.METADATA_LIST;
+import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES;
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
@@ -139,6 +140,7 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
         boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
         boolean scanBinlogNewlyAddedTableEnabled =
                 config.get(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED);
+        boolean isParsingOnLineSchemaChanges = config.get(PARSE_ONLINE_SCHEMA_CHANGES);
 
         validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
         validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@@ -175,7 +177,8 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
                         .includeSchemaChanges(includeSchemaChanges)
                         .debeziumProperties(getDebeziumProperties(configMap))
                         .jdbcProperties(getJdbcProperties(configMap))
-                        .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled);
+                        .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
+                        .parseOnLineSchemaChanges(isParsingOnLineSchemaChanges);
 
         List<TableId> tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null);
 
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
index b2284ed1c..aa682c211 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
@@ -281,4 +281,12 @@ public class MySqlDataSourceOptions {
                     .withDescription(
                             "List of readable metadata from SourceRecord to be passed to downstream, split by `,`. "
                                     + "Available readable metadata are: op_ts.");
+
+    @Experimental
+    public static final ConfigOption<Boolean> PARSE_ONLINE_SCHEMA_CHANGES =
+            ConfigOptions.key("scan.parse.online.schema.changes.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to parse schema change events generated by gh-ost/pt-osc utilities. Defaults to false.");
 }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationITCase.java
new file mode 100644
index 000000000..544fc969e
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationITCase.java
@@ -0,0 +1,591 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.AddColumnEvent;
+import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.DropColumnEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.PhysicalColumn;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.source.FlinkSourceProvider;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory;
+import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
+import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.testcontainers.DockerClientFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCHEMA_CHANGE_ENABLED;
+import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD;
+import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER;
+import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.fetchResults;
+import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.getServerId;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * IT case for evolving MySQL schema with gh-ost/pt-osc utilities. See <a
+ * href="https://github.com/github/gh-ost">github/gh-ost</a>/<a
+ * href="https://docs.percona.com/percona-toolkit/pt-online-schema-change.html">doc/pt-osc</a> for
+ * more details.
+ */
+public class MySqlOnLineSchemaMigrationITCase extends MySqlSourceTestBase {
+    private static final MySqlContainer MYSQL8_CONTAINER =
+            createMySqlContainer(MySqlVersion.V8_0, "docker/server-gtids/expire-seconds/my.cnf");
+
+    private static final String PERCONA_TOOLKIT = "perconalab/percona-toolkit:3.5.7";
+
+    protected static final GenericContainer<?> PERCONA_TOOLKIT_CONTAINER =
+            createPerconaToolkitContainer();
+
+    private final UniqueDatabase customerDatabase =
+            new UniqueDatabase(MYSQL8_CONTAINER, "customer", TEST_USER, TEST_PASSWORD);
+
+    private final StreamExecutionEnvironment env =
+            StreamExecutionEnvironment.getExecutionEnvironment();
+
+    private static final String GH_OST_DOWNLOAD_LINK =
+            DockerClientFactory.instance().client().versionCmd().exec().getArch().equals("amd64")
+                    ? "https://github.com/github/gh-ost/releases/download/v1.1.6/gh-ost-binary-linux-amd64-20231207144046.tar.gz"
+                    : "https://github.com/github/gh-ost/releases/download/v1.1.6/gh-ost-binary-linux-arm64-20231207144046.tar.gz";
+
+    @BeforeClass
+    public static void beforeClass() {
+        LOG.info("Starting MySql8 containers...");
+        Startables.deepStart(Stream.of(MYSQL8_CONTAINER)).join();
+        Startables.deepStart(Stream.of(PERCONA_TOOLKIT_CONTAINER)).join();
+        LOG.info("Container MySql8 is started.");
+    }
+
+    @AfterClass
+    public static void afterClass() {
+        LOG.info("Stopping MySql8 containers...");
+        MYSQL8_CONTAINER.stop();
+        PERCONA_TOOLKIT_CONTAINER.stop();
+        LOG.info("Container MySql8 is stopped.");
+    }
+
+    @Before
+    public void before() {
+        customerDatabase.createAndInitialize();
+        TestValuesTableFactory.clearAllData();
+        env.setParallelism(4);
+        env.enableCheckpointing(200);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+    }
+
+    @After
+    public void after() {
+        customerDatabase.dropDatabase();
+    }
+
+    private static void installGhOstCli(Container<?> container) {
+        try {
+            execInContainer(
+                    container,
+                    "download gh-ost tarball",
+                    "curl",
+                    "-L",
+                    "-o",
+                    "/tmp/gh-ost.tar.gz",
+                    GH_OST_DOWNLOAD_LINK);
+            execInContainer(
+                    container, "unzip binary", "tar", "-xzvf", 
"/tmp/gh-ost.tar.gz", "-C", "/bin");
+        } catch (IOException | InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static GenericContainer<?> createPerconaToolkitContainer() {
+        GenericContainer<?> perconaToolkit =
+                new GenericContainer<>(PERCONA_TOOLKIT)
+                        // keep container alive
+                        .withCommand("tail", "-f", "/dev/null")
+                        .withNetwork(NETWORK)
+                        .withLogConsumer(new Slf4jLogConsumer(LOG));
+        return perconaToolkit;
+    }
+
+    @Test
+    public void testGhOstSchemaMigrationFromScratch() throws Exception {
+        LOG.info("Step 1: Install gh-ost command line utility");
+        installGhOstCli(MYSQL8_CONTAINER);
+
+        LOG.info("Step 2: Start pipeline job");
+        env.setParallelism(1);
+        TableId tableId = TableId.tableId(customerDatabase.getDatabaseName(), "customers");
+        MySqlSourceConfigFactory configFactory =
+                new MySqlSourceConfigFactory()
+                        .hostname(MYSQL8_CONTAINER.getHost())
+                        .port(MYSQL8_CONTAINER.getDatabasePort())
+                        .username(TEST_USER)
+                        .password(TEST_PASSWORD)
+                        .databaseList(customerDatabase.getDatabaseName())
+                        .tableList(customerDatabase.getDatabaseName() + "\\.customers")
+                        .startupOptions(StartupOptions.initial())
+                        .serverId(getServerId(env.getParallelism()))
+                        .serverTimeZone("UTC")
+                        .includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue())
+                        .parseOnLineSchemaChanges(true);
+
+        FlinkSourceProvider sourceProvider =
+                (FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider();
+        CloseableIterator<Event> events =
+                env.fromSource(
+                                sourceProvider.getSource(),
+                                WatermarkStrategy.noWatermarks(),
+                                MySqlDataSourceFactory.IDENTIFIER,
+                                new EventTypeInfo())
+                        .executeAndCollect();
+        Thread.sleep(5_000);
+
+        List<Event> expected = new ArrayList<>();
+        Schema schemaV1 =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT().notNull())
+                        .physicalColumn("name", 
DataTypes.VARCHAR(255).notNull(), null, "flink")
+                        .physicalColumn("address", DataTypes.VARCHAR(1024))
+                        .physicalColumn("phone_number", DataTypes.VARCHAR(512))
+                        .primaryKey(Collections.singletonList("id"))
+                        .build();
+        expected.add(new CreateTableEvent(tableId, schemaV1));
+        expected.addAll(getSnapshotExpected(tableId, schemaV1));
+        List<Event> actual = fetchResults(events, expected.size());
+        assertEqualsInAnyOrder(
+                expected.stream().map(Object::toString).collect(Collectors.toList()),
+                actual.stream().map(Object::toString).collect(Collectors.toList()));
+
+        // Wait for a little while until we're in Binlog streaming mode.
+        Thread.sleep(5_000);
+
+        LOG.info("Step 3: Evolve schema with gh-ost - ADD COLUMN");
+        execInContainer(
+                MYSQL8_CONTAINER,
+                "evolve schema",
+                "gh-ost",
+                "--user=" + TEST_USER,
+                "--password=" + TEST_PASSWORD,
+                "--database=" + customerDatabase.getDatabaseName(),
+                "--table=customers",
+                "--alter=add column ext int",
+                "--allow-on-master", // because we don't have a replica
+                "--initially-drop-old-table", // drop previously generated 
temporary tables
+                "--execute");
+
+        try (Connection connection = customerDatabase.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            // The new column `ext` has been inserted now
+            statement.execute(
+                    String.format(
+                            "INSERT INTO `%s`.`customers` VALUES (10000, 
'Alice', 'Beijing', '123567891234', 17);",
+                            customerDatabase.getDatabaseName()));
+        }
+
+        Schema schemaV2 =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT().notNull())
+                        .physicalColumn("name", 
DataTypes.VARCHAR(255).notNull(), null, "flink")
+                        .physicalColumn("address", DataTypes.VARCHAR(1024))
+                        .physicalColumn("phone_number", DataTypes.VARCHAR(512))
+                        .physicalColumn("ext", DataTypes.INT())
+                        .primaryKey(Collections.singletonList("id"))
+                        .build();
+
+        assertEquals(
+                Arrays.asList(
+                        new AddColumnEvent(
+                                tableId,
+                                Collections.singletonList(
+                                        new AddColumnEvent.ColumnWithPosition(
+                                                new PhysicalColumn("ext", 
DataTypes.INT(), null)))),
+                        DataChangeEvent.insertEvent(
+                                tableId,
+                                generate(schemaV2, 10000, "Alice", "Beijing", 
"123567891234", 17))),
+                fetchResults(events, 2));
+
+        LOG.info("Step 4: Evolve schema with gh-ost - MODIFY COLUMN");
+        execInContainer(
+                MYSQL8_CONTAINER,
+                "evolve schema",
+                "gh-ost",
+                "--user=" + TEST_USER,
+                "--password=" + TEST_PASSWORD,
+                "--database=" + customerDatabase.getDatabaseName(),
+                "--table=customers",
+                "--alter=modify column ext double",
+                "--allow-on-master",
+                "--initially-drop-old-table",
+                "--execute");
+
+        try (Connection connection = customerDatabase.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    String.format(
+                            "INSERT INTO `%s`.`customers` VALUES (10001, 
'Bob', 'Chongqing', '123567891234', 2.718281828);",
+                            customerDatabase.getDatabaseName()));
+        }
+
+        Schema schemaV3 =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT().notNull())
+                        .physicalColumn("name", 
DataTypes.VARCHAR(255).notNull(), null, "flink")
+                        .physicalColumn("address", DataTypes.VARCHAR(1024))
+                        .physicalColumn("phone_number", DataTypes.VARCHAR(512))
+                        .physicalColumn("ext", DataTypes.DOUBLE())
+                        .primaryKey(Collections.singletonList("id"))
+                        .build();
+
+        assertEquals(
+                Arrays.asList(
+                        new AlterColumnTypeEvent(
+                                tableId, Collections.singletonMap("ext", 
DataTypes.DOUBLE())),
+                        DataChangeEvent.insertEvent(
+                                tableId,
+                                generate(
+                                        schemaV3,
+                                        10001,
+                                        "Bob",
+                                        "Chongqing",
+                                        "123567891234",
+                                        2.718281828))),
+                fetchResults(events, 2));
+
+        LOG.info("Step 5: Evolve schema with gh-ost - DROP COLUMN");
+        execInContainer(
+                MYSQL8_CONTAINER,
+                "evolve schema",
+                "gh-ost",
+                "--user=" + TEST_USER,
+                "--password=" + TEST_PASSWORD,
+                "--database=" + customerDatabase.getDatabaseName(),
+                "--table=customers",
+                "--alter=drop column ext",
+                "--allow-on-master",
+                "--initially-drop-old-table",
+                "--execute");
+
+        try (Connection connection = customerDatabase.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    String.format(
+                            "INSERT INTO `%s`.`customers` VALUES (10002, 
'Cicada', 'Urumqi', '123567891234');",
+                            customerDatabase.getDatabaseName()));
+        }
+
+        Schema schemaV4 =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT().notNull())
+                        .physicalColumn("name", 
DataTypes.VARCHAR(255).notNull(), null, "flink")
+                        .physicalColumn("address", DataTypes.VARCHAR(1024))
+                        .physicalColumn("phone_number", DataTypes.VARCHAR(512))
+                        .primaryKey(Collections.singletonList("id"))
+                        .build();
+
+        assertEquals(
+                Arrays.asList(
+                        new DropColumnEvent(tableId, Collections.singletonList("ext")),
+                        DataChangeEvent.insertEvent(
+                                tableId,
+                                generate(schemaV4, 10002, "Cicada", "Urumqi", 
"123567891234"))),
+                fetchResults(events, 2));
+    }
+
+    @Test
+    public void testPtOscSchemaMigrationFromScratch() throws Exception {
+        LOG.info("Step 1: Start pipeline job");
+
+        env.setParallelism(1);
+        TableId tableId = TableId.tableId(customerDatabase.getDatabaseName(), "customers");
+        MySqlSourceConfigFactory configFactory =
+                new MySqlSourceConfigFactory()
+                        .hostname(MYSQL8_CONTAINER.getHost())
+                        .port(MYSQL8_CONTAINER.getDatabasePort())
+                        .username(TEST_USER)
+                        .password(TEST_PASSWORD)
+                        .databaseList(customerDatabase.getDatabaseName())
+                        .tableList(customerDatabase.getDatabaseName() + "\\.customers")
+                        .startupOptions(StartupOptions.initial())
+                        .serverId(getServerId(env.getParallelism()))
+                        .serverTimeZone("UTC")
+                        .includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue())
+                        .parseOnLineSchemaChanges(true);
+
+        FlinkSourceProvider sourceProvider =
+                (FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider();
+        CloseableIterator<Event> events =
+                env.fromSource(
+                                sourceProvider.getSource(),
+                                WatermarkStrategy.noWatermarks(),
+                                MySqlDataSourceFactory.IDENTIFIER,
+                                new EventTypeInfo())
+                        .executeAndCollect();
+        Thread.sleep(5_000);
+
+        List<Event> expected = new ArrayList<>();
+        Schema schemaV1 =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT().notNull())
+                        .physicalColumn("name", 
DataTypes.VARCHAR(255).notNull(), null, "flink")
+                        .physicalColumn("address", DataTypes.VARCHAR(1024))
+                        .physicalColumn("phone_number", DataTypes.VARCHAR(512))
+                        .primaryKey(Collections.singletonList("id"))
+                        .build();
+        expected.add(new CreateTableEvent(tableId, schemaV1));
+        expected.addAll(getSnapshotExpected(tableId, schemaV1));
+        List<Event> actual = fetchResults(events, expected.size());
+        assertEqualsInAnyOrder(
+                expected.stream().map(Object::toString).collect(Collectors.toList()),
+                actual.stream().map(Object::toString).collect(Collectors.toList()));
+
+        // Wait for a little while until we're in Binlog streaming mode.
+        Thread.sleep(5_000);
+
+        LOG.info("Step 2: Evolve schema with pt-osc - ADD COLUMN");
+        execInContainer(
+                PERCONA_TOOLKIT_CONTAINER,
+                "evolve schema",
+                "pt-online-schema-change",
+                "--user=" + TEST_USER,
+                "--host=" + INTER_CONTAINER_MYSQL_ALIAS,
+                "--password=" + TEST_PASSWORD,
+                "P=3306,t=customers,D=" + customerDatabase.getDatabaseName(),
+                "--alter",
+                "add column ext int",
+                "--charset=utf8",
+                "--recursion-method=NONE", // Do not look for slave nodes
+                "--print",
+                "--execute");
+
+        try (Connection connection = customerDatabase.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            // The new column `ext` has been inserted now
+            statement.execute(
+                    String.format(
+                            "INSERT INTO `%s`.`customers` VALUES (10000, 
'Alice', 'Beijing', '123567891234', 17);",
+                            customerDatabase.getDatabaseName()));
+        }
+
+        Schema schemaV2 =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT().notNull())
+                        .physicalColumn("name", 
DataTypes.VARCHAR(255).notNull(), null, "flink")
+                        .physicalColumn("address", DataTypes.VARCHAR(1024))
+                        .physicalColumn("phone_number", DataTypes.VARCHAR(512))
+                        .physicalColumn("ext", DataTypes.INT())
+                        .primaryKey(Collections.singletonList("id"))
+                        .build();
+
+        assertEquals(
+                Arrays.asList(
+                        new AddColumnEvent(
+                                tableId,
+                                Collections.singletonList(
+                                        new AddColumnEvent.ColumnWithPosition(
+                                                new PhysicalColumn("ext", 
DataTypes.INT(), null)))),
+                        DataChangeEvent.insertEvent(
+                                tableId,
+                                generate(schemaV2, 10000, "Alice", "Beijing", 
"123567891234", 17))),
+                fetchResults(events, 2));
+
+        LOG.info("Step 3: Evolve schema with pt-osc - MODIFY COLUMN");
+        execInContainer(
+                PERCONA_TOOLKIT_CONTAINER,
+                "evolve schema",
+                "pt-online-schema-change",
+                "--user=" + TEST_USER,
+                "--host=" + INTER_CONTAINER_MYSQL_ALIAS,
+                "--password=" + TEST_PASSWORD,
+                "P=3306,t=customers,D=" + customerDatabase.getDatabaseName(),
+                "--alter",
+                "modify column ext double",
+                "--charset=utf8",
+                "--recursion-method=NONE",
+                "--print",
+                "--execute");
+
+        try (Connection connection = customerDatabase.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    String.format(
+                            "INSERT INTO `%s`.`customers` VALUES (10001, 
'Bob', 'Chongqing', '123567891234', 2.718281828);",
+                            customerDatabase.getDatabaseName()));
+        }
+
+        Schema schemaV3 =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT().notNull())
+                        .physicalColumn("name", 
DataTypes.VARCHAR(255).notNull(), null, "flink")
+                        .physicalColumn("address", DataTypes.VARCHAR(1024))
+                        .physicalColumn("phone_number", DataTypes.VARCHAR(512))
+                        .physicalColumn("ext", DataTypes.DOUBLE())
+                        .primaryKey(Collections.singletonList("id"))
+                        .build();
+
+        assertEquals(
+                Arrays.asList(
+                        new AlterColumnTypeEvent(
+                                tableId, Collections.singletonMap("ext", 
DataTypes.DOUBLE())),
+                        DataChangeEvent.insertEvent(
+                                tableId,
+                                generate(
+                                        schemaV3,
+                                        10001,
+                                        "Bob",
+                                        "Chongqing",
+                                        "123567891234",
+                                        2.718281828))),
+                fetchResults(events, 2));
+
+        LOG.info("Step 4: Evolve schema with pt-osc - DROP COLUMN");
+        execInContainer(
+                PERCONA_TOOLKIT_CONTAINER,
+                "evolve schema",
+                "pt-online-schema-change",
+                "--user=" + TEST_USER,
+                "--host=" + INTER_CONTAINER_MYSQL_ALIAS,
+                "--password=" + TEST_PASSWORD,
+                "P=3306,t=customers,D=" + customerDatabase.getDatabaseName(),
+                "--alter",
+                "drop column ext",
+                "--charset=utf8",
+                "--recursion-method=NONE",
+                "--print",
+                "--execute");
+
+        try (Connection connection = customerDatabase.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    String.format(
+                            "INSERT INTO `%s`.`customers` VALUES (10002, 
'Cicada', 'Urumqi', '123567891234');",
+                            customerDatabase.getDatabaseName()));
+        }
+
+        Schema schemaV4 =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT().notNull())
+                        .physicalColumn("name", 
DataTypes.VARCHAR(255).notNull(), null, "flink")
+                        .physicalColumn("address", DataTypes.VARCHAR(1024))
+                        .physicalColumn("phone_number", DataTypes.VARCHAR(512))
+                        .primaryKey(Collections.singletonList("id"))
+                        .build();
+
+        assertEquals(
+                Arrays.asList(
+                        new DropColumnEvent(tableId, Collections.singletonList("ext")),
+                        DataChangeEvent.insertEvent(
+                                tableId,
+                                generate(schemaV4, 10002, "Cicada", "Urumqi", 
"123567891234"))),
+                fetchResults(events, 2));
+    }
+
+    private static void execInContainer(Container<?> container, String prompt, String... commands)
+            throws IOException, InterruptedException {
+        {
+            LOG.info(
+                    "Starting to {} with the following command: `{}`",
+                    prompt,
+                    String.join(" ", commands));
+            Container.ExecResult execResult = container.execInContainer(commands);
+            if (execResult.getExitCode() == 0) {
+                LOG.info("Successfully {}. Stdout: {}", prompt, 
execResult.getStdout());
+            } else {
+                LOG.error(
+                        "Failed to {}. Exit code: {}, Stdout: {}, Stderr: {}",
+                        prompt,
+                        execResult.getExitCode(),
+                        execResult.getStdout(),
+                        execResult.getStderr());
+                throw new IOException("Failed to execute commands: " + 
String.join(" ", commands));
+            }
+        }
+    }
+
+    private List<Event> getSnapshotExpected(TableId tableId, Schema schema) {
+        return Stream.of(
+                        generate(schema, 101, "user_1", "Shanghai", "123567891234"),
+                        generate(schema, 102, "user_2", "Shanghai", "123567891234"),
+                        generate(schema, 103, "user_3", "Shanghai", "123567891234"),
+                        generate(schema, 109, "user_4", "Shanghai", "123567891234"),
+                        generate(schema, 110, "user_5", "Shanghai", "123567891234"),
+                        generate(schema, 111, "user_6", "Shanghai", "123567891234"),
+                        generate(schema, 118, "user_7", "Shanghai", "123567891234"),
+                        generate(schema, 121, "user_8", "Shanghai", "123567891234"),
+                        generate(schema, 123, "user_9", "Shanghai", "123567891234"),
+                        generate(schema, 1009, "user_10", "Shanghai", "123567891234"),
+                        generate(schema, 1010, "user_11", "Shanghai", "123567891234"),
+                        generate(schema, 1011, "user_12", "Shanghai", "123567891234"),
+                        generate(schema, 1012, "user_13", "Shanghai", "123567891234"),
+                        generate(schema, 1013, "user_14", "Shanghai", "123567891234"),
+                        generate(schema, 1014, "user_15", "Shanghai", "123567891234"),
+                        generate(schema, 1015, "user_16", "Shanghai", "123567891234"),
+                        generate(schema, 1016, "user_17", "Shanghai", "123567891234"),
+                        generate(schema, 1017, "user_18", "Shanghai", "123567891234"),
+                        generate(schema, 1018, "user_19", "Shanghai", "123567891234"),
+                        generate(schema, 1019, "user_20", "Shanghai", "123567891234"),
+                        generate(schema, 2000, "user_21", "Shanghai", "123567891234"))
+                .map(record -> DataChangeEvent.insertEvent(tableId, record))
+                .collect(Collectors.toList());
+    }
+
+    private BinaryRecordData generate(Schema schema, Object... fields) {
+        return (new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0])))
+                .generate(
+                        Arrays.stream(fields)
+                                .map(
+                                        e ->
+                                                (e instanceof String)
+                                                        ? BinaryStringData.fromString((String) e)
+                                                        : e)
+                                .toArray());
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
index 31173469b..447fda96a 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
@@ -53,6 +53,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -83,6 +84,7 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
     private Predicate capturedTableFilter;
     private final StoppableChangeEventSourceContext changeEventSourceContext =
             new StoppableChangeEventSourceContext();
+    private final boolean isParsingOnLineSchemaChanges;
 
     private static final long READER_CLOSE_TIMEOUT = 30L;
 
@@ -93,6 +95,8 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
         this.executorService = Executors.newSingleThreadExecutor(threadFactory);
         this.currentTaskRunning = true;
         this.pureBinlogPhaseTables = new HashSet<>();
+        this.isParsingOnLineSchemaChanges =
+                statefulTaskContext.getSourceConfig().isParseOnLineSchemaChanges();
     }
 
     public void submitSplit(MySqlSplit mySqlSplit) {
@@ -148,6 +152,14 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
         if (currentTaskRunning) {
             List<DataChangeEvent> batch = queue.poll();
             for (DataChangeEvent event : batch) {
+                if (isParsingOnLineSchemaChanges) {
+                    Optional<SourceRecord> oscRecord =
+                            parseOnLineSchemaChangeEvent(event.getRecord());
+                    if (oscRecord.isPresent()) {
+                        sourceRecords.add(oscRecord.get());
+                        continue;
+                    }
+                }
                 if (shouldEmit(event.getRecord())) {
                     sourceRecords.add(event.getRecord());
                 }
@@ -195,6 +207,20 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
         }
     }
 
+    private Optional<SourceRecord> parseOnLineSchemaChangeEvent(SourceRecord sourceRecord) {
+        if (RecordUtils.isOnLineSchemaChangeEvent(sourceRecord)) {
+            // This is a gh-ost initiated schema change event and should be emitted if the
+            // peeled tableId matches the predicate.
+            TableId originalTableId = RecordUtils.getTableId(sourceRecord);
+            TableId peeledTableId = RecordUtils.peelTableId(originalTableId);
+            if (capturedTableFilter.test(peeledTableId)) {
+                return Optional.of(
+                        RecordUtils.setTableId(sourceRecord, originalTableId, peeledTableId));
+            }
+            }
+        }
+        return Optional.empty();
+    }
+
     /**
      * Returns the record should emit or not.
      *
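
For context: gh-ost and pt-osc never ALTER the captured table directly. They apply the DDL to a shadow table (named like _customers_gho for gh-ost, or _customers_new for pt-osc) and later swap it with the original, which is why the reader re-attributes the shadow table's schema change to the user-facing table. A rough sketch of such name peeling, with hypothetical patterns (the actual regexes live in RecordUtils and may differ):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    class ShadowTableNames {
        // Hypothetical patterns: gh-ost uses _<table>_gho/_ghc/_del helper tables,
        // pt-osc uses _<table>_new/_old ones.
        private static final Pattern GH_OST = Pattern.compile("^_(.+)_(?:gho|ghc|del)$");
        private static final Pattern PT_OSC = Pattern.compile("^_(.+)_(?:new|old)$");

        static String peel(String tableName) {
            for (Pattern pattern : new Pattern[] {GH_OST, PT_OSC}) {
                Matcher matcher = pattern.matcher(tableName);
                if (matcher.matches()) {
                    return matcher.group(1); // e.g. "_customers_gho" -> "customers"
                }
            }
            return tableName; // not a shadow table name
        }
    }
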
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java
index c03aa36b0..3fce1eb61 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java
@@ -268,6 +268,12 @@ public class MySqlSourceBuilder<T> {
         return this;
     }
 
+    /** Whether to parse gh-ost utility generated schema change events. Defaults to false. */
+    public MySqlSourceBuilder<T> parseOnLineSchemaChanges(boolean parseOnLineSchemaChanges) {
+        this.configFactory.parseOnLineSchemaChanges(parseOnLineSchemaChanges);
+        return this;
+    }
+
     /**
      * Build the {@link MySqlSource}.
      *
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
index dd0ac7896..85079f28c 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
@@ -66,6 +66,7 @@ public class MySqlSourceConfig implements Serializable {
     private final Properties jdbcProperties;
     private final Map<ObjectPath, String> chunkKeyColumns;
     private final boolean skipSnapshotBackfill;
+    private final boolean parseOnLineSchemaChanges;
 
     // --------------------------------------------------------------------------------------------
     // Debezium Configurations
@@ -99,7 +100,8 @@ public class MySqlSourceConfig implements Serializable {
             Properties dbzProperties,
             Properties jdbcProperties,
             Map<ObjectPath, String> chunkKeyColumns,
-            boolean skipSnapshotBackfill) {
+            boolean skipSnapshotBackfill,
+            boolean parseOnLineSchemaChanges) {
         this.hostname = checkNotNull(hostname);
         this.port = port;
         this.username = checkNotNull(username);
@@ -127,6 +129,7 @@ public class MySqlSourceConfig implements Serializable {
         this.jdbcProperties = jdbcProperties;
         this.chunkKeyColumns = chunkKeyColumns;
         this.skipSnapshotBackfill = skipSnapshotBackfill;
+        this.parseOnLineSchemaChanges = parseOnLineSchemaChanges;
     }
 
     public String getHostname() {
@@ -210,6 +213,10 @@ public class MySqlSourceConfig implements Serializable {
         return closeIdleReaders;
     }
 
+    public boolean isParseOnLineSchemaChanges() {
+        return parseOnLineSchemaChanges;
+    }
+
     public Properties getDbzProperties() {
         return dbzProperties;
     }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
index 8b65055ca..f0ca4cc96 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
@@ -70,6 +70,7 @@ public class MySqlSourceConfigFactory implements Serializable {
     private Properties dbzProperties;
     private Map<ObjectPath, String> chunkKeyColumns = new HashMap<>();
     private boolean skipSnapshotBackfill = false;
+    private boolean parseOnLineSchemaChanges = false;
 
     public MySqlSourceConfigFactory hostname(String hostname) {
         this.hostname = hostname;
@@ -291,6 +292,12 @@ public class MySqlSourceConfigFactory implements Serializable {
         return this;
     }
 
+    /** Whether to parse gh-ost/pt-osc utility generated schema change events. Defaults to false. */
+    public MySqlSourceConfigFactory parseOnLineSchemaChanges(boolean parseOnLineSchemaChanges) {
+        this.parseOnLineSchemaChanges = parseOnLineSchemaChanges;
+        return this;
+    }
+
     /** Creates a new {@link MySqlSourceConfig} for the given subtask {@code subtaskId}. */
     public MySqlSourceConfig createConfig(int subtaskId) {
         // hard code server name, because we don't need to distinguish it, docs:
@@ -384,6 +391,7 @@ public class MySqlSourceConfigFactory implements Serializable {
                 props,
                 jdbcProperties,
                 chunkKeyColumns,
-                skipSnapshotBackfill);
+                skipSnapshotBackfill,
+                parseOnLineSchemaChanges);
     }
 }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
index f3424c8df..87de37e05 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
@@ -262,4 +262,12 @@ public class MySqlSourceOptions {
                     .defaultValue(false)
                     .withDescription(
                             "Whether to skip backfill in snapshot reading 
phase. If backfill is skipped, changes on captured tables during snapshot phase 
will be consumed later in binlog reading phase instead of being merged into the 
snapshot. WARNING: Skipping backfill might lead to data inconsistency because 
some binlog events happened within the snapshot phase might be replayed (only 
at-least-once semantic is promised). For example updating an already updated 
value in snapshot, or del [...]
+
+    @Experimental
+    public static final ConfigOption<Boolean> PARSE_ONLINE_SCHEMA_CHANGES =
+            ConfigOptions.key("scan.parse.online.schema.changes.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to parse schema change events generated 
by gh-ost/pt-osc utilities. Defaults to false.");
 }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java
index d85944adb..e6848f1c4 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java
@@ -25,7 +25,11 @@ import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitI
 import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
 import org.apache.flink.table.types.logical.RowType;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
 import io.debezium.data.Envelope;
+import io.debezium.document.Document;
 import io.debezium.document.DocumentReader;
 import io.debezium.relational.TableId;
 import io.debezium.relational.history.HistoryRecord;
@@ -50,6 +54,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import static io.debezium.connector.AbstractSourceInfo.DATABASE_NAME_KEY;
@@ -384,6 +390,40 @@ public class RecordUtils {
         return new TableId(dbName, null, tableName);
     }
 
+    public static SourceRecord setTableId(
+            SourceRecord dataRecord, TableId originalTableId, TableId tableId) {
+        Struct value = (Struct) dataRecord.value();
+        Document historyRecordDocument;
+        try {
+            historyRecordDocument = getHistoryRecord(dataRecord).document();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        HistoryRecord newHistoryRecord =
+                new HistoryRecord(
+                        historyRecordDocument.set(
+                                HistoryRecord.Fields.DDL_STATEMENTS,
+                                historyRecordDocument
+                                        .get(HistoryRecord.Fields.DDL_STATEMENTS)
+                                        .asString()
+                                        .replace(originalTableId.table(), tableId.table())));
+
+        Struct newSource =
+                value.getStruct(Envelope.FieldName.SOURCE)
+                        .put(DATABASE_NAME_KEY, tableId.catalog())
+                        .put(TABLE_NAME_KEY, tableId.table());
+        return dataRecord.newRecord(
+                dataRecord.topic(),
+                dataRecord.kafkaPartition(),
+                dataRecord.keySchema(),
+                dataRecord.key(),
+                dataRecord.valueSchema(),
+                value.put(Envelope.FieldName.SOURCE, newSource)
+                        .put(HISTORY_RECORD_FIELD, newHistoryRecord.toString()),
+                dataRecord.timestamp(),
+                dataRecord.headers());
+    }
+
     public static boolean isTableChangeRecord(SourceRecord dataRecord) {
         Struct value = (Struct) dataRecord.value();
         Struct source = value.getStruct(Envelope.FieldName.SOURCE);
@@ -489,4 +529,75 @@ public class RecordUtils {
         }
         return Optional.empty();
     }
+
+    /**
+     * Checks whether the given source record is a gh-ost/pt-osc-initiated schema change event
+     * by inspecting its "alter" DDL statement.
+     */
+    public static boolean isOnLineSchemaChangeEvent(SourceRecord record) {
+        if (!isSchemaChangeEvent(record)) {
+            return false;
+        }
+        Struct value = (Struct) record.value();
+        ObjectMapper mapper = new ObjectMapper();
+        try {
+            // A single online schema migration run generates the following schema change events:
+            //
+            // gh-ost:
+            // DROP TABLE IF EXISTS `db`.`_tb1_gho`
+            // DROP TABLE IF EXISTS `db`.`_tb1_del`
+            // DROP TABLE IF EXISTS `db`.`_tb1_ghc`
+            // create /* gh-ost */ table `db`.`_tb1_ghc` ...
+            // create /* gh-ost */ table `db`.`_tb1_gho` like `db`.`tb1`
+            // alter /* gh-ost */ table `db`.`_tb1_gho` add column c varchar(255)
+            // create /* gh-ost */ table `db`.`_tb1_del` ...
+            // DROP TABLE IF EXISTS `db`.`_tb1_del`
+            // rename /* gh-ost */ table `db`.`tb1` to `db`.`_tb1_del`
+            // rename /* gh-ost */ table `db`.`_tb1_gho` to `db`.`tb1`
+            // DROP TABLE IF EXISTS `db`.`_tb1_ghc`
+            // DROP TABLE IF EXISTS `db`.`_tb1_del`
+            //
+            // pt-osc:
+            // CREATE TABLE `db`.`_test_tb1_new`
+            // ALTER TABLE `db`.`_test_tb1_new` add column c varchar(50)
+            // CREATE TRIGGER `pt_osc_db_test_tb1_del`...
+            // CREATE TRIGGER `pt_osc_db_test_tb1_upd`...
+            // CREATE TRIGGER `pt_osc_db_test_tb1_ins`...
+            // ANALYZE TABLE `db`.`_test_tb1_new` /* pt-online-schema-change */
+            // RENAME TABLE `db`.`test_tb1` TO `db`.`_test_tb1_old`, `db`.`_test_tb1_new` TO
+            // `db`.`test_tb1`
+            // DROP TABLE IF EXISTS `_test_tb1_old` /* generated by server */
+            // DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_del`
+            // DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_upd`
+            // DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_ins`
+            //
+            // Among all these, we only need the "ALTER" one that happens on the `_gho`/`_new`
+            // table.
+            String ddl =
+                    mapper.readTree(value.getString(HISTORY_RECORD_FIELD))
+                            .get(HistoryRecord.Fields.DDL_STATEMENTS)
+                            .asText()
+                            .toLowerCase();
+            if (ddl.startsWith("alter")) {
+                String tableName =
+                        value.getStruct(Envelope.FieldName.SOURCE).getString(TABLE_NAME_KEY);
+                return OSC_TABLE_ID_PATTERN.matcher(tableName).matches();
+            }
+
+            return false;
+        } catch (JsonProcessingException e) {
+            return false;
+        }
+    }
+
+    private static final Pattern OSC_TABLE_ID_PATTERN = Pattern.compile("^_(.*)_(gho|new)$");
+
+    /** Peels a gh-ost/pt-osc mangled shadow table id back to the original table id. */
+    public static TableId peelTableId(TableId tableId) {
+        Matcher matchingResult = OSC_TABLE_ID_PATTERN.matcher(tableId.table());
+        if (matchingResult.matches()) {
+            return new TableId(tableId.catalog(), tableId.schema(), matchingResult.group(1));
+        }
+        return tableId;
+    }
 }
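
To make the pattern's intent concrete, a small sketch (not part of the patch). The
TableId constructor here is Debezium's (catalog, schema, table); getTableId(...) is
assumed to be the existing RecordUtils helper whose tail is visible in the hunk
above, and `record` stands for an incoming SourceRecord:

    // Sketch only: OSC_TABLE_ID_PATTERN maps shadow table names to the original.
    TableId ghost = new TableId("db", null, "_tb1_gho"); // gh-ost shadow table
    TableId ptosc = new TableId("db", null, "_tb1_new"); // pt-osc shadow table
    TableId plain = new TableId("db", null, "tb1");      // regular table

    RecordUtils.peelTableId(ghost); // => db.tb1
    RecordUtils.peelTableId(ptosc); // => db.tb1
    RecordUtils.peelTableId(plain); // => db.tb1 (no match, returned unchanged)

    // Hypothetical reader-side flow (the BinlogSplitReader changes are not shown
    // in this excerpt): re-route a shadow-table ALTER onto the original table.
    if (RecordUtils.isOnLineSchemaChangeEvent(record)) {
        TableId shadowId = RecordUtils.getTableId(record); // e.g. db._tb1_gho
        record = RecordUtils.setTableId(record, shadowId, RecordUtils.peelTableId(shadowId));
    }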
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
index 242e02da6..8d023ad30 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
@@ -98,6 +98,7 @@ public class MySqlTableSource implements ScanTableSource, 
SupportsReadingMetadat
     private final Duration heartbeatInterval;
     private final String chunkKeyColumn;
     final boolean skipSnapshotBackFill;
+    final boolean parseOnlineSchemaChanges;
 
     // 
--------------------------------------------------------------------------------------------
     // Mutable attributes
@@ -135,7 +136,8 @@ public class MySqlTableSource implements ScanTableSource, 
SupportsReadingMetadat
             Properties jdbcProperties,
             Duration heartbeatInterval,
             @Nullable String chunkKeyColumn,
-            boolean skipSnapshotBackFill) {
+            boolean skipSnapshotBackFill,
+            boolean parseOnlineSchemaChanges) {
         this.physicalSchema = physicalSchema;
         this.port = port;
         this.hostname = checkNotNull(hostname);
@@ -159,6 +161,7 @@ public class MySqlTableSource implements ScanTableSource, 
SupportsReadingMetadat
         this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
         this.closeIdleReaders = closeIdleReaders;
         this.jdbcProperties = jdbcProperties;
+        this.parseOnlineSchemaChanges = parseOnlineSchemaChanges;
         // Mutable attributes
         this.producedDataType = physicalSchema.toPhysicalRowDataType();
         this.metadataKeys = Collections.emptyList();
@@ -220,6 +223,7 @@ public class MySqlTableSource implements ScanTableSource, 
SupportsReadingMetadat
                             .heartbeatInterval(heartbeatInterval)
                             .chunkKeyColumn(new ObjectPath(database, 
tableName), chunkKeyColumn)
                             .skipSnapshotBackfill(skipSnapshotBackFill)
+                            .parseOnLineSchemaChanges(parseOnlineSchemaChanges)
                             .build();
             return SourceProvider.of(parallelSource);
         } else {
@@ -305,7 +309,8 @@ public class MySqlTableSource implements ScanTableSource, 
SupportsReadingMetadat
                         jdbcProperties,
                         heartbeatInterval,
                         chunkKeyColumn,
-                        skipSnapshotBackFill);
+                        skipSnapshotBackFill,
+                        parseOnlineSchemaChanges);
         source.metadataKeys = metadataKeys;
         source.producedDataType = producedDataType;
         return source;
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
index e435f946a..c3404db42 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
@@ -102,6 +102,8 @@ public class MySqlTableSourceFactory implements 
DynamicTableSourceFactory {
                 
config.get(MySqlSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
         boolean skipSnapshotBackFill =
                 
config.get(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
+        boolean parseOnLineSchemaChanges =
+                config.get(MySqlSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES);
 
         if (enableParallelRead) {
             validatePrimaryKeyIfEnableParallel(physicalSchema, chunkKeyColumn);
@@ -145,7 +147,8 @@ public class MySqlTableSourceFactory implements 
DynamicTableSourceFactory {
                 
JdbcUrlUtils.getJdbcProperties(context.getCatalogTable().getOptions()),
                 heartbeatInterval,
                 chunkKeyColumn,
-                skipSnapshotBackFill);
+                skipSnapshotBackFill,
+                parseOnLineSchemaChanges);
     }
 
     @Override
@@ -191,6 +194,7 @@ public class MySqlTableSourceFactory implements 
DynamicTableSourceFactory {
         options.add(MySqlSourceOptions.HEARTBEAT_INTERVAL);
         
options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
         
options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
+        options.add(MySqlSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES);
         return options;
     }
 
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationSourceITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationSourceITCase.java
new file mode 100644
index 000000000..848869783
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationSourceITCase.java
@@ -0,0 +1,584 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.utils.TestCaseUtils;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.types.Row;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.testcontainers.DockerClientFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Random;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.api.common.JobStatus.RUNNING;
+import static org.apache.flink.cdc.common.utils.TestCaseUtils.DEFAULT_INTERVAL;
+import static org.apache.flink.cdc.common.utils.TestCaseUtils.DEFAULT_TIMEOUT;
+
+/**
+ * IT case for evolving MySQL schema with the gh-ost/pt-osc utilities. See <a
+ * href="https://github.com/github/gh-ost">github/gh-ost</a> and <a
+ * href="https://docs.percona.com/percona-toolkit/pt-online-schema-change.html">pt-osc</a> for
+ * more details.
+ */
+public class MySqlOnLineSchemaMigrationSourceITCase extends MySqlSourceTestBase {
+    private static final MySqlContainer MYSQL8_CONTAINER =
+            createMySqlContainer(MySqlVersion.V8_0, 
"docker/server-gtids/expire-seconds/my.cnf");
+
+    private static final String TEST_USER = "mysqluser";
+    private static final String TEST_PASSWORD = "mysqlpw";
+
+    private static final String PERCONA_TOOLKIT = 
"perconalab/percona-toolkit:3.5.7";
+
+    protected static final GenericContainer<?> PERCONA_TOOLKIT_CONTAINER =
+            createPerconaToolkitContainer();
+
+    private final UniqueDatabase customerDatabase =
+            new UniqueDatabase(MYSQL8_CONTAINER, "customer", TEST_USER, 
TEST_PASSWORD);
+
+    private final StreamExecutionEnvironment env =
+            StreamExecutionEnvironment.getExecutionEnvironment();
+    private final StreamTableEnvironment tEnv =
+            StreamTableEnvironment.create(
+                    env, 
EnvironmentSettings.newInstance().inStreamingMode().build());
+
+    private static final String GH_OST_DOWNLOAD_LINK =
+            DockerClientFactory.instance().client().versionCmd().exec().getArch().equals("amd64")
+                    ? "https://github.com/github/gh-ost/releases/download/v1.1.6/gh-ost-binary-linux-amd64-20231207144046.tar.gz"
+                    : "https://github.com/github/gh-ost/releases/download/v1.1.6/gh-ost-binary-linux-arm64-20231207144046.tar.gz";
+
+    @BeforeClass
+    public static void beforeClass() {
+        LOG.info("Starting containers...");
+        Startables.deepStart(Stream.of(MYSQL8_CONTAINER)).join();
+        Startables.deepStart(Stream.of(PERCONA_TOOLKIT_CONTAINER)).join();
+        LOG.info("Containers are started.");
+    }
+
+    @AfterClass
+    public static void afterClass() {
+        LOG.info("Stopping containers...");
+        MYSQL8_CONTAINER.stop();
+        PERCONA_TOOLKIT_CONTAINER.close();
+        LOG.info("Containers are stopped.");
+    }
+
+    @Before
+    public void before() {
+        TestValuesTableFactory.clearAllData();
+        env.setParallelism(DEFAULT_PARALLELISM);
+        env.enableCheckpointing(200);
+        customerDatabase.createAndInitialize();
+        System.setOut(new PrintStream(outCaptor));
+    }
+
+    @After
+    public void after() {
+        customerDatabase.dropDatabase();
+        System.setOut(sysOut);
+    }
+
+    private static void installGhOstCli(Container<?> container) {
+        try {
+            execInContainer(
+                    container,
+                    "download gh-ost tarball",
+                    "curl",
+                    "-L",
+                    "-o",
+                    "/tmp/gh-ost.tar.gz",
+                    GH_OST_DOWNLOAD_LINK);
+            execInContainer(
+                    container, "unzip binary", "tar", "-xzvf", 
"/tmp/gh-ost.tar.gz", "-C", "/bin");
+        } catch (IOException | InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static GenericContainer<?> createPerconaToolkitContainer() {
+        GenericContainer<?> perconaToolkit =
+                new GenericContainer<>(PERCONA_TOOLKIT)
+                        // keep container alive
+                        .withCommand("tail", "-f", "/dev/null")
+                        .withNetwork(NETWORK)
+                        .withLogConsumer(new Slf4jLogConsumer(LOG));
+        return perconaToolkit;
+    }
+
+    private final PrintStream sysOut = System.out;
+    private final ByteArrayOutputStream outCaptor = new 
ByteArrayOutputStream();
+
+    @Test
+    public void testGhOstSchemaMigrationFromScratch() throws Exception {
+        LOG.info("Step 1: Install gh-ost command line utility");
+        installGhOstCli(MYSQL8_CONTAINER);
+
+        LOG.info("Step 2: Start pipeline job");
+        MySqlSource<String> mySqlSource =
+                MySqlSource.<String>builder()
+                        .hostname(MYSQL8_CONTAINER.getHost())
+                        .port(MYSQL8_CONTAINER.getDatabasePort())
+                        .databaseList(customerDatabase.getDatabaseName())
+                        .tableList(customerDatabase.getDatabaseName() + 
".customers")
+                        .username(customerDatabase.getUsername())
+                        .password(customerDatabase.getPassword())
+                        .serverId("5401-5404")
+                        .deserializer(new JsonDebeziumDeserializationSchema())
+                        .serverTimeZone("UTC")
+                        .includeSchemaChanges(true) // output the schema changes as well
+                        .build();
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(3000);
+        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), 
"MySqlParallelSource")
+                .setParallelism(4)
+                .print()
+                .setParallelism(1);
+
+        JobClient resultClient = env.executeAsync();
+
+        TestCaseUtils.repeatedCheck(
+                () -> resultClient.getJobStatus().get().equals(RUNNING),
+                DEFAULT_TIMEOUT,
+                DEFAULT_INTERVAL,
+                Arrays.asList(InterruptedException.class, 
NoSuchElementException.class));
+
+        {
+            String[] expected =
+                    new String[] {
+                        
"{\"id\":101,\"name\":\"user_1\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":102,\"name\":\"user_2\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":103,\"name\":\"user_3\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":109,\"name\":\"user_4\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":110,\"name\":\"user_5\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":111,\"name\":\"user_6\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":118,\"name\":\"user_7\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":121,\"name\":\"user_8\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":123,\"name\":\"user_9\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":1009,\"name\":\"user_10\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":1010,\"name\":\"user_11\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":1011,\"name\":\"user_12\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":1012,\"name\":\"user_13\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":1013,\"name\":\"user_14\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":1014,\"name\":\"user_15\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":1015,\"name\":\"user_16\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":1016,\"name\":\"user_17\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":1017,\"name\":\"user_18\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":1018,\"name\":\"user_19\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":1019,\"name\":\"user_20\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":2000,\"name\":\"user_21\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                    };
+            TestCaseUtils.repeatedCheck(
+                    () -> 
Arrays.stream(expected).allMatch(outCaptor.toString()::contains));
+        }
+
+        // Wait a little while until the source has switched to binlog streaming mode.
+        Thread.sleep(5_000);
+
+        {
+            LOG.info("Step 3: Evolve schema with gh-ost - ADD COLUMN");
+            execInContainer(
+                    MYSQL8_CONTAINER,
+                    "evolve schema",
+                    "gh-ost",
+                    "--user=" + TEST_USER,
+                    "--password=" + TEST_PASSWORD,
+                    "--database=" + customerDatabase.getDatabaseName(),
+                    "--table=customers",
+                    "--alter=add column ext int first",
+                    "--allow-on-master", // because we don't have a replica
+                    "--initially-drop-old-table", // drop previously generated 
temporary tables
+                    "--execute");
+
+            try (Connection connection = customerDatabase.getJdbcConnection();
+                    Statement statement = connection.createStatement()) {
+                // The new column `ext` has been inserted now
+                statement.execute(
+                        String.format(
+                                "INSERT INTO `%s`.`customers` VALUES (17, 
10000, 'Alice', 'Beijing', '123567891234');",
+                                customerDatabase.getDatabaseName()));
+            }
+
+            TestCaseUtils.repeatedCheck(
+                    () ->
+                            outCaptor
+                                    .toString()
+                                    .contains(
+                                            
"{\"ext\":17,\"id\":10000,\"name\":\"Alice\",\"address\":\"Beijing\",\"phone_number\":\"123567891234\"}"));
+        }
+
+        {
+            LOG.info("Step 4: Evolve schema with gh-ost - MODIFY COLUMN");
+            execInContainer(
+                    MYSQL8_CONTAINER,
+                    "evolve schema",
+                    "gh-ost",
+                    "--user=" + TEST_USER,
+                    "--password=" + TEST_PASSWORD,
+                    "--database=" + customerDatabase.getDatabaseName(),
+                    "--table=customers",
+                    "--alter=modify column ext double",
+                    "--allow-on-master",
+                    "--initially-drop-old-table",
+                    "--execute");
+
+            try (Connection connection = customerDatabase.getJdbcConnection();
+                    Statement statement = connection.createStatement()) {
+                statement.execute(
+                        String.format(
+                                "INSERT INTO `%s`.`customers` VALUES 
(2.718281828, 10001, 'Bob', 'Chongqing', '123567891234');",
+                                customerDatabase.getDatabaseName()));
+            }
+
+            TestCaseUtils.repeatedCheck(
+                    () ->
+                            outCaptor
+                                    .toString()
+                                    .contains(
+                                            
"{\"ext\":2.718281828,\"id\":10001,\"name\":\"Bob\",\"address\":\"Chongqing\",\"phone_number\":\"123567891234\"}"));
+        }
+
+        {
+            LOG.info("Step 5: Evolve schema with gh-ost - DROP COLUMN");
+            execInContainer(
+                    MYSQL8_CONTAINER,
+                    "evolve schema",
+                    "gh-ost",
+                    "--user=" + TEST_USER,
+                    "--password=" + TEST_PASSWORD,
+                    "--database=" + customerDatabase.getDatabaseName(),
+                    "--table=customers",
+                    "--alter=drop column ext",
+                    "--allow-on-master",
+                    "--initially-drop-old-table",
+                    "--execute");
+
+            try (Connection connection = customerDatabase.getJdbcConnection();
+                    Statement statement = connection.createStatement()) {
+                statement.execute(
+                        String.format(
+                                "INSERT INTO `%s`.`customers` VALUES (10002, 
'Cicada', 'Urumqi', '123567891234');",
+                                customerDatabase.getDatabaseName()));
+            }
+
+            TestCaseUtils.repeatedCheck(
+                    () ->
+                            outCaptor
+                                    .toString()
+                                    .contains(
+                                            
"{\"id\":10002,\"name\":\"Cicada\",\"address\":\"Urumqi\",\"phone_number\":\"123567891234\"}"));
+        }
+        resultClient.cancel();
+    }
+
+    @Test
+    public void testPtOscSchemaMigrationFromScratch() throws Exception {
+        LOG.info("Step 1: Start pipeline job");
+        MySqlSource<String> mySqlSource =
+                MySqlSource.<String>builder()
+                        .hostname(MYSQL8_CONTAINER.getHost())
+                        .port(MYSQL8_CONTAINER.getDatabasePort())
+                        .databaseList(customerDatabase.getDatabaseName())
+                        .tableList(customerDatabase.getDatabaseName() + 
".customers")
+                        .username(customerDatabase.getUsername())
+                        .password(customerDatabase.getPassword())
+                        .serverId("5401-5404")
+                        .deserializer(new JsonDebeziumDeserializationSchema())
+                        .serverTimeZone("UTC")
+                        .includeSchemaChanges(true) // output the schema changes as well
+                        .build();
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(3000);
+        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), 
"MySqlParallelSource")
+                .setParallelism(4)
+                .print()
+                .setParallelism(1);
+
+        JobClient resultClient = env.executeAsync();
+
+        TestCaseUtils.repeatedCheck(
+                () -> resultClient.getJobStatus().get().equals(RUNNING),
+                DEFAULT_TIMEOUT,
+                DEFAULT_INTERVAL,
+                Arrays.asList(InterruptedException.class, 
NoSuchElementException.class));
+
+        {
+            String[] expected =
+                    new String[] {
+                        
"{\"id\":101,\"name\":\"user_1\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":102,\"name\":\"user_2\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":103,\"name\":\"user_3\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":109,\"name\":\"user_4\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":110,\"name\":\"user_5\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":111,\"name\":\"user_6\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":118,\"name\":\"user_7\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":121,\"name\":\"user_8\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":123,\"name\":\"user_9\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":1009,\"name\":\"user_10\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":1010,\"name\":\"user_11\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":1011,\"name\":\"user_12\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":1012,\"name\":\"user_13\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":1013,\"name\":\"user_14\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":1014,\"name\":\"user_15\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":1015,\"name\":\"user_16\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":1016,\"name\":\"user_17\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":1017,\"name\":\"user_18\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":1018,\"name\":\"user_19\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":1019,\"name\":\"user_20\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        
"{\"id\":2000,\"name\":\"user_21\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                    };
+            TestCaseUtils.repeatedCheck(
+                    () -> 
Arrays.stream(expected).allMatch(outCaptor.toString()::contains));
+        }
+
+        // Wait a little while until the source has switched to binlog streaming mode.
+        Thread.sleep(5000L);
+
+        LOG.info("Step 2: Evolve schema with pt-osc - ADD COLUMN");
+        execInContainer(
+                PERCONA_TOOLKIT_CONTAINER,
+                "evolve schema",
+                "pt-online-schema-change",
+                "--user=" + TEST_USER,
+                "--host=" + INTER_CONTAINER_MYSQL_ALIAS,
+                "--password=" + TEST_PASSWORD,
+                "P=3306,t=customers,D=" + customerDatabase.getDatabaseName(),
+                "--alter",
+                "add column ext int FIRST",
+                "--charset=utf8",
+                "--recursion-method=NONE", // Do not look for slave nodes
+                "--print",
+                "--execute");
+
+        try (Connection connection = customerDatabase.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            // The new column `ext` has been inserted now
+            statement.execute(
+                    String.format(
+                            "INSERT INTO `%s`.`customers` VALUES (17, 10000, 
'Alice', 'Beijing', '123567891234');",
+                            customerDatabase.getDatabaseName()));
+
+            TestCaseUtils.repeatedCheck(
+                    () ->
+                            outCaptor
+                                    .toString()
+                                    .contains(
+                                            
"{\"ext\":17,\"id\":10000,\"name\":\"Alice\",\"address\":\"Beijing\",\"phone_number\":\"123567891234\"}"));
+        }
+
+        LOG.info("Step 3: Evolve schema with pt-osc - MODIFY COLUMN");
+        execInContainer(
+                PERCONA_TOOLKIT_CONTAINER,
+                "evolve schema",
+                "pt-online-schema-change",
+                "--user=" + TEST_USER,
+                "--host=" + INTER_CONTAINER_MYSQL_ALIAS,
+                "--password=" + TEST_PASSWORD,
+                "P=3306,t=customers,D=" + customerDatabase.getDatabaseName(),
+                "--alter",
+                "modify column ext double",
+                "--charset=utf8",
+                "--recursion-method=NONE",
+                "--print",
+                "--execute");
+
+        try (Connection connection = customerDatabase.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    String.format(
+                            "INSERT INTO `%s`.`customers` VALUES (2.718281828, 
10001, 'Bob', 'Chongqing', '123567891234');",
+                            customerDatabase.getDatabaseName()));
+            TestCaseUtils.repeatedCheck(
+                    () ->
+                            outCaptor
+                                    .toString()
+                                    .contains(
+                                            
"{\"ext\":2.718281828,\"id\":10001,\"name\":\"Bob\",\"address\":\"Chongqing\",\"phone_number\":\"123567891234\"}"));
+        }
+
+        LOG.info("Step 4: Evolve schema with pt-osc - DROP COLUMN");
+        execInContainer(
+                PERCONA_TOOLKIT_CONTAINER,
+                "evolve schema",
+                "pt-online-schema-change",
+                "--user=" + TEST_USER,
+                "--host=" + INTER_CONTAINER_MYSQL_ALIAS,
+                "--password=" + TEST_PASSWORD,
+                "P=3306,t=customers,D=" + customerDatabase.getDatabaseName(),
+                "--alter",
+                "drop column ext",
+                "--charset=utf8",
+                "--recursion-method=NONE",
+                "--print",
+                "--execute");
+
+        try (Connection connection = customerDatabase.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    String.format(
+                            "INSERT INTO `%s`.`customers` VALUES (10002, 
'Cicada', 'Urumqi', '123567891234');",
+                            customerDatabase.getDatabaseName()));
+            TestCaseUtils.repeatedCheck(
+                    () ->
+                            outCaptor
+                                    .toString()
+                                    .contains(
+                                            
"{\"id\":10002,\"name\":\"Cicada\",\"address\":\"Urumqi\",\"phone_number\":\"123567891234\"}"));
+        }
+    }
+
+    private static void execInContainer(Container<?> container, String prompt, 
String... commands)
+            throws IOException, InterruptedException {
+        {
+            LOG.info(
+                    "Starting to {} with the following command: `{}`",
+                    prompt,
+                    String.join(" ", commands));
+            Container.ExecResult execResult = 
container.execInContainer(commands);
+            if (execResult.getExitCode() == 0) {
+                LOG.info("Successfully {}. Stdout: {}", prompt, 
execResult.getStdout());
+            } else {
+                LOG.error(
+                        "Failed to {}. Exit code: {}, Stdout: {}, Stderr: {}",
+                        prompt,
+                        execResult.getExitCode(),
+                        execResult.getStdout(),
+                        execResult.getStderr());
+                throw new IOException("Failed to execute commands: " + 
String.join(" ", commands));
+            }
+        }
+    }
+
+    private List<Event> getSnapshotExpected(TableId tableId, Schema schema) {
+        return Stream.of(
+                        generate(schema, 101, "user_1", "Shanghai", 
"123567891234"),
+                        generate(schema, 102, "user_2", "Shanghai", 
"123567891234"),
+                        generate(schema, 103, "user_3", "Shanghai", 
"123567891234"),
+                        generate(schema, 109, "user_4", "Shanghai", 
"123567891234"),
+                        generate(schema, 110, "user_5", "Shanghai", 
"123567891234"),
+                        generate(schema, 111, "user_6", "Shanghai", 
"123567891234"),
+                        generate(schema, 118, "user_7", "Shanghai", 
"123567891234"),
+                        generate(schema, 121, "user_8", "Shanghai", 
"123567891234"),
+                        generate(schema, 123, "user_9", "Shanghai", 
"123567891234"),
+                        generate(schema, 1009, "user_10", "Shanghai", 
"123567891234"),
+                        generate(schema, 1010, "user_11", "Shanghai", 
"123567891234"),
+                        generate(schema, 1011, "user_12", "Shanghai", 
"123567891234"),
+                        generate(schema, 1012, "user_13", "Shanghai", 
"123567891234"),
+                        generate(schema, 1013, "user_14", "Shanghai", 
"123567891234"),
+                        generate(schema, 1014, "user_15", "Shanghai", 
"123567891234"),
+                        generate(schema, 1015, "user_16", "Shanghai", 
"123567891234"),
+                        generate(schema, 1016, "user_17", "Shanghai", 
"123567891234"),
+                        generate(schema, 1017, "user_18", "Shanghai", 
"123567891234"),
+                        generate(schema, 1018, "user_19", "Shanghai", 
"123567891234"),
+                        generate(schema, 1019, "user_20", "Shanghai", 
"123567891234"),
+                        generate(schema, 2000, "user_21", "Shanghai", 
"123567891234"))
+                .map(record -> DataChangeEvent.insertEvent(tableId, record))
+                .collect(Collectors.toList());
+    }
+
+    private BinaryRecordData generate(Schema schema, Object... fields) {
+        return (new 
BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0])))
+                .generate(
+                        Arrays.stream(fields)
+                                .map(
+                                        e ->
+                                                (e instanceof String)
+                                                        ? 
BinaryStringData.fromString((String) e)
+                                                        : e)
+                                .toArray());
+    }
+
+    private String getServerId() {
+        final Random random = new Random();
+        int serverId = random.nextInt(100) + 5400;
+        return serverId + "-" + (serverId + env.getParallelism());
+    }
+
+    protected String getServerId(int base) {
+        return base + "-" + (base + DEFAULT_PARALLELISM);
+    }
+
+    private static void waitForSnapshotStarted(String sinkName) throws 
InterruptedException {
+        while (sinkSize(sinkName) == 0) {
+            Thread.sleep(100);
+        }
+    }
+
+    private static void waitForSinkSize(String sinkName, int expectedSize) {
+        TestCaseUtils.repeatedCheck(() -> sinkSize(sinkName) >= expectedSize);
+    }
+
+    private static int sinkSize(String sinkName) {
+        synchronized (TestValuesTableFactory.class) {
+            try {
+                return TestValuesTableFactory.getRawResults(sinkName).size();
+            } catch (IllegalArgumentException e) {
+                // job is not started yet
+                return 0;
+            }
+        }
+    }
+
+    private static List<String> fetchRows(Iterator<Row> iter, int size) {
+        List<String> rows = new ArrayList<>(size);
+        while (size > 0 && iter.hasNext()) {
+            Row row = iter.next();
+            rows.add(row.toString());
+            size--;
+        }
+        return rows;
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceTestBase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceTestBase.java
index 215dc3ec7..2b8e76500 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceTestBase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceTestBase.java
@@ -32,9 +32,11 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Rule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.lifecycle.Startables;
 
@@ -55,6 +57,8 @@ public abstract class MySqlSourceTestBase extends TestLogger {
     protected static final int DEFAULT_PARALLELISM = 4;
     protected static final MySqlContainer MYSQL_CONTAINER = 
createMySqlContainer(MySqlVersion.V5_7);
     protected InMemoryReporter metricReporter = 
InMemoryReporter.createWithRetainedMetrics();
+    public static final String INTER_CONTAINER_MYSQL_ALIAS = "mysql";
+    @ClassRule public static final Network NETWORK = Network.newNetwork();
 
     @Rule
     public final MiniClusterWithClientResource miniClusterResource =
@@ -96,6 +100,8 @@ public abstract class MySqlSourceTestBase extends TestLogger 
{
                         .withDatabaseName("flink-test")
                         .withUsername("flinkuser")
                         .withPassword("flinkpw")
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(INTER_CONTAINER_MYSQL_ALIAS)
                         .withLogConsumer(new Slf4jLogConsumer(LOG));
     }
 
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlOnLineSchemaMigrationTableITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlOnLineSchemaMigrationTableITCase.java
new file mode 100644
index 000000000..92e5fc566
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlOnLineSchemaMigrationTableITCase.java
@@ -0,0 +1,598 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.table;
+
+import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.utils.TestCaseUtils;
+import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.testcontainers.DockerClientFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Random;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.api.common.JobStatus.RUNNING;
+import static org.apache.flink.cdc.common.utils.TestCaseUtils.DEFAULT_INTERVAL;
+import static org.apache.flink.cdc.common.utils.TestCaseUtils.DEFAULT_TIMEOUT;
+
+/**
+ * IT case for evolving MySQL schema with the gh-ost/pt-osc utilities. See <a
+ * href="https://github.com/github/gh-ost">github/gh-ost</a> and <a
+ * href="https://docs.percona.com/percona-toolkit/pt-online-schema-change.html">pt-osc</a> for
+ * more details.
+ */
+public class MySqlOnLineSchemaMigrationTableITCase extends MySqlSourceTestBase {
+    private static final MySqlContainer MYSQL8_CONTAINER =
+            createMySqlContainer(MySqlVersion.V8_0, 
"docker/server-gtids/expire-seconds/my.cnf");
+
+    private static final String TEST_USER = "mysqluser";
+    private static final String TEST_PASSWORD = "mysqlpw";
+
+    private static final String PERCONA_TOOLKIT = 
"perconalab/percona-toolkit:3.5.7";
+
+    protected static final GenericContainer<?> PERCONA_TOOLKIT_CONTAINER =
+            createPerconaToolkitContainer();
+
+    private final UniqueDatabase customerDatabase =
+            new UniqueDatabase(MYSQL8_CONTAINER, "customer", TEST_USER, 
TEST_PASSWORD);
+
+    private final StreamExecutionEnvironment env =
+            StreamExecutionEnvironment.getExecutionEnvironment();
+    private final StreamTableEnvironment tEnv =
+            StreamTableEnvironment.create(
+                    env, 
EnvironmentSettings.newInstance().inStreamingMode().build());
+
+    private static final String GH_OST_DOWNLOAD_LINK =
+            DockerClientFactory.instance().client().versionCmd().exec().getArch().equals("amd64")
+                    ? "https://github.com/github/gh-ost/releases/download/v1.1.6/gh-ost-binary-linux-amd64-20231207144046.tar.gz"
+                    : "https://github.com/github/gh-ost/releases/download/v1.1.6/gh-ost-binary-linux-arm64-20231207144046.tar.gz";
+
+    @BeforeClass
+    public static void beforeClass() {
+        LOG.info("Starting containers...");
+        Startables.deepStart(Stream.of(MYSQL8_CONTAINER)).join();
+        Startables.deepStart(Stream.of(PERCONA_TOOLKIT_CONTAINER)).join();
+        LOG.info("Containers are started.");
+    }
+
+    @AfterClass
+    public static void afterClass() {
+        LOG.info("Stopping containers...");
+        MYSQL8_CONTAINER.stop();
+        PERCONA_TOOLKIT_CONTAINER.close();
+        LOG.info("Containers are stopped.");
+    }
+
+    @Before
+    public void before() {
+        TestValuesTableFactory.clearAllData();
+        env.setParallelism(DEFAULT_PARALLELISM);
+        env.enableCheckpointing(200);
+        customerDatabase.createAndInitialize();
+    }
+
+    @After
+    public void after() {
+        customerDatabase.dropDatabase();
+    }
+
+    private static void installGhOstCli(Container<?> container) {
+        try {
+            execInContainer(
+                    container,
+                    "download gh-ost tarball",
+                    "curl",
+                    "-L",
+                    "-o",
+                    "/tmp/gh-ost.tar.gz",
+                    GH_OST_DOWNLOAD_LINK);
+            execInContainer(
+                    container, "unzip binary", "tar", "-xzvf", 
"/tmp/gh-ost.tar.gz", "-C", "/bin");
+        } catch (IOException | InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static GenericContainer<?> createPerconaToolkitContainer() {
+        GenericContainer<?> perconaToolkit =
+                new GenericContainer<>(PERCONA_TOOLKIT)
+                        // keep container alive
+                        .withCommand("tail", "-f", "/dev/null")
+                        .withNetwork(NETWORK)
+                        .withLogConsumer(new Slf4jLogConsumer(LOG));
+        return perconaToolkit;
+    }
+
+    @Test
+    public void testGhOstSchemaMigrationFromScratch() throws Exception {
+        LOG.info("Step 1: Install gh-ost command line utility");
+        installGhOstCli(MYSQL8_CONTAINER);
+
+        LOG.info("Step 2: Start pipeline job");
+        String sourceDDL =
+                String.format(
+                        "CREATE TABLE debezium_source ("
+                                + " `id` INT NOT NULL,"
+                                + " name STRING,"
+                                + " address STRING,"
+                                + " phone_number STRING,"
+                                + " primary key (`id`) not enforced"
+                                + ") WITH ("
+                                + " 'connector' = 'mysql-cdc',"
+                                + " 'hostname' = '%s',"
+                                + " 'port' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s',"
+                                + " 'database-name' = '%s',"
+                                + " 'table-name' = '%s',"
+                                + " 'scan.incremental.snapshot.enabled' = 
'%s',"
+                                + " 'server-time-zone' = 'UTC',"
+                                + " 'server-id' = '%s'"
+                                + ")",
+                        MYSQL8_CONTAINER.getHost(),
+                        MYSQL8_CONTAINER.getDatabasePort(),
+                        TEST_USER,
+                        TEST_PASSWORD,
+                        customerDatabase.getDatabaseName(),
+                        "customers",
+                        true,
+                        getServerId());
+
+        tEnv.executeSql(sourceDDL);
+
+        // async submit job
+        TableResult result = tEnv.executeSql("SELECT * FROM debezium_source");
+
+        // Wait for the source to start up and the job to reach RUNNING state.
+        TestCaseUtils.repeatedCheck(
+                () -> 
result.getJobClient().get().getJobStatus().get().equals(RUNNING),
+                DEFAULT_TIMEOUT,
+                DEFAULT_INTERVAL,
+                Arrays.asList(InterruptedException.class, 
NoSuchElementException.class));
+
+        CloseableIterator<Row> iterator = result.collect();
+
+        {
+            String[] expected =
+                    new String[] {
+                        "+I[101, user_1, Shanghai, 123567891234]",
+                        "+I[102, user_2, Shanghai, 123567891234]",
+                        "+I[103, user_3, Shanghai, 123567891234]",
+                        "+I[109, user_4, Shanghai, 123567891234]",
+                        "+I[110, user_5, Shanghai, 123567891234]",
+                        "+I[111, user_6, Shanghai, 123567891234]",
+                        "+I[118, user_7, Shanghai, 123567891234]",
+                        "+I[121, user_8, Shanghai, 123567891234]",
+                        "+I[123, user_9, Shanghai, 123567891234]",
+                        "+I[1009, user_10, Shanghai, 123567891234]",
+                        "+I[1010, user_11, Shanghai, 123567891234]",
+                        "+I[1011, user_12, Shanghai, 123567891234]",
+                        "+I[1012, user_13, Shanghai, 123567891234]",
+                        "+I[1013, user_14, Shanghai, 123567891234]",
+                        "+I[1014, user_15, Shanghai, 123567891234]",
+                        "+I[1015, user_16, Shanghai, 123567891234]",
+                        "+I[1016, user_17, Shanghai, 123567891234]",
+                        "+I[1017, user_18, Shanghai, 123567891234]",
+                        "+I[1018, user_19, Shanghai, 123567891234]",
+                        "+I[1019, user_20, Shanghai, 123567891234]",
+                        "+I[2000, user_21, Shanghai, 123567891234]"
+                    };
+            assertEqualsInAnyOrder(Arrays.asList(expected), 
fetchRows(iterator, expected.length));
+        }
+
+        // Wait a little while until the source has switched to binlog streaming mode.
+        Thread.sleep(5_000);
+
+        {
+            LOG.info("Step 3: Evolve schema with gh-ost - ADD COLUMN");
+            execInContainer(
+                    MYSQL8_CONTAINER,
+                    "evolve schema",
+                    "gh-ost",
+                    "--user=" + TEST_USER,
+                    "--password=" + TEST_PASSWORD,
+                    "--database=" + customerDatabase.getDatabaseName(),
+                    "--table=customers",
+                    "--alter=add column ext int first",
+                    "--allow-on-master", // because we don't have a replica
+                    "--initially-drop-old-table", // drop previously generated 
temporary tables
+                    "--execute");
+
+            try (Connection connection = customerDatabase.getJdbcConnection();
+                    Statement statement = connection.createStatement()) {
+                // The new column `ext` has been inserted now
+                statement.execute(
+                        String.format(
+                                "INSERT INTO `%s`.`customers` VALUES (17, 
10000, 'Alice', 'Beijing', '123567891234');",
+                                customerDatabase.getDatabaseName()));
+            }
+
+            String[] expected =
+                    new String[] {
+                        "+I[10000, Alice, Beijing, 123567891234]",
+                    };
+            assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
+        }
+
+        {
+            LOG.info("Step 4: Evolve schema with gh-ost - MODIFY COLUMN");
+            execInContainer(
+                    MYSQL8_CONTAINER,
+                    "evolve schema",
+                    "gh-ost",
+                    "--user=" + TEST_USER,
+                    "--password=" + TEST_PASSWORD,
+                    "--database=" + customerDatabase.getDatabaseName(),
+                    "--table=customers",
+                    "--alter=modify column ext double",
+                    "--allow-on-master",
+                    "--initially-drop-old-table",
+                    "--execute");
+
+            try (Connection connection = customerDatabase.getJdbcConnection();
+                    Statement statement = connection.createStatement()) {
+                statement.execute(
+                        String.format(
+                                "INSERT INTO `%s`.`customers` VALUES 
(2.718281828, 10001, 'Bob', 'Chongqing', '123567891234');",
+                                customerDatabase.getDatabaseName()));
+            }
+
+            String[] expected =
+                    new String[] {
+                        "+I[10001, Bob, Chongqing, 123567891234]",
+                    };
+            assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
+        }
+
+        {
+            LOG.info("Step 5: Evolve schema with gh-ost - DROP COLUMN");
+            execInContainer(
+                    MYSQL8_CONTAINER,
+                    "evolve schema",
+                    "gh-ost",
+                    "--user=" + TEST_USER,
+                    "--password=" + TEST_PASSWORD,
+                    "--database=" + customerDatabase.getDatabaseName(),
+                    "--table=customers",
+                    "--alter=drop column ext",
+                    "--allow-on-master",
+                    "--initially-drop-old-table",
+                    "--execute");
+
+            try (Connection connection = customerDatabase.getJdbcConnection();
+                    Statement statement = connection.createStatement()) {
+                statement.execute(
+                        String.format(
+                                "INSERT INTO `%s`.`customers` VALUES (10002, 
'Cicada', 'Urumqi', '123567891234');",
+                                customerDatabase.getDatabaseName()));
+            }
+
+            String[] expected =
+                    new String[] {
+                        "+I[10002, Cicada, Urumqi, 123567891234]",
+                    };
+            assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
+        }
+    }
+
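+    /**
+     * Verifies that schema changes applied by pt-online-schema-change are handled while
+     * streaming. pt-osc copies `customers` into a new table (`_customers_new`), applies
+     * the ALTER to the copy, keeps it in sync with triggers while rows are copied over,
+     * and finally swaps the two tables with an atomic RENAME TABLE.
+     */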
+    @Test
+    public void testPtOscSchemaMigrationFromScratch() throws Exception {
+        LOG.info("Step 1: Start pipeline job");
+        String sourceDDL =
+                String.format(
+                        "CREATE TABLE debezium_source ("
+                                + " `id` INT NOT NULL,"
+                                + " name STRING,"
+                                + " address STRING,"
+                                + " phone_number STRING,"
+                                + " primary key (`id`) not enforced"
+                                + ") WITH ("
+                                + " 'connector' = 'mysql-cdc',"
+                                + " 'hostname' = '%s',"
+                                + " 'port' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s',"
+                                + " 'database-name' = '%s',"
+                                + " 'table-name' = '%s',"
+                                + " 'scan.incremental.snapshot.enabled' = 
'%s',"
+                                + " 'server-time-zone' = 'UTC',"
+                                + " 'server-id' = '%s'"
+                                + ")",
+                        MYSQL8_CONTAINER.getHost(),
+                        MYSQL8_CONTAINER.getDatabasePort(),
+                        TEST_USER,
+                        TEST_PASSWORD,
+                        customerDatabase.getDatabaseName(),
+                        "customers",
+                        true,
+                        getServerId());
+
+        tEnv.executeSql(sourceDDL);
+
+        // async submit job
+        TableResult result = tEnv.executeSql("SELECT * FROM debezium_source");
+
+        // Wait for the source to start up by polling the job status until it is RUNNING.
+        TestCaseUtils.repeatedCheck(
+                () -> result.getJobClient().get().getJobStatus().get().equals(RUNNING),
+                DEFAULT_TIMEOUT,
+                DEFAULT_INTERVAL,
+                Arrays.asList(InterruptedException.class, NoSuchElementException.class));
+
+        CloseableIterator<Row> iterator = result.collect();
+        {
+            String[] expected =
+                    new String[] {
+                        "+I[101, user_1, Shanghai, 123567891234]",
+                        "+I[102, user_2, Shanghai, 123567891234]",
+                        "+I[103, user_3, Shanghai, 123567891234]",
+                        "+I[109, user_4, Shanghai, 123567891234]",
+                        "+I[110, user_5, Shanghai, 123567891234]",
+                        "+I[111, user_6, Shanghai, 123567891234]",
+                        "+I[118, user_7, Shanghai, 123567891234]",
+                        "+I[121, user_8, Shanghai, 123567891234]",
+                        "+I[123, user_9, Shanghai, 123567891234]",
+                        "+I[1009, user_10, Shanghai, 123567891234]",
+                        "+I[1010, user_11, Shanghai, 123567891234]",
+                        "+I[1011, user_12, Shanghai, 123567891234]",
+                        "+I[1012, user_13, Shanghai, 123567891234]",
+                        "+I[1013, user_14, Shanghai, 123567891234]",
+                        "+I[1014, user_15, Shanghai, 123567891234]",
+                        "+I[1015, user_16, Shanghai, 123567891234]",
+                        "+I[1016, user_17, Shanghai, 123567891234]",
+                        "+I[1017, user_18, Shanghai, 123567891234]",
+                        "+I[1018, user_19, Shanghai, 123567891234]",
+                        "+I[1019, user_20, Shanghai, 123567891234]",
+                        "+I[2000, user_21, Shanghai, 123567891234]"
+                    };
+            assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
+        }
+
+        // Wait a little while until the source has switched to binlog streaming mode.
+        Thread.sleep(5_000);
+
+        LOG.info("Step 2: Evolve schema with pt-osc - ADD COLUMN");
+        execInContainer(
+                PERCONA_TOOLKIT_CONTAINER,
+                "evolve schema",
+                "pt-online-schema-change",
+                "--user=" + TEST_USER,
+                "--host=" + INTER_CONTAINER_MYSQL_ALIAS,
+                "--password=" + TEST_PASSWORD,
+                "P=3306,t=customers,D=" + customerDatabase.getDatabaseName(),
+                "--alter",
+                "add column ext int FIRST",
+                "--charset=utf8",
+                "--recursion-method=NONE", // Do not look for slave nodes
+                "--print",
+                "--execute");
+
+        try (Connection connection = customerDatabase.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            // The new column `ext` exists now, so the INSERT supplies five values;
+            // the emitted row still has four fields, matching the DDL schema.
+            statement.execute(
+                    String.format(
+                            "INSERT INTO `%s`.`customers` VALUES (17, 10000, 
'Alice', 'Beijing', '123567891234');",
+                            customerDatabase.getDatabaseName()));
+
+            String[] expected =
+                    new String[] {
+                        "+I[10000, Alice, Beijing, 123567891234]",
+                    };
+            assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
+        }
+
+        LOG.info("Step 3: Evolve schema with pt-osc - MODIFY COLUMN");
+        execInContainer(
+                PERCONA_TOOLKIT_CONTAINER,
+                "evolve schema",
+                "pt-online-schema-change",
+                "--user=" + TEST_USER,
+                "--host=" + INTER_CONTAINER_MYSQL_ALIAS,
+                "--password=" + TEST_PASSWORD,
+                "P=3306,t=customers,D=" + customerDatabase.getDatabaseName(),
+                "--alter",
+                "modify column ext double",
+                "--charset=utf8",
+                "--recursion-method=NONE",
+                "--print",
+                "--execute");
+
+        try (Connection connection = customerDatabase.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    String.format(
+                            "INSERT INTO `%s`.`customers` VALUES (2.718281828, 
10001, 'Bob', 'Chongqing', '123567891234');",
+                            customerDatabase.getDatabaseName()));
+            String[] expected =
+                    new String[] {
+                        "+I[10001, Bob, Chongqing, 123567891234]",
+                    };
+            assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
+        }
+
+        LOG.info("Step 4: Evolve schema with pt-osc - DROP COLUMN");
+        execInContainer(
+                PERCONA_TOOLKIT_CONTAINER,
+                "evolve schema",
+                "pt-online-schema-change",
+                "--user=" + TEST_USER,
+                "--host=" + INTER_CONTAINER_MYSQL_ALIAS,
+                "--password=" + TEST_PASSWORD,
+                "P=3306,t=customers,D=" + customerDatabase.getDatabaseName(),
+                "--alter",
+                "drop column ext",
+                "--charset=utf8",
+                "--recursion-method=NONE",
+                "--print",
+                "--execute");
+
+        try (Connection connection = customerDatabase.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    String.format(
+                            "INSERT INTO `%s`.`customers` VALUES (10002, 
'Cicada', 'Urumqi', '123567891234');",
+                            customerDatabase.getDatabaseName()));
+            String[] expected =
+                    new String[] {
+                        "+I[10002, Cicada, Urumqi, 123567891234]",
+                    };
+            assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
+        }
+    }
+
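+    /** Runs a command inside the given container, failing with the captured output if it exits non-zero. */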
+    private static void execInContainer(Container<?> container, String prompt, String... commands)
+            throws IOException, InterruptedException {
+        LOG.info(
+                "Starting to {} with the following command: `{}`",
+                prompt,
+                String.join(" ", commands));
+        Container.ExecResult execResult = container.execInContainer(commands);
+        if (execResult.getExitCode() == 0) {
+            LOG.info("Successfully {}. Stdout: {}", prompt, execResult.getStdout());
+        } else {
+            LOG.error(
+                    "Failed to {}. Exit code: {}, Stdout: {}, Stderr: {}",
+                    prompt,
+                    execResult.getExitCode(),
+                    execResult.getStdout(),
+                    execResult.getStderr());
+            throw new IOException("Failed to execute commands: " + String.join(" ", commands));
+        }
+    }
+
+    private List<Event> getSnapshotExpected(TableId tableId, Schema schema) {
+        return Stream.of(
+                        generate(schema, 101, "user_1", "Shanghai", 
"123567891234"),
+                        generate(schema, 102, "user_2", "Shanghai", 
"123567891234"),
+                        generate(schema, 103, "user_3", "Shanghai", 
"123567891234"),
+                        generate(schema, 109, "user_4", "Shanghai", 
"123567891234"),
+                        generate(schema, 110, "user_5", "Shanghai", 
"123567891234"),
+                        generate(schema, 111, "user_6", "Shanghai", 
"123567891234"),
+                        generate(schema, 118, "user_7", "Shanghai", 
"123567891234"),
+                        generate(schema, 121, "user_8", "Shanghai", 
"123567891234"),
+                        generate(schema, 123, "user_9", "Shanghai", 
"123567891234"),
+                        generate(schema, 1009, "user_10", "Shanghai", 
"123567891234"),
+                        generate(schema, 1010, "user_11", "Shanghai", 
"123567891234"),
+                        generate(schema, 1011, "user_12", "Shanghai", 
"123567891234"),
+                        generate(schema, 1012, "user_13", "Shanghai", 
"123567891234"),
+                        generate(schema, 1013, "user_14", "Shanghai", 
"123567891234"),
+                        generate(schema, 1014, "user_15", "Shanghai", 
"123567891234"),
+                        generate(schema, 1015, "user_16", "Shanghai", 
"123567891234"),
+                        generate(schema, 1016, "user_17", "Shanghai", 
"123567891234"),
+                        generate(schema, 1017, "user_18", "Shanghai", 
"123567891234"),
+                        generate(schema, 1018, "user_19", "Shanghai", 
"123567891234"),
+                        generate(schema, 1019, "user_20", "Shanghai", 
"123567891234"),
+                        generate(schema, 2000, "user_21", "Shanghai", 
"123567891234"))
+                .map(record -> DataChangeEvent.insertEvent(tableId, record))
+                .collect(Collectors.toList());
+    }
+
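+    /** Builds a {@link BinaryRecordData} for the schema, wrapping String fields as {@link BinaryStringData}. */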
+    private BinaryRecordData generate(Schema schema, Object... fields) {
+        return (new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0])))
+                .generate(
+                        Arrays.stream(fields)
+                                .map(
+                                        e ->
+                                                (e instanceof String)
+                                                        ? BinaryStringData.fromString((String) e)
+                                                        : e)
+                                .toArray());
+    }
+
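+    // Each parallel source reader connects to MySQL as a distinct replica, so a whole
+    // "from-to" range of server ids is reserved, one per subtask.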
+    private String getServerId() {
+        final Random random = new Random();
+        int serverId = random.nextInt(100) + 5400;
+        return serverId + "-" + (serverId + env.getParallelism());
+    }
+
+    protected String getServerId(int base) {
+        return base + "-" + (base + DEFAULT_PARALLELISM);
+    }
+
+    private static void waitForSnapshotStarted(String sinkName) throws InterruptedException {
+        while (sinkSize(sinkName) == 0) {
+            Thread.sleep(100);
+        }
+    }
+
+    private static void waitForSinkSize(String sinkName, int expectedSize)
+            throws InterruptedException {
+        while (sinkSize(sinkName) < expectedSize) {
+            Thread.sleep(100);
+        }
+    }
+
+    private static int sinkSize(String sinkName) {
+        synchronized (TestValuesTableFactory.class) {
+            try {
+                return TestValuesTableFactory.getRawResults(sinkName).size();
+            } catch (IllegalArgumentException e) {
+                // job is not started yet
+                return 0;
+            }
+        }
+    }
+
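+    /** Drains up to {@code size} rows from the iterator and returns their string representations. */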
+    private static List<String> fetchRows(Iterator<Row> iter, int size) {
+        List<String> rows = new ArrayList<>(size);
+        while (size > 0 && iter.hasNext()) {
+            Row row = iter.next();
+            rows.add(row.toString());
+            size--;
+        }
+        return rows;
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
index 8339a1db5..e5acc934f 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
@@ -51,6 +51,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOpt
 import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_MAX_RETRIES;
 import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_TIMEOUT;
 import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.HEARTBEAT_INTERVAL;
+import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES;
 import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
 import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
 import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
@@ -127,7 +128,8 @@ public class MySqlTableSourceFactoryTest {
                         new Properties(),
                         HEARTBEAT_INTERVAL.defaultValue(),
                         null,
-                        SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
+                        SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
+                        PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
         assertEquals(expectedSource, actualSource);
     }
 
@@ -173,7 +175,8 @@ public class MySqlTableSourceFactoryTest {
                         new Properties(),
                         HEARTBEAT_INTERVAL.defaultValue(),
                         "testCol",
-                        SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
+                        SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
+                        PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
         assertEquals(expectedSource, actualSource);
     }
 
@@ -215,7 +218,8 @@ public class MySqlTableSourceFactoryTest {
                         new Properties(),
                         HEARTBEAT_INTERVAL.defaultValue(),
                         null,
-                        SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
+                        SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
+                        PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
         assertEquals(expectedSource, actualSource);
     }
 
@@ -255,7 +259,8 @@ public class MySqlTableSourceFactoryTest {
                         new Properties(),
                         HEARTBEAT_INTERVAL.defaultValue(),
                         null,
-                        SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
+                        SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
+                        PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
         assertEquals(expectedSource, actualSource);
     }
 
@@ -311,7 +316,8 @@ public class MySqlTableSourceFactoryTest {
                         jdbcProperties,
                         Duration.ofMillis(15213),
                         "testCol",
-                        true);
+                        true,
+                        PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
         assertEquals(expectedSource, actualSource);
         assertTrue(actualSource instanceof MySqlTableSource);
         MySqlTableSource actualMySqlTableSource = (MySqlTableSource) actualSource;
@@ -365,7 +371,8 @@ public class MySqlTableSourceFactoryTest {
                         new Properties(),
                         HEARTBEAT_INTERVAL.defaultValue(),
                         null,
-                        SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
+                        SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
+                        PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
         assertEquals(expectedSource, actualSource);
     }
 
@@ -403,7 +410,8 @@ public class MySqlTableSourceFactoryTest {
                         new Properties(),
                         HEARTBEAT_INTERVAL.defaultValue(),
                         null,
-                        SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
+                        SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
+                        PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
         assertEquals(expectedSource, actualSource);
     }
 
@@ -442,7 +450,8 @@ public class MySqlTableSourceFactoryTest {
                         new Properties(),
                         HEARTBEAT_INTERVAL.defaultValue(),
                         null,
-                        SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
+                        SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
+                        PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
         assertEquals(expectedSource, actualSource);
     }
 
@@ -482,7 +491,8 @@ public class MySqlTableSourceFactoryTest {
                         new Properties(),
                         HEARTBEAT_INTERVAL.defaultValue(),
                         null,
-                        SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
+                        SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
+                        PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
         assertEquals(expectedSource, actualSource);
     }
 
@@ -520,7 +530,8 @@ public class MySqlTableSourceFactoryTest {
                         new Properties(),
                         HEARTBEAT_INTERVAL.defaultValue(),
                         null,
-                        SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
+                        SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
+                        PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
         assertEquals(expectedSource, actualSource);
     }
 
@@ -563,7 +574,8 @@ public class MySqlTableSourceFactoryTest {
                         new Properties(),
                         HEARTBEAT_INTERVAL.defaultValue(),
                         null,
-                        SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
+                        SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
+                        PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
         expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
         expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name");
 
