This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new cd02218e2 [INLONG-7554][Sort] MySQL CDC supports parsing gh-ost 
records (#7750)
cd02218e2 is described below

commit cd02218e27c6c5f656e532328bde4e7a5b32552f
Author: emhui <[email protected]>
AuthorDate: Sun Apr 23 11:24:47 2023 +0800

    [INLONG-7554][Sort] MySQL CDC supports parsing gh-ost records (#7750)
---
 .../org/apache/inlong/sort/base/Constants.java     |  30 ++++++
 .../sort/cdc/mysql/source/MySqlSourceBuilder.java  |  10 ++
 .../cdc/mysql/source/config/MySqlSourceConfig.java |  16 ++-
 .../source/config/MySqlSourceConfigFactory.java    |  16 ++-
 .../mysql/source/reader/MySqlRecordEmitter.java    |  32 ++++--
 .../cdc/mysql/source/split/MySqlBinlogSplit.java   |  22 +++-
 .../mysql/source/split/MySqlBinlogSplitState.java  |  13 ++-
 .../mysql/source/split/MySqlSplitSerializer.java   |  41 +++++++-
 .../sort/cdc/mysql/source/utils/GhostUtils.java    | 112 +++++++++++++++++++++
 .../sort/cdc/mysql/source/utils/RecordUtils.java   |  18 ++++
 .../mysql/table/MySqlTableInlongSourceFactory.java |  10 +-
 .../sort/cdc/mysql/table/MySqlTableSource.java     |  28 +++++-
 .../apache/inlong/sort/parser/AllMigrateTest.java  |   1 +
 13 files changed, 325 insertions(+), 24 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index cd7acc921..757be7260 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -114,6 +114,18 @@ public final class Constants {
      * It is used for metric data to build schema identify
      */
     public static final String SEMICOLON = ".";
+
+    /**
+     * Regular-expression anchor ({@code ^}). Placed first in a pattern, it requires the match
+     * to begin at the start of the input.
+     */
+    public static final String CARET = "^";
+
+    /**
+     * Regular-expression anchor ({@code $}). Placed last in a pattern, it requires the match
+     * to finish at the end of the input.
+     */
+    public static final String DOLLAR = "$";
     /**
      * It is used for metric data to spilt schema identify
      */
@@ -137,6 +149,12 @@ public final class Constants {
 
     public static final String DDL_FIELD_NAME = "ddl";
 
+    /** Upper-case keyword that begins an ALTER DDL statement. */
+    public static final String DDL_OP_ALTER = "ALTER";
+
+    /** Upper-case keyword that begins a DROP DDL statement. */
+    public static final String DDL_OP_DROP = "DROP";
+
+    /** Marker comment that gh-ost embeds in the DDL it issues; stripped before the DDL is re-emitted. */
+    public static final String GHOST_TAG = "/* gh-ost */";
+
     public static final ConfigOption<String> INLONG_METRIC =
             ConfigOptions.key("inlong.metric.labels")
                     .stringType()
@@ -322,4 +340,16 @@ public final class Constants {
             .defaultValue(10240L)
             .withDescription(
                     "The flush max bytes, over this number in batch, will 
flush data. The default value is 10KB.");
+
+    /**
+     * Whether ALTER statements that gh-ost runs against its shadow tables are collected, and later
+     * substituted into the DDL record of the original table. Disabled by default.
+     */
+    public static final ConfigOption<Boolean> GH_OST_DDL_CHANGE =
+            ConfigOptions.key("gh-ost.ddl.change")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether parse ddl changes of gh-ost, default value is 'false'.");
+
+    /**
+     * Regular expression applied to gh-ost table names. Capture group 1 must yield the name of the
+     * original table. The default covers the {@code _<table>_gho}, {@code _<table>_ghc} and
+     * {@code _<table>_del} naming scheme.
+     */
+    public static final ConfigOption<String> GH_OST_TABLE_REGEX =
+            ConfigOptions.key("gh-ost.table.regex")
+                    .stringType()
+                    .defaultValue("^_(.*)_(gho|ghc|del)$")
+                    .withDescription("Matcher the original table name from the ddl of gh-ost.");
 }
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSourceBuilder.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSourceBuilder.java
index 8cfaa0bae..726f5cae6 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSourceBuilder.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSourceBuilder.java
@@ -275,4 +275,14 @@ public class MySqlSourceBuilder<T> {
         this.configFactory.includeIncremental(includeIncremental);
         return this;
     }
+
+    /**
+     * Enables or disables collection of DDL changes issued by gh-ost.
+     *
+     * @param ghostDdlChange {@code true} to collect gh-ost ALTER statements
+     * @return this builder, for chaining
+     */
+    public MySqlSourceBuilder<T> ghostDdlChange(boolean ghostDdlChange) {
+        configFactory.ghostDdlChange(ghostDdlChange);
+        return this;
+    }
+
+    /**
+     * Sets the regular expression used to recognise gh-ost tables. Group 1 of a match is taken
+     * as the original table name.
+     *
+     * @param ghostTableRegex regex applied to gh-ost table names
+     * @return this builder, for chaining
+     */
+    public MySqlSourceBuilder<T> ghostTableRegex(String ghostTableRegex) {
+        configFactory.ghostTableRegex(ghostTableRegex);
+        return this;
+    }
 }
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfig.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfig.java
index 926bbebce..71a7dec3e 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfig.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfig.java
@@ -70,6 +70,8 @@ public class MySqlSourceConfig implements Serializable {
     private final String inlongMetric;
     private final String inlongAudit;
     private final boolean includeIncremental;
+    private final boolean ghostDdlChange;
+    private final String ghostTableRegex;
 
     MySqlSourceConfig(
             String hostname,
@@ -95,7 +97,9 @@ public class MySqlSourceConfig implements Serializable {
             Properties jdbcProperties,
             String inlongMetric,
             String inlongAudit,
-            boolean includeIncremental) {
+            boolean includeIncremental,
+            boolean ghostDdlChange,
+            String ghostTableRegex) {
         this.hostname = checkNotNull(hostname);
         this.port = port;
         this.username = checkNotNull(username);
@@ -122,6 +126,8 @@ public class MySqlSourceConfig implements Serializable {
         this.inlongMetric = inlongMetric;
         this.inlongAudit = inlongAudit;
         this.includeIncremental = includeIncremental;
+        this.ghostDdlChange = ghostDdlChange;
+        this.ghostTableRegex = ghostTableRegex;
     }
 
     public String getHostname() {
@@ -232,4 +238,12 @@ public class MySqlSourceConfig implements Serializable {
     public boolean isIncludeIncremental() {
         return includeIncremental;
     }
+
+    /** Returns whether DDL changes produced by gh-ost should be collected and re-emitted. */
+    public boolean isGhostDdlChange() {
+        return this.ghostDdlChange;
+    }
+
+    /** Returns the regex that maps a gh-ost table name to its original table (capture group 1). */
+    public String getGhostTableRegex() {
+        return this.ghostTableRegex;
+    }
 }
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java
index 82b90d561..6b3d82f20 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java
@@ -76,6 +76,8 @@ public class MySqlSourceConfigFactory implements Serializable 
{
     private String inlongMetric;
     private String inlongAudit;
     private boolean includeIncremental;
+    private boolean ghostDdlChange;
+    private String ghostTableRegex;
 
     public MySqlSourceConfigFactory inlongMetric(String inlongMetric) {
         this.inlongMetric = inlongMetric;
@@ -92,6 +94,16 @@ public class MySqlSourceConfigFactory implements 
Serializable {
         return this;
     }
 
+    /**
+     * Sets whether DDL changes issued by gh-ost are collected.
+     *
+     * @param ghostDdlChange flag copied into the created {@link MySqlSourceConfig}
+     * @return this factory, for chaining
+     */
+    public MySqlSourceConfigFactory ghostDdlChange(boolean ghostDdlChange) {
+        this.ghostDdlChange = ghostDdlChange;
+        return this;
+    }
+
+    /**
+     * Sets the regex used to recognise gh-ost tables.
+     *
+     * @param ghostTableRegex regex copied into the created {@link MySqlSourceConfig}
+     * @return this factory, for chaining
+     */
+    public MySqlSourceConfigFactory ghostTableRegex(String ghostTableRegex) {
+        this.ghostTableRegex = ghostTableRegex;
+        return this;
+    }
+
     public MySqlSourceConfigFactory hostname(String hostname) {
         this.hostname = hostname;
         return this;
@@ -376,6 +388,8 @@ public class MySqlSourceConfigFactory implements 
Serializable {
                 jdbcProperties,
                 inlongMetric,
                 inlongAudit,
-                includeIncremental);
+                includeIncremental,
+                ghostDdlChange,
+                ghostTableRegex);
     }
 }
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
index 695c4363d..2d1c8ded8 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
@@ -25,6 +25,7 @@ import io.debezium.relational.ColumnFilterMode;
 import io.debezium.relational.TableId;
 import io.debezium.relational.Tables;
 import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.HistoryRecord.Fields;
 import io.debezium.relational.history.TableChanges;
 import io.debezium.relational.history.TableChanges.TableChange;
 import java.util.Map.Entry;
@@ -33,9 +34,9 @@ import net.sf.jsqlparser.parser.CCJSqlParserUtil;
 import net.sf.jsqlparser.schema.Table;
 import net.sf.jsqlparser.statement.Statement;
 import net.sf.jsqlparser.statement.alter.RenameTableStatement;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.connector.base.source.reader.RecordEmitter;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.util.Collector;
 import org.apache.inlong.sort.base.enums.ReadPhase;
 import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema;
@@ -53,8 +54,8 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
-import static org.apache.inlong.sort.base.Constants.DDL_FIELD_NAME;
-import static 
org.apache.inlong.sort.cdc.base.relational.JdbcSourceEventDispatcher.HISTORY_RECORD_FIELD;
+import static 
org.apache.inlong.sort.cdc.mysql.source.utils.GhostUtils.collectGhostDdl;
+import static 
org.apache.inlong.sort.cdc.mysql.source.utils.GhostUtils.updateGhostDdlElement;
 import static 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getBinlogPosition;
 import static 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getFetchTimestamp;
 import static 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getHistoryRecord;
@@ -97,7 +98,8 @@ public final class MySqlRecordEmitter<T>
     private volatile long snapProcessTime = 0L;
 
     private boolean includeIncremental;
-    public final ObjectMapper objectMapper = new ObjectMapper();
+    private boolean ghostDdlChange;
+    private String ghostTableRegex;
 
     public MySqlRecordEmitter(
             DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
@@ -110,6 +112,8 @@ public final class MySqlRecordEmitter<T>
         this.includeIncremental = sourceConfig.isIncludeIncremental();
         this.columnNameFilter = ColumnFilterUtil.createColumnFilter(
                 sourceConfig.getDbzConfiguration(), ColumnFilterMode.CATALOG);
+        this.ghostDdlChange = sourceConfig.isGhostDdlChange();
+        this.ghostTableRegex = sourceConfig.getGhostTableRegex();
     }
 
     @Override
@@ -131,6 +135,9 @@ public final class MySqlRecordEmitter<T>
                 
splitState.asBinlogSplitState().recordSchema(tableChange.getId(), tableChange);
                 if (includeSchemaChanges) {
                     TableChange newTableChange = 
ColumnFilterUtil.createTableChange(tableChange, columnNameFilter);
+                    if (ghostDdlChange) {
+                        updateGhostDdlElement(element, splitState, 
historyRecord, ghostTableRegex);
+                    }
                     outputDdlElement(element, output, splitState, 
newTableChange);
                 }
             }
@@ -139,9 +146,12 @@ public final class MySqlRecordEmitter<T>
                 TableId tableId = RecordUtils.getTableId(element);
                 // if this table is one of the captured tables, output the ddl 
element.
                 if 
(splitState.getMySQLSplit().getTableSchemas().containsKey(tableId)
-                        || shouldOutputRenameDdl(element, tableId)) {
+                        || shouldOutputRenameDdl(historyRecord, tableId)) {
                     outputDdlElement(element, output, splitState, null);
                 }
+                if (ghostDdlChange) {
+                    collectGhostDdl(element, splitState, historyRecord, 
ghostTableRegex);
+                }
             }
 
         } else if (isDataChangeRecord(element)) {
@@ -200,12 +210,14 @@ public final class MySqlRecordEmitter<T>
     /**
      * if rename operation is "rename a to b" where a is the captured table
      * this method extract table names a and b, if any of table name is the 
captured table
-     * we should output ddl element
+     * we should output ddl element.
      */
-    private boolean shouldOutputRenameDdl(SourceRecord element, TableId 
tableId) {
+    private boolean shouldOutputRenameDdl(HistoryRecord historyRecord, TableId 
tableId) {
+        String ddl = historyRecord.document().getString(Fields.DDL_STATEMENTS);
+        if (StringUtils.isBlank(ddl)) {
+            return false;
+        }
         try {
-            String ddl = objectMapper.readTree(((Struct) 
element.value()).get(HISTORY_RECORD_FIELD).toString())
-                    .get(DDL_FIELD_NAME).asText();
             Statement statement = CCJSqlParserUtil.parse(ddl);
             if (statement instanceof RenameTableStatement) {
                 RenameTableStatement renameTableStatement = 
(RenameTableStatement) statement;
@@ -220,7 +232,7 @@ public final class MySqlRecordEmitter<T>
                 }
             }
         } catch (Exception e) {
-            LOG.error("parse ddl error {}", element, e);
+            LOG.error("parse ddl error {}", historyRecord, e);
         }
         return false;
     }
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlBinlogSplit.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlBinlogSplit.java
index 566343a76..6fb26a875 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlBinlogSplit.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlBinlogSplit.java
@@ -41,6 +41,7 @@ public class MySqlBinlogSplit extends MySqlSplit {
     private final boolean isSuspended;
     @Nullable
     transient byte[] serializedFormCache;
+    private final Map<TableId, String> tableDdls;
 
     public MySqlBinlogSplit(
             String splitId,
@@ -49,7 +50,8 @@ public class MySqlBinlogSplit extends MySqlSplit {
             List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos,
             Map<TableId, TableChange> tableSchemas,
             int totalFinishedSplitSize,
-            boolean isSuspended) {
+            boolean isSuspended,
+            Map<TableId, String> tableDdls) {
         super(splitId);
         this.startingOffset = startingOffset;
         this.endingOffset = endingOffset;
@@ -57,6 +59,7 @@ public class MySqlBinlogSplit extends MySqlSplit {
         this.tableSchemas = tableSchemas;
         this.totalFinishedSplitSize = totalFinishedSplitSize;
         this.isSuspended = isSuspended;
+        this.tableDdls = tableDdls;
     }
 
     public MySqlBinlogSplit(
@@ -73,6 +76,7 @@ public class MySqlBinlogSplit extends MySqlSplit {
         this.tableSchemas = tableSchemas;
         this.totalFinishedSplitSize = totalFinishedSplitSize;
         this.isSuspended = false;
+        this.tableDdls = new HashMap<>();
     }
 
     // -------------------------------------------------------------------
@@ -88,7 +92,8 @@ public class MySqlBinlogSplit extends MySqlSplit {
                 splitInfos,
                 binlogSplit.getTableSchemas(),
                 binlogSplit.getTotalFinishedSplitSize(),
-                binlogSplit.isSuspended());
+                binlogSplit.isSuspended(),
+                binlogSplit.getTableDdls());
     }
 
     public static MySqlBinlogSplit fillTableSchemas(
@@ -101,7 +106,8 @@ public class MySqlBinlogSplit extends MySqlSplit {
                 binlogSplit.getFinishedSnapshotSplitInfos(),
                 tableSchemas,
                 binlogSplit.getTotalFinishedSplitSize(),
-                binlogSplit.isSuspended());
+                binlogSplit.isSuspended(),
+                binlogSplit.getTableDdls());
     }
 
     public static MySqlBinlogSplit toNormalBinlogSplit(
@@ -113,7 +119,8 @@ public class MySqlBinlogSplit extends MySqlSplit {
                 suspendedBinlogSplit.getFinishedSnapshotSplitInfos(),
                 suspendedBinlogSplit.getTableSchemas(),
                 totalFinishedSplitSize,
-                false);
+                false,
+                suspendedBinlogSplit.getTableDdls());
     }
 
     public static MySqlBinlogSplit toSuspendedBinlogSplit(MySqlBinlogSplit 
normalBinlogSplit) {
@@ -124,7 +131,8 @@ public class MySqlBinlogSplit extends MySqlSplit {
                 new ArrayList<>(),
                 new HashMap<>(),
                 normalBinlogSplit.getTotalFinishedSplitSize(),
-                true);
+                true,
+                normalBinlogSplit.getTableDdls());
     }
 
     public BinlogOffset getStartingOffset() {
@@ -156,6 +164,10 @@ public class MySqlBinlogSplit extends MySqlSplit {
         return totalFinishedSplitSize == finishedSnapshotSplitInfos.size();
     }
 
+    /**
+     * Returns the collected gh-ost ALTER statements, keyed by the original table id.
+     *
+     * <p>NOTE: may be {@code null} for a split deserialized from state of version 3 or older,
+     * because that path passes no DDL map to the constructor.
+     */
+    public Map<TableId, String> getTableDdls() {
+        return this.tableDdls;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlBinlogSplitState.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlBinlogSplitState.java
index 92b82d198..cfea39a59 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlBinlogSplitState.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlBinlogSplitState.java
@@ -34,12 +34,14 @@ public class MySqlBinlogSplitState extends MySqlSplitState {
     private BinlogOffset startingOffset;
     @Nullable
     private BinlogOffset endingOffset;
+    private Map<TableId, String> tableDdls;
 
     public MySqlBinlogSplitState(MySqlBinlogSplit split) {
         super(split);
         this.startingOffset = split.getStartingOffset();
         this.endingOffset = split.getEndingOffset();
         this.tableSchemas = split.getTableSchemas();
+        this.tableDdls = split.getTableDdls();
     }
 
     @Nullable
@@ -77,7 +79,8 @@ public class MySqlBinlogSplitState extends MySqlSplitState {
                 binlogSplit.asBinlogSplit().getFinishedSnapshotSplitInfos(),
                 getTableSchemas(),
                 binlogSplit.getTotalFinishedSplitSize(),
-                binlogSplit.isSuspended());
+                binlogSplit.isSuspended(),
+                binlogSplit.getTableDdls());
     }
 
     @Override
@@ -91,4 +94,12 @@ public class MySqlBinlogSplitState extends MySqlSplitState {
                 + split
                 + '}';
     }
+
+    /**
+     * Returns the gh-ost ALTER statements collected so far, keyed by the original table id.
+     *
+     * <p>The map is the same instance held by the split this state was built from. A split
+     * deserialized from state of version 3 or older carries no map ({@code null}). In that case
+     * an empty map is created lazily, so callers such as GhostUtils never dereference {@code null}.
+     */
+    public Map<TableId, String> getTableDdls() {
+        if (tableDdls == null) {
+            tableDdls = new java.util.HashMap<>();
+        }
+        return tableDdls;
+    }
+
+    /**
+     * Stores, or replaces, the rewritten gh-ost ALTER statement for {@code tableId}.
+     *
+     * <p>The backing map is created first if this state was restored from a split without one
+     * (state of version 3 or older). Otherwise {@code put} would throw a NullPointerException.
+     */
+    public void recordTableDdl(TableId tableId, String ddl) {
+        if (this.tableDdls == null) {
+            this.tableDdls = new java.util.HashMap<>();
+        }
+        this.tableDdls.put(tableId, ddl);
+    }
 }
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
index 93c3b4353..ae8852324 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
@@ -104,6 +104,41 @@ public final class MySqlSplitSerializer implements 
SimpleVersionedSerializer<MyS
         return tableSchemas;
     }
 
+    /**
+     * Writes the gh-ost DDL map as an entry count followed by (table id, ddl) UTF string pairs.
+     *
+     * <p>A {@code null} map is written as an empty map. Binlog splits restored from state of
+     * version 3 or older carry a {@code null} map, so without this the next checkpoint would fail
+     * with a NullPointerException.
+     */
+    public static void writeTableDdls(
+            Map<TableId, String> tableDdls, DataOutputSerializer out) throws IOException {
+        if (tableDdls == null) {
+            out.writeInt(0);
+            return;
+        }
+        out.writeInt(tableDdls.size());
+        for (Map.Entry<TableId, String> entry : tableDdls.entrySet()) {
+            out.writeUTF(entry.getKey().toString());
+            out.writeUTF(entry.getValue());
+        }
+    }
+
+    /**
+     * Reads the gh-ost DDL map written by {@link #writeTableDdls}.
+     *
+     * <p>If no bytes remain, the result is an empty map. This presumably covers state serialized
+     * before the DDL section was appended. An entry read under a version outside 1..4 fails with
+     * an {@link IOException}.
+     */
+    public static Map<TableId, String> readTableDdls(int version, DataInputDeserializer in)
+            throws IOException {
+        final Map<TableId, String> result = new HashMap<>();
+        if (in.available() > 0) {
+            final int entryCount = in.readInt();
+            for (int idx = 0; idx < entryCount; idx++) {
+                final TableId key = TableId.parse(in.readUTF());
+                final boolean knownVersion = version >= 1 && version <= 4;
+                if (!knownVersion) {
+                    throw new IOException("Unknown version: " + version);
+                }
+                result.put(key, in.readUTF());
+            }
+        }
+        return result;
+    }
+
     private static void writeFinishedSplitsInfo(
             List<FinishedSnapshotSplitInfo> finishedSplitsInfo, 
DataOutputSerializer out)
             throws IOException {
@@ -228,6 +263,7 @@ public final class MySqlSplitSerializer implements 
SimpleVersionedSerializer<MyS
             writeTableSchemas(binlogSplit.getTableSchemas(), out);
             out.writeInt(binlogSplit.getTotalFinishedSplitSize());
             out.writeBoolean(binlogSplit.isSuspended());
+            writeTableDdls(binlogSplit.getTableDdls(), out);
             final byte[] result = out.getCopyOfBuffer();
             out.clear();
             // optimization: cache the serialized from, so we avoid the byte 
work during repeated
@@ -296,10 +332,12 @@ public final class MySqlSplitSerializer implements 
SimpleVersionedSerializer<MyS
             Map<TableId, TableChange> tableChangeMap = 
readTableSchemas(version, in);
             int totalFinishedSplitSize = finishedSplitsInfo.size();
             boolean isSuspended = false;
+            Map<TableId, String> tableDdls = null;
             if (version >= 3) {
                 totalFinishedSplitSize = in.readInt();
                 if (version > 3) {
                     isSuspended = in.readBoolean();
+                    tableDdls = readTableDdls(version, in);
                 }
             }
             in.releaseArrays();
@@ -310,7 +348,8 @@ public final class MySqlSplitSerializer implements 
SimpleVersionedSerializer<MyS
                     finishedSplitsInfo,
                     tableChangeMap,
                     totalFinishedSplitSize,
-                    isSuspended);
+                    isSuspended,
+                    tableDdls);
         } else if (splitKind == METRIC_SPLIT_FLAG) {
             long numBytesIn = 0L;
             long numRecordsIn = 0L;
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/GhostUtils.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/GhostUtils.java
new file mode 100644
index 000000000..77f769c4d
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/GhostUtils.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.mysql.source.utils;
+
+import static org.apache.inlong.sort.base.Constants.CARET;
+import static org.apache.inlong.sort.base.Constants.DDL_OP_ALTER;
+import static org.apache.inlong.sort.base.Constants.DOLLAR;
+import static org.apache.inlong.sort.base.Constants.GHOST_TAG;
+import static org.apache.inlong.sort.base.Constants.TABLE_NAME;
+import static 
org.apache.inlong.sort.cdc.base.relational.JdbcSourceEventDispatcher.HISTORY_RECORD_FIELD;
+
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.HistoryRecord.Fields;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.sort.cdc.mysql.source.split.MySqlBinlogSplitState;
+import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSplitState;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+/**
+ * Utility methods for handling the DDL that gh-ost produces while it migrates a captured table.
+ *
+ * <p>gh-ost applies the requested ALTER to a shadow table, then renames that shadow table to the
+ * original name. The shadow table's name is matched by {@code gh-ost.table.regex}, whose capture
+ * group 1 yields the original table name.
+ *
+ * <p>{@link #collectGhostDdl} remembers that ALTER per original table in the binlog split state.
+ * {@link #updateGhostDdlElement} then substitutes it into the rename record, so downstream
+ * consumers see an ALTER on the original table.
+ */
+public class GhostUtils {
+
+    /** Lookbehind: the match must not be preceded by a character of an unquoted identifier. */
+    private static final String NOT_AFTER_IDENTIFIER_CHAR = "(?<![\\w$])";
+
+    /** Lookahead: the match must not be followed by a character of an unquoted identifier. */
+    private static final String NOT_BEFORE_IDENTIFIER_CHAR = "(?![\\w$])";
+
+    /**
+     * After setting {@code gh-ost.ddl.change=true}, captures an ALTER statement that gh-ost runs
+     * against one of its tables. The statement is stored in the split state under the original
+     * table's id.
+     *
+     * <p>Nothing is recorded unless all of the following hold:
+     * <ul>
+     *   <li>the record's table matches {@code ghostTableRegex};</li>
+     *   <li>the DDL starts with {@code ALTER} (case-insensitive, leading whitespace ignored);</li>
+     *   <li>the original table is a captured table, i.e. present in the split's table schemas.</li>
+     * </ul>
+     *
+     * <p>Before storing, the gh-ost marker comment is removed and whitespace runs are collapsed
+     * to one space. Every standalone occurrence of the gh-ost table name is then replaced by the
+     * original table name.
+     *
+     * @param element the schema-change record
+     * @param splitState split state; {@code asBinlogSplitState()} is called on it
+     * @param historyRecord history record carrying the DDL statement
+     * @param ghostTableRegex regex whose group 1 extracts the original table name
+     */
+    public static void collectGhostDdl(SourceRecord element, MySqlSplitState splitState, HistoryRecord historyRecord,
+            String ghostTableRegex) {
+        String ddl = historyRecord.document().getString(Fields.DDL_STATEMENTS);
+        if (StringUtils.isBlank(ddl)) {
+            return;
+        }
+        String ghostTableName = RecordUtils.getTableName(element);
+        // The source table may be absent (presumably for database-level statements);
+        // passing a null name to Pattern#matcher would throw a NullPointerException.
+        if (StringUtils.isBlank(ghostTableName)) {
+            return;
+        }
+        Matcher matcher = Pattern.compile(ghostTableRegex).matcher(ghostTableName);
+        if (!matcher.find()) {
+            return;
+        }
+        String tableName = matcher.group(1);
+        TableId tableId = getTableId(element, tableName);
+        MySqlBinlogSplitState binlogSplitState = splitState.asBinlogSplitState();
+        if (!startsWithAlter(ddl) || !binlogSplitState.getTableSchemas().containsKey(tableId)) {
+            return;
+        }
+        String normalizedDdl = ddl.replace(GHOST_TAG, "").replaceAll("\\s+", " ");
+        binlogSplitState.recordTableDdl(tableId, replaceTableName(normalizedDdl, ghostTableName, tableName));
+    }
+
+    /**
+     * Handles gh-ost's 'rename table `_a_gho` to `a`' statement, where `a` is the captured table
+     * and `_a_gho` is the gh-ost table. For such a statement, tableChanges is not empty and the
+     * table name lists both the captured table and the gh-ost table (comma separated).
+     *
+     * <p>For every listed table matching {@code ghostTableRegex}, the ALTER collected by
+     * {@link #collectGhostDdl} is looked up. If one exists, it overwrites the element's
+     * {@code source.table} and its history-record DDL. If none was collected for that table, the
+     * element is left unchanged rather than being given a {@code null} DDL.
+     *
+     * @param element the schema-change record to rewrite in place
+     * @param splitState split state; {@code asBinlogSplitState()} is called on it
+     * @param historyRecord history record whose document receives the collected DDL
+     * @param ghostTableRegex regex whose group 1 extracts the original table name
+     */
+    public static void updateGhostDdlElement(SourceRecord element, MySqlSplitState splitState,
+            HistoryRecord historyRecord,
+            String ghostTableRegex) {
+        String tableNames = RecordUtils.getTableName(element);
+        if (StringUtils.isBlank(tableNames)) {
+            return;
+        }
+        // Compile once instead of on every loop iteration.
+        Pattern ghostTablePattern = Pattern.compile(ghostTableRegex);
+        MySqlBinlogSplitState binlogSplitState = splitState.asBinlogSplitState();
+        for (String candidate : tableNames.split(",")) {
+            Matcher matcher = ghostTablePattern.matcher(candidate.trim());
+            if (!matcher.find()) {
+                continue;
+            }
+            String tableName = matcher.group(1);
+            TableId tableId = getTableId(element, tableName);
+            String ddl = binlogSplitState.getTableDdls() == null
+                    ? null
+                    : binlogSplitState.getTableDdls().get(tableId);
+            if (ddl == null) {
+                // Nothing collected for this table: keep the original record.
+                continue;
+            }
+            Struct value = (Struct) element.value();
+            // update source.table and historyRecord
+            value.getStruct(Fields.SOURCE).put(TABLE_NAME, tableName);
+            value.put(HISTORY_RECORD_FIELD,
+                    historyRecord.document().set(Fields.DDL_STATEMENTS, ddl).toString());
+        }
+    }
+
+    /**
+     * Returns whether {@code ddl} starts with ALTER in any letter case, ignoring leading
+     * whitespace.
+     */
+    private static boolean startsWithAlter(String ddl) {
+        String trimmed = ddl.trim();
+        return trimmed.regionMatches(true, 0, DDL_OP_ALTER, 0, DDL_OP_ALTER.length());
+    }
+
+    /**
+     * Replaces each standalone occurrence of {@code ghostTableName} in {@code ddl} with
+     * {@code tableName}.
+     *
+     * <p>An occurrence counts only when it is not directly surrounded by identifier characters.
+     * This leaves intact database names containing underscores and column names such as
+     * {@code is_deleted}; a greedy regex over the whole statement would swallow them. The
+     * replacement is quoted, so table names containing {@code $} or {@code \} are inserted
+     * literally.
+     */
+    private static String replaceTableName(String ddl, String ghostTableName, String tableName) {
+        Pattern ghostNamePattern = Pattern.compile(
+                NOT_AFTER_IDENTIFIER_CHAR + Pattern.quote(ghostTableName) + NOT_BEFORE_IDENTIFIER_CHAR);
+        return ghostNamePattern.matcher(ddl).replaceAll(Matcher.quoteReplacement(tableName));
+    }
+
+    /**
+     * Builds the {@link TableId} of the original table. gh-ost names its tables
+     * `_tableName_[gho|ghc|del]` by default, so callers first extract `tableName` from the gh-ost
+     * name. The database name is taken from the record.
+     */
+    private static TableId getTableId(SourceRecord element, String tableName) {
+        String dbName = RecordUtils.getDbName(element);
+        return RecordUtils.getTableId(
+                dbName,
+                tableName);
+    }
+
+}
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
index c05b58764..022cc7a44 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
@@ -49,8 +49,10 @@ import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkState;
 import static 
org.apache.inlong.sort.cdc.mysql.debezium.dispatcher.EventDispatcherImpl.HISTORY_RECORD_FIELD;
+import static 
org.apache.inlong.sort.cdc.mysql.debezium.dispatcher.SignalEventDispatcher.DATABASE_NAME;
 import static 
org.apache.inlong.sort.cdc.mysql.debezium.dispatcher.SignalEventDispatcher.SIGNAL_EVENT_VALUE_SCHEMA_NAME;
 import static 
org.apache.inlong.sort.cdc.mysql.debezium.dispatcher.SignalEventDispatcher.SPLIT_ID_KEY;
+import static 
org.apache.inlong.sort.cdc.mysql.debezium.dispatcher.SignalEventDispatcher.TABLE_NAME;
 import static 
org.apache.inlong.sort.cdc.mysql.debezium.dispatcher.SignalEventDispatcher.WATERMARK_KIND;
 
 /**
@@ -467,4 +469,25 @@ public class RecordUtils {
         }
         return Optional.empty();
     }
+
+    /** Returns the table name stored in the {@code source} struct of the record's value. */
+    public static String getTableName(SourceRecord element) {
+        return getSourceStruct(element).getString(TABLE_NAME);
+    }
+
+    /** Returns the database name stored in the {@code source} struct of the record's value. */
+    public static String getDbName(SourceRecord element) {
+        return getSourceStruct(element).getString(DATABASE_NAME);
+    }
+
+    /** Builds a {@link TableId} from a database name and table name, passing {@code null} as the schema part. */
+    public static TableId getTableId(String dbName, String tableName) {
+        return new TableId(dbName, null, tableName);
+    }
+
+    /** Extracts the {@code source} struct from the value of a change record (assumes the value is a Struct). */
+    private static Struct getSourceStruct(SourceRecord element) {
+        Struct value = (Struct) element.value();
+        return value.getStruct(FieldName.SOURCE);
+    }
 }
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
index cd65fcb05..95034b8d8 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
@@ -37,6 +37,8 @@ import java.util.regex.Pattern;
 
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
+import static org.apache.inlong.sort.base.Constants.GH_OST_DDL_CHANGE;
+import static org.apache.inlong.sort.base.Constants.GH_OST_TABLE_REGEX;
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 import static 
org.apache.inlong.sort.cdc.base.debezium.table.DebeziumOptions.getDebeziumProperties;
@@ -157,6 +159,8 @@ public class MySqlTableInlongSourceFactory implements 
DynamicTableSourceFactory
                 : config.get(ROW_KINDS_FILTERED);
         boolean enableParallelRead = 
config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
         final boolean includeSchemaChange = config.get(INCLUDE_SCHEMA_CHANGE);
+        final boolean ghostDdlChange = config.get(GH_OST_DDL_CHANGE);
+        final String ghostTableRegex = config.get(GH_OST_TABLE_REGEX);
         if (enableParallelRead) {
             validateStartupOptionIfEnableParallel(startupOptions);
             validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 
splitSize, 1);
@@ -198,7 +202,9 @@ public class MySqlTableInlongSourceFactory implements 
DynamicTableSourceFactory
                 inlongAudit,
                 rowKindFiltered,
                 includeSchemaChange,
-                includeIncremental);
+                includeIncremental,
+                ghostDdlChange,
+                ghostTableRegex);
     }
 
     @Override
@@ -246,6 +252,8 @@ public class MySqlTableInlongSourceFactory implements 
DynamicTableSourceFactory
         options.add(AUDIT_KEYS);
         options.add(INCLUDE_INCREMENTAL);
         options.add(INCLUDE_SCHEMA_CHANGE);
+        options.add(GH_OST_DDL_CHANGE);
+        options.add(GH_OST_TABLE_REGEX);
         return options;
     }
 
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
index bd1c06e85..7ade95e11 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
@@ -86,6 +86,8 @@ public class MySqlTableSource implements ScanTableSource, 
SupportsReadingMetadat
     private final String inlongAudit;
     private final boolean includeIncremental;
     private final boolean includeSchemaChange;
+    private final boolean ghostDdlChange;
+    private final String ghostTableRegex;
     // 
--------------------------------------------------------------------------------------------
     // Mutable attributes
     // 
--------------------------------------------------------------------------------------------
@@ -133,7 +135,9 @@ public class MySqlTableSource implements ScanTableSource, 
SupportsReadingMetadat
             String inlongAudit,
             String rowKindsFiltered,
             boolean includeSchemaChange,
-            boolean includeIncremental) {
+            boolean includeIncremental,
+            boolean ghostDdlChange,
+            String ghostTableRegex) {
         this.physicalSchema = physicalSchema;
         this.port = port;
         this.hostname = checkNotNull(hostname);
@@ -167,6 +171,8 @@ public class MySqlTableSource implements ScanTableSource, 
SupportsReadingMetadat
         this.rowKindsFiltered = rowKindsFiltered;
         this.includeIncremental = includeIncremental;
         this.includeSchemaChange = includeSchemaChange;
+        this.ghostDdlChange = ghostDdlChange;
+        this.ghostTableRegex = ghostTableRegex;
     }
 
     @Override
@@ -230,6 +236,8 @@ public class MySqlTableSource implements ScanTableSource, 
SupportsReadingMetadat
                             .inlongMetric(inlongMetric)
                             .inlongAudit(inlongAudit)
                             .includeIncremental(includeIncremental)
+                            .ghostDdlChange(ghostDdlChange)
+                            .ghostTableRegex(ghostTableRegex)
                             .build();
             return SourceProvider.of(parallelSource);
         } else {
@@ -321,7 +329,9 @@ public class MySqlTableSource implements ScanTableSource, 
SupportsReadingMetadat
                         inlongAudit,
                         rowKindsFiltered,
                         includeSchemaChange,
-                        includeIncremental);
+                        includeIncremental,
+                        ghostDdlChange,
+                        ghostTableRegex);
         source.metadataKeys = metadataKeys;
         source.producedDataType = producedDataType;
         return source;
@@ -361,7 +371,12 @@ public class MySqlTableSource implements ScanTableSource, 
SupportsReadingMetadat
                 && Objects.equals(metadataKeys, that.metadataKeys)
                 && Objects.equals(jdbcProperties, that.jdbcProperties)
                 && Objects.equals(inlongMetric, that.inlongMetric)
-                && Objects.equals(inlongAudit, that.inlongAudit);
+                && Objects.equals(inlongAudit, that.inlongAudit)
+                && Objects.equals(rowKindsFiltered, that.rowKindsFiltered)
+                && includeSchemaChange == that.includeSchemaChange
+                && includeIncremental == that.includeIncremental
+                && ghostDdlChange == that.ghostDdlChange
+                && Objects.equals(ghostTableRegex, that.ghostTableRegex);
     }
 
     @Override
@@ -392,7 +407,12 @@ public class MySqlTableSource implements ScanTableSource, 
SupportsReadingMetadat
                 scanNewlyAddedTableEnabled,
                 jdbcProperties,
                 inlongMetric,
-                inlongAudit);
+                inlongAudit,
+                rowKindsFiltered,
+                includeSchemaChange,
+                includeIncremental,
+                ghostDdlChange,
+                ghostTableRegex);
     }
 
     @Override
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
index 43598bf93..fddceb58e 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
@@ -56,6 +56,7 @@ public class AllMigrateTest {
         option.put("migrate-all", "true");
         option.put("include-incremental", "true");
         option.put("include-schema-change", "true");
+        option.put("gh-ost.ddl.change", "true");
         List<String> tables = new ArrayList(10);
         tables.add("test.*");
         List<FieldInfo> fields = Collections.singletonList(


Reply via email to