This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new cd02218e2 [INLONG-7554][Sort] MySQL CDC supports parsing gh-ost
records (#7750)
cd02218e2 is described below
commit cd02218e27c6c5f656e532328bde4e7a5b32552f
Author: emhui <[email protected]>
AuthorDate: Sun Apr 23 11:24:47 2023 +0800
[INLONG-7554][Sort] MySQL CDC supports parsing gh-ost records (#7750)
---
.../org/apache/inlong/sort/base/Constants.java | 30 ++++++
.../sort/cdc/mysql/source/MySqlSourceBuilder.java | 10 ++
.../cdc/mysql/source/config/MySqlSourceConfig.java | 16 ++-
.../source/config/MySqlSourceConfigFactory.java | 16 ++-
.../mysql/source/reader/MySqlRecordEmitter.java | 32 ++++--
.../cdc/mysql/source/split/MySqlBinlogSplit.java | 22 +++-
.../mysql/source/split/MySqlBinlogSplitState.java | 13 ++-
.../mysql/source/split/MySqlSplitSerializer.java | 41 +++++++-
.../sort/cdc/mysql/source/utils/GhostUtils.java | 112 +++++++++++++++++++++
.../sort/cdc/mysql/source/utils/RecordUtils.java | 18 ++++
.../mysql/table/MySqlTableInlongSourceFactory.java | 10 +-
.../sort/cdc/mysql/table/MySqlTableSource.java | 28 +++++-
.../apache/inlong/sort/parser/AllMigrateTest.java | 1 +
13 files changed, 325 insertions(+), 24 deletions(-)
diff --git
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index cd7acc921..757be7260 100644
---
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -114,6 +114,18 @@ public final class Constants {
* It is used for metric data to build schema identify
*/
public static final String SEMICOLON = ".";
+
+ /**
+ * Regular-expression anchor "^". Placed at the start of a pattern, it requires the
+ * match to begin at the start of the searched text.
+ */
+ public static final String CARET = "^";
+
+ /**
+ * Regular-expression anchor "$". Placed at the end of a pattern, it requires the
+ * match to end at the end of the searched text.
+ */
+ public static final String DOLLAR = "$";
/**
* It is used for metric data to spilt schema identify
*/
@@ -137,6 +149,12 @@ public final class Constants {
public static final String DDL_FIELD_NAME = "ddl";
+ /**
+ * Upper-case keyword of an ALTER statement; compare against upper-cased DDL text.
+ */
+ public static final String DDL_OP_ALTER = "ALTER";
+
+ /**
+ * Upper-case keyword of a DROP statement; compare against upper-cased DDL text.
+ */
+ public static final String DDL_OP_DROP = "DROP";
+
+ /**
+ * Comment marker expected in DDL statements issued by gh-ost; it is stripped from
+ * gh-ost DDL before that DDL is recorded for the original table.
+ */
+ public static final String GHOST_TAG = "/* gh-ost */";
+
public static final ConfigOption<String> INLONG_METRIC =
ConfigOptions.key("inlong.metric.labels")
.stringType()
@@ -322,4 +340,16 @@ public final class Constants {
.defaultValue(10240L)
.withDescription(
"The flush max bytes, over this number in batch, will
flush data. The default value is 10KB.");
+ /**
+ * Whether ALTER statements that gh-ost runs against its helper tables are captured
+ * and re-emitted as DDL of the original (captured) table. Disabled by default.
+ */
+ public static final ConfigOption<Boolean> GH_OST_DDL_CHANGE = ConfigOptions
+ .key("gh-ost.ddl.change")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether parse ddl changes of gh-ost, default value is
'false'.");
+ /**
+ * Regex that recognises gh-ost helper table names. Capture group 1 must yield the
+ * original table name. The default matches names such as {@code _orders_gho},
+ * {@code _orders_ghc} and {@code _orders_del}.
+ */
+ public static final ConfigOption<String> GH_OST_TABLE_REGEX = ConfigOptions
+ .key("gh-ost.table.regex")
+ .stringType()
+ .defaultValue("^_(.*)_(gho|ghc|del)$")
+ .withDescription(
+ "Matcher the original table name from the ddl of gh-ost.");
}
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSourceBuilder.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSourceBuilder.java
index 8cfaa0bae..726f5cae6 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSourceBuilder.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSourceBuilder.java
@@ -275,4 +275,14 @@ public class MySqlSourceBuilder<T> {
this.configFactory.includeIncremental(includeIncremental);
return this;
}
+
+    /**
+     * Enables or disables capturing of gh-ost ALTER statements (option {@code gh-ost.ddl.change}).
+     *
+     * @param ghostDdlChange {@code true} to capture gh-ost DDL for the original tables
+     * @return this builder
+     */
+    public MySqlSourceBuilder<T> ghostDdlChange(boolean ghostDdlChange) {
+        configFactory.ghostDdlChange(ghostDdlChange);
+        return this;
+    }
+
+    /**
+     * Sets the regex recognising gh-ost helper tables (option {@code gh-ost.table.regex}).
+     *
+     * @param ghostTableRegex regex whose capture group 1 is the original table name
+     * @return this builder
+     */
+    public MySqlSourceBuilder<T> ghostTableRegex(String ghostTableRegex) {
+        configFactory.ghostTableRegex(ghostTableRegex);
+        return this;
+    }
}
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfig.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfig.java
index 926bbebce..71a7dec3e 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfig.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfig.java
@@ -70,6 +70,8 @@ public class MySqlSourceConfig implements Serializable {
private final String inlongMetric;
private final String inlongAudit;
private final boolean includeIncremental;
+ /** Whether gh-ost ALTER statements are captured (option gh-ost.ddl.change). */
+ private final boolean ghostDdlChange;
+ /** Regex recognising gh-ost helper table names (option gh-ost.table.regex). */
+ private final String ghostTableRegex;
MySqlSourceConfig(
String hostname,
@@ -95,7 +97,9 @@ public class MySqlSourceConfig implements Serializable {
Properties jdbcProperties,
String inlongMetric,
String inlongAudit,
- boolean includeIncremental) {
+ boolean includeIncremental,
+ boolean ghostDdlChange,
+ String ghostTableRegex) {
this.hostname = checkNotNull(hostname);
this.port = port;
this.username = checkNotNull(username);
@@ -122,6 +126,8 @@ public class MySqlSourceConfig implements Serializable {
this.inlongMetric = inlongMetric;
this.inlongAudit = inlongAudit;
this.includeIncremental = includeIncremental;
+ this.ghostDdlChange = ghostDdlChange;
+ this.ghostTableRegex = ghostTableRegex;
}
public String getHostname() {
@@ -232,4 +238,12 @@ public class MySqlSourceConfig implements Serializable {
public boolean isIncludeIncremental() {
return includeIncremental;
}
+
+    /**
+     * Tells whether gh-ost ALTER statements are captured.
+     *
+     * @return the configured {@code gh-ost.ddl.change} flag
+     */
+    public boolean isGhostDdlChange() {
+        return this.ghostDdlChange;
+    }
+
+    /**
+     * Gives the regex used to recognise gh-ost helper tables.
+     *
+     * @return the configured {@code gh-ost.table.regex} value
+     */
+    public String getGhostTableRegex() {
+        return this.ghostTableRegex;
+    }
}
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java
index 82b90d561..6b3d82f20 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java
@@ -76,6 +76,8 @@ public class MySqlSourceConfigFactory implements Serializable
{
private String inlongMetric;
private String inlongAudit;
private boolean includeIncremental;
+ /** Whether gh-ost ALTER statements are captured; passed into MySqlSourceConfig. */
+ private boolean ghostDdlChange;
+ /** Regex recognising gh-ost helper tables; passed into MySqlSourceConfig. */
+ private String ghostTableRegex;
public MySqlSourceConfigFactory inlongMetric(String inlongMetric) {
this.inlongMetric = inlongMetric;
@@ -92,6 +94,16 @@ public class MySqlSourceConfigFactory implements
Serializable {
return this;
}
+    /**
+     * Sets whether ALTER statements produced by gh-ost are captured.
+     *
+     * @param ghostDdlChange value for the {@code gh-ost.ddl.change} option
+     * @return this factory, for chaining
+     */
+    public MySqlSourceConfigFactory ghostDdlChange(boolean ghostDdlChange) {
+        this.ghostDdlChange = ghostDdlChange;
+        return this;
+    }
+
+    /**
+     * Sets the regex used to recognise gh-ost helper tables.
+     *
+     * @param ghostTableRegex value for the {@code gh-ost.table.regex} option
+     * @return this factory, for chaining
+     */
+    public MySqlSourceConfigFactory ghostTableRegex(String ghostTableRegex) {
+        this.ghostTableRegex = ghostTableRegex;
+        return this;
+    }
+
public MySqlSourceConfigFactory hostname(String hostname) {
this.hostname = hostname;
return this;
@@ -376,6 +388,8 @@ public class MySqlSourceConfigFactory implements
Serializable {
jdbcProperties,
inlongMetric,
inlongAudit,
- includeIncremental);
+ includeIncremental,
+ ghostDdlChange,
+ ghostTableRegex);
}
}
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
index 695c4363d..2d1c8ded8 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
@@ -25,6 +25,7 @@ import io.debezium.relational.ColumnFilterMode;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.HistoryRecord.Fields;
import io.debezium.relational.history.TableChanges;
import io.debezium.relational.history.TableChanges.TableChange;
import java.util.Map.Entry;
@@ -33,9 +34,9 @@ import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.schema.Table;
import net.sf.jsqlparser.statement.Statement;
import net.sf.jsqlparser.statement.alter.RenameTableStatement;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
-import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.util.Collector;
import org.apache.inlong.sort.base.enums.ReadPhase;
import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema;
@@ -53,8 +54,8 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
-import static org.apache.inlong.sort.base.Constants.DDL_FIELD_NAME;
-import static
org.apache.inlong.sort.cdc.base.relational.JdbcSourceEventDispatcher.HISTORY_RECORD_FIELD;
+import static
org.apache.inlong.sort.cdc.mysql.source.utils.GhostUtils.collectGhostDdl;
+import static
org.apache.inlong.sort.cdc.mysql.source.utils.GhostUtils.updateGhostDdlElement;
import static
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getBinlogPosition;
import static
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getFetchTimestamp;
import static
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getHistoryRecord;
@@ -97,7 +98,8 @@ public final class MySqlRecordEmitter<T>
private volatile long snapProcessTime = 0L;
private boolean includeIncremental;
- public final ObjectMapper objectMapper = new ObjectMapper();
+ /** Copied from MySqlSourceConfig#isGhostDdlChange; enables gh-ost DDL handling. */
+ private boolean ghostDdlChange;
+ /** Copied from MySqlSourceConfig#getGhostTableRegex; recognises gh-ost helper tables. */
+ private String ghostTableRegex;
public MySqlRecordEmitter(
DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
@@ -110,6 +112,8 @@ public final class MySqlRecordEmitter<T>
this.includeIncremental = sourceConfig.isIncludeIncremental();
this.columnNameFilter = ColumnFilterUtil.createColumnFilter(
sourceConfig.getDbzConfiguration(), ColumnFilterMode.CATALOG);
+ this.ghostDdlChange = sourceConfig.isGhostDdlChange();
+ this.ghostTableRegex = sourceConfig.getGhostTableRegex();
}
@Override
@@ -131,6 +135,9 @@ public final class MySqlRecordEmitter<T>
splitState.asBinlogSplitState().recordSchema(tableChange.getId(), tableChange);
if (includeSchemaChanges) {
TableChange newTableChange =
ColumnFilterUtil.createTableChange(tableChange, columnNameFilter);
+ if (ghostDdlChange) {
+ updateGhostDdlElement(element, splitState,
historyRecord, ghostTableRegex);
+ }
outputDdlElement(element, output, splitState,
newTableChange);
}
}
@@ -139,9 +146,12 @@ public final class MySqlRecordEmitter<T>
TableId tableId = RecordUtils.getTableId(element);
// if this table is one of the captured tables, output the ddl
element.
if
(splitState.getMySQLSplit().getTableSchemas().containsKey(tableId)
- || shouldOutputRenameDdl(element, tableId)) {
+ || shouldOutputRenameDdl(historyRecord, tableId)) {
outputDdlElement(element, output, splitState, null);
}
+ if (ghostDdlChange) {
+ collectGhostDdl(element, splitState, historyRecord,
ghostTableRegex);
+ }
}
} else if (isDataChangeRecord(element)) {
@@ -200,12 +210,14 @@ public final class MySqlRecordEmitter<T>
/**
* if rename operation is "rename a to b" where a is the captured table
* this method extract table names a and b, if any of table name is the
captured table
- * we should output ddl element
+ * we should output ddl element.
*/
- private boolean shouldOutputRenameDdl(SourceRecord element, TableId
tableId) {
+ private boolean shouldOutputRenameDdl(HistoryRecord historyRecord, TableId
tableId) {
+ String ddl = historyRecord.document().getString(Fields.DDL_STATEMENTS);
+ if (StringUtils.isBlank(ddl)) {
+ return false;
+ }
try {
- String ddl = objectMapper.readTree(((Struct)
element.value()).get(HISTORY_RECORD_FIELD).toString())
- .get(DDL_FIELD_NAME).asText();
Statement statement = CCJSqlParserUtil.parse(ddl);
if (statement instanceof RenameTableStatement) {
RenameTableStatement renameTableStatement =
(RenameTableStatement) statement;
@@ -220,7 +232,7 @@ public final class MySqlRecordEmitter<T>
}
}
} catch (Exception e) {
- LOG.error("parse ddl error {}", element, e);
+ LOG.error("parse ddl error {}", historyRecord, e);
}
return false;
}
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlBinlogSplit.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlBinlogSplit.java
index 566343a76..6fb26a875 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlBinlogSplit.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlBinlogSplit.java
@@ -41,6 +41,7 @@ public class MySqlBinlogSplit extends MySqlSplit {
private final boolean isSuspended;
@Nullable
transient byte[] serializedFormCache;
+ /**
+ * gh-ost ALTER statements recorded per captured table; serialized with the split.
+ * May be null for splits deserialized from state versions below 4.
+ */
+ private final Map<TableId, String> tableDdls;
public MySqlBinlogSplit(
String splitId,
@@ -49,7 +50,8 @@ public class MySqlBinlogSplit extends MySqlSplit {
List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos,
Map<TableId, TableChange> tableSchemas,
int totalFinishedSplitSize,
- boolean isSuspended) {
+ boolean isSuspended,
+ Map<TableId, String> tableDdls) {
super(splitId);
this.startingOffset = startingOffset;
this.endingOffset = endingOffset;
@@ -57,6 +59,7 @@ public class MySqlBinlogSplit extends MySqlSplit {
this.tableSchemas = tableSchemas;
this.totalFinishedSplitSize = totalFinishedSplitSize;
this.isSuspended = isSuspended;
+ this.tableDdls = tableDdls;
}
public MySqlBinlogSplit(
@@ -73,6 +76,7 @@ public class MySqlBinlogSplit extends MySqlSplit {
this.tableSchemas = tableSchemas;
this.totalFinishedSplitSize = totalFinishedSplitSize;
this.isSuspended = false;
+ this.tableDdls = new HashMap<>();
}
// -------------------------------------------------------------------
@@ -88,7 +92,8 @@ public class MySqlBinlogSplit extends MySqlSplit {
splitInfos,
binlogSplit.getTableSchemas(),
binlogSplit.getTotalFinishedSplitSize(),
- binlogSplit.isSuspended());
+ binlogSplit.isSuspended(),
+ binlogSplit.getTableDdls());
}
public static MySqlBinlogSplit fillTableSchemas(
@@ -101,7 +106,8 @@ public class MySqlBinlogSplit extends MySqlSplit {
binlogSplit.getFinishedSnapshotSplitInfos(),
tableSchemas,
binlogSplit.getTotalFinishedSplitSize(),
- binlogSplit.isSuspended());
+ binlogSplit.isSuspended(),
+ binlogSplit.getTableDdls());
}
public static MySqlBinlogSplit toNormalBinlogSplit(
@@ -113,7 +119,8 @@ public class MySqlBinlogSplit extends MySqlSplit {
suspendedBinlogSplit.getFinishedSnapshotSplitInfos(),
suspendedBinlogSplit.getTableSchemas(),
totalFinishedSplitSize,
- false);
+ false,
+ suspendedBinlogSplit.getTableDdls());
}
public static MySqlBinlogSplit toSuspendedBinlogSplit(MySqlBinlogSplit
normalBinlogSplit) {
@@ -124,7 +131,8 @@ public class MySqlBinlogSplit extends MySqlSplit {
new ArrayList<>(),
new HashMap<>(),
normalBinlogSplit.getTotalFinishedSplitSize(),
- true);
+ true,
+ normalBinlogSplit.getTableDdls());
}
public BinlogOffset getStartingOffset() {
@@ -156,6 +164,10 @@ public class MySqlBinlogSplit extends MySqlSplit {
return totalFinishedSplitSize == finishedSnapshotSplitInfos.size();
}
+    /**
+     * Exposes the gh-ost ALTER statements recorded per table. The map is returned as-is (not
+     * copied) and may be {@code null} for splits deserialized from older state versions.
+     *
+     * @return the recorded DDL map
+     */
+    public Map<TableId, String> getTableDdls() {
+        return this.tableDdls;
+    }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlBinlogSplitState.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlBinlogSplitState.java
index 92b82d198..cfea39a59 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlBinlogSplitState.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlBinlogSplitState.java
@@ -34,12 +34,14 @@ public class MySqlBinlogSplitState extends MySqlSplitState {
private BinlogOffset startingOffset;
@Nullable
private BinlogOffset endingOffset;
+ /** gh-ost ALTER statements per table; initially the same map instance as the split's. */
+ private Map<TableId, String> tableDdls;
public MySqlBinlogSplitState(MySqlBinlogSplit split) {
super(split);
this.startingOffset = split.getStartingOffset();
this.endingOffset = split.getEndingOffset();
this.tableSchemas = split.getTableSchemas();
+ this.tableDdls = split.getTableDdls();
}
@Nullable
@@ -77,7 +79,8 @@ public class MySqlBinlogSplitState extends MySqlSplitState {
binlogSplit.asBinlogSplit().getFinishedSnapshotSplitInfos(),
getTableSchemas(),
binlogSplit.getTotalFinishedSplitSize(),
- binlogSplit.isSuspended());
+ binlogSplit.isSuspended(),
+ binlogSplit.getTableDdls());
}
@Override
@@ -91,4 +94,12 @@ public class MySqlBinlogSplitState extends MySqlSplitState {
+ split
+ '}';
}
+
+    /**
+     * Returns the gh-ost ALTER statements recorded per captured table, never {@code null}.
+     *
+     * <p>Splits deserialized from state versions below 4 carry no DDL map; an empty mutable map
+     * is created for them on first access.
+     *
+     * @return the (mutable) map from captured table id to its recorded gh-ost DDL
+     */
+    public Map<TableId, String> getTableDdls() {
+        return tableDdlsOrCreate();
+    }
+
+    /**
+     * Records the gh-ost ALTER statement for {@code tableId}, replacing any earlier one.
+     *
+     * @param tableId id of the captured (original) table
+     * @param ddl the normalized ALTER statement to emit for that table
+     */
+    public void recordTableDdl(TableId tableId, String ddl) {
+        tableDdlsOrCreate().put(tableId, ddl);
+    }
+
+    /**
+     * Replaces a {@code null} DDL map with an empty mutable one so callers never NPE.
+     * NOTE(review): a map created here is not shared with the split this state was built from.
+     */
+    private Map<TableId, String> tableDdlsOrCreate() {
+        if (tableDdls == null) {
+            tableDdls = new java.util.HashMap<>();
+        }
+        return tableDdls;
+    }
}
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
index 93c3b4353..ae8852324 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSplitSerializer.java
@@ -104,6 +104,41 @@ public final class MySqlSplitSerializer implements
SimpleVersionedSerializer<MyS
return tableSchemas;
}
+    /**
+     * Serializes the gh-ost DDL statements recorded per table as a count followed by
+     * (table id, DDL) UTF pairs.
+     *
+     * <p>A {@code null} map, which splits deserialized from state versions below 4 carry, is
+     * written as an empty section instead of failing with a NullPointerException.
+     *
+     * @param tableDdls table id to recorded DDL; may be {@code null}
+     * @param out target serializer
+     * @throws IOException if writing fails
+     */
+    public static void writeTableDdls(
+            Map<TableId, String> tableDdls, DataOutputSerializer out) throws IOException {
+        if (tableDdls == null) {
+            out.writeInt(0);
+            return;
+        }
+        out.writeInt(tableDdls.size());
+        for (Map.Entry<TableId, String> entry : tableDdls.entrySet()) {
+            out.writeUTF(entry.getKey().toString());
+            out.writeUTF(entry.getValue());
+        }
+    }
+
+    /**
+     * Reads the gh-ost DDL section written by {@code writeTableDdls}.
+     *
+     * <p>If no bytes remain in {@code in} (data serialized without a DDL section) an empty map
+     * is returned.
+     *
+     * @param version serializer version of the data being read
+     * @param in source deserializer
+     * @return a mutable map from table id to recorded DDL
+     * @throws IOException on read failure or an unknown {@code version}
+     */
+    public static Map<TableId, String> readTableDdls(int version, DataInputDeserializer in)
+            throws IOException {
+        final Map<TableId, String> result = new HashMap<>();
+        if (in.available() > 0) {
+            final int count = in.readInt();
+            for (int idx = 0; idx < count; idx++) {
+                final TableId id = TableId.parse(in.readUTF());
+                result.put(id, readTableDdlStatement(version, in));
+            }
+        }
+        return result;
+    }
+
+    /** Reads one DDL statement, validating that {@code version} is a known serializer version. */
+    private static String readTableDdlStatement(int version, DataInputDeserializer in)
+            throws IOException {
+        switch (version) {
+            case 1:
+            case 2:
+            case 3:
+            case 4:
+                return in.readUTF();
+            default:
+                throw new IOException("Unknown version: " + version);
+        }
+    }
+
private static void writeFinishedSplitsInfo(
List<FinishedSnapshotSplitInfo> finishedSplitsInfo,
DataOutputSerializer out)
throws IOException {
@@ -228,6 +263,7 @@ public final class MySqlSplitSerializer implements
SimpleVersionedSerializer<MyS
writeTableSchemas(binlogSplit.getTableSchemas(), out);
out.writeInt(binlogSplit.getTotalFinishedSplitSize());
out.writeBoolean(binlogSplit.isSuspended());
+ writeTableDdls(binlogSplit.getTableDdls(), out);
final byte[] result = out.getCopyOfBuffer();
out.clear();
// optimization: cache the serialized from, so we avoid the byte
work during repeated
@@ -296,10 +332,12 @@ public final class MySqlSplitSerializer implements
SimpleVersionedSerializer<MyS
Map<TableId, TableChange> tableChangeMap =
readTableSchemas(version, in);
int totalFinishedSplitSize = finishedSplitsInfo.size();
boolean isSuspended = false;
+ Map<TableId, String> tableDdls = null;
if (version >= 3) {
totalFinishedSplitSize = in.readInt();
if (version > 3) {
isSuspended = in.readBoolean();
+ tableDdls = readTableDdls(version, in);
}
}
in.releaseArrays();
@@ -310,7 +348,8 @@ public final class MySqlSplitSerializer implements
SimpleVersionedSerializer<MyS
finishedSplitsInfo,
tableChangeMap,
totalFinishedSplitSize,
- isSuspended);
+ isSuspended,
+ tableDdls);
} else if (splitKind == METRIC_SPLIT_FLAG) {
long numBytesIn = 0L;
long numRecordsIn = 0L;
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/GhostUtils.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/GhostUtils.java
new file mode 100644
index 000000000..77f769c4d
--- /dev/null
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/GhostUtils.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.mysql.source.utils;
+
+import static org.apache.inlong.sort.base.Constants.CARET;
+import static org.apache.inlong.sort.base.Constants.DDL_OP_ALTER;
+import static org.apache.inlong.sort.base.Constants.DOLLAR;
+import static org.apache.inlong.sort.base.Constants.GHOST_TAG;
+import static org.apache.inlong.sort.base.Constants.TABLE_NAME;
+import static
org.apache.inlong.sort.cdc.base.relational.JdbcSourceEventDispatcher.HISTORY_RECORD_FIELD;
+
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.HistoryRecord.Fields;
+import java.util.Locale;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.sort.cdc.mysql.source.split.MySqlBinlogSplitState;
+import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSplitState;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+/**
+ * Utility class to deal gh-ost record.
+ */
+public class GhostUtils {
+
+ /**
+ * after setting `gh-ost.ddl.change=true`, the alter ddl statements
generated by gh-ost
+ * will be captured and stored in the state.
+ */
+ public static void collectGhostDdl(SourceRecord element, MySqlSplitState
splitState, HistoryRecord historyRecord,
+ String ghostTableRegex) {
+ String ddl = historyRecord.document().getString(Fields.DDL_STATEMENTS);
+ if (StringUtils.isBlank(ddl)) {
+ return;
+ }
+ String tableName =
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableName(element);
+ Pattern compile = Pattern.compile(ghostTableRegex);
+ Matcher matcher = compile.matcher(tableName);
+ if (matcher.find()) {
+ tableName = matcher.group(1);
+ TableId tableId = getTableId(element, tableName);
+ MySqlBinlogSplitState mySqlBinlogSplitState =
splitState.asBinlogSplitState();
+ if (ddl.toUpperCase().startsWith(DDL_OP_ALTER)
+ &&
mySqlBinlogSplitState.getTableSchemas().containsKey(tableId)) {
+ String matchTableInSqlRegex = ghostTableRegex;
+ if (matchTableInSqlRegex.startsWith(CARET) &&
matchTableInSqlRegex.endsWith(DOLLAR)) {
+ matchTableInSqlRegex = matchTableInSqlRegex.substring(1,
matchTableInSqlRegex.length() - 1);
+ }
+ mySqlBinlogSplitState.recordTableDdl(
+ tableId,
+ ddl.replace(GHOST_TAG, "")
+ .replaceAll("\\s+", " ")
+ .replaceAll(matchTableInSqlRegex, tableName));
+ }
+ }
+ }
+
+ /**
+ * if gh-ost output the 'rename table`_a_gho` to `a`' ddl statement (where
`a` is the captured table
+ * and `_a_gho` is the gh-ost table), its tableChanges won't be empty, and
the tableName will contain
+ * both the captured table and the gh-ost generated table.
+ * We should retrieve the alter statements generated by gh-ost from the
state and update the `source.table`
+ * and `historyRecord` values of the element.
+ */
+ public static void updateGhostDdlElement(SourceRecord element,
MySqlSplitState splitState,
+ HistoryRecord historyRecord,
+ String ghostTableRegex) {
+ String tableNames =
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableName(element);
+ for (String tableName : tableNames.split(",")) {
+ Pattern compile = Pattern.compile(ghostTableRegex);
+ Matcher matcher = compile.matcher(tableName);
+ if (matcher.find()) {
+ tableName = matcher.group(1);
+ TableId tableId = getTableId(element, tableName);
+ String ddl =
splitState.asBinlogSplitState().getTableDdls().get(tableId);
+ Struct value = (Struct) element.value();
+ // update source.table and historyRecord
+ value.getStruct(Fields.SOURCE).put(TABLE_NAME, tableName);
+ value.put(HISTORY_RECORD_FIELD,
historyRecord.document().set(Fields.DDL_STATEMENTS, ddl).toString());
+ }
+ }
+ }
+
+ /**
+ * because the table name format generated by gh-ost is
`_tableName_[gho|ghc|del]` (default),
+ * we need to obtain its source table name `tableName` and generate the
tableId of the source table.
+ */
+ private static TableId getTableId(SourceRecord element, String tableName) {
+ String dbName = RecordUtils.getDbName(element);
+ return RecordUtils.getTableId(
+ dbName,
+ tableName);
+ }
+
+}
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
index c05b58764..022cc7a44 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
@@ -49,8 +49,10 @@ import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkState;
import static
org.apache.inlong.sort.cdc.mysql.debezium.dispatcher.EventDispatcherImpl.HISTORY_RECORD_FIELD;
+import static
org.apache.inlong.sort.cdc.mysql.debezium.dispatcher.SignalEventDispatcher.DATABASE_NAME;
import static
org.apache.inlong.sort.cdc.mysql.debezium.dispatcher.SignalEventDispatcher.SIGNAL_EVENT_VALUE_SCHEMA_NAME;
import static
org.apache.inlong.sort.cdc.mysql.debezium.dispatcher.SignalEventDispatcher.SPLIT_ID_KEY;
+import static
org.apache.inlong.sort.cdc.mysql.debezium.dispatcher.SignalEventDispatcher.TABLE_NAME;
import static
org.apache.inlong.sort.cdc.mysql.debezium.dispatcher.SignalEventDispatcher.WATERMARK_KIND;
/**
@@ -467,4 +469,20 @@ public class RecordUtils {
}
return Optional.empty();
}
+
+    /**
+     * Reads the table name from the {@code source} struct of a change record.
+     *
+     * @param element record whose value is a Struct with a source sub-struct
+     * @return the TABLE_NAME field of the source struct
+     */
+    public static String getTableName(SourceRecord element) {
+        return sourceStructOf(element).getString(TABLE_NAME);
+    }
+
+    /**
+     * Reads the database name from the {@code source} struct of a change record.
+     *
+     * @param element record whose value is a Struct with a source sub-struct
+     * @return the DATABASE_NAME field of the source struct
+     */
+    public static String getDbName(SourceRecord element) {
+        return sourceStructOf(element).getString(DATABASE_NAME);
+    }
+
+    /**
+     * Creates a table id with {@code dbName} as catalog and no schema component.
+     *
+     * @param dbName database (catalog) name
+     * @param tableName table name
+     * @return the table id
+     */
+    public static TableId getTableId(String dbName, String tableName) {
+        final String noSchema = null;
+        return new TableId(dbName, noSchema, tableName);
+    }
+
+    /** Returns the {@code source} sub-struct of the record's Struct value. */
+    private static Struct sourceStructOf(SourceRecord element) {
+        final Struct recordValue = (Struct) element.value();
+        return recordValue.getStruct(FieldName.SOURCE);
+    }
}
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
index cd65fcb05..95034b8d8 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
@@ -37,6 +37,8 @@ import java.util.regex.Pattern;
import static org.apache.flink.util.Preconditions.checkState;
import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
+import static org.apache.inlong.sort.base.Constants.GH_OST_DDL_CHANGE;
+import static org.apache.inlong.sort.base.Constants.GH_OST_TABLE_REGEX;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
import static
org.apache.inlong.sort.cdc.base.debezium.table.DebeziumOptions.getDebeziumProperties;
@@ -157,6 +159,8 @@ public class MySqlTableInlongSourceFactory implements
DynamicTableSourceFactory
: config.get(ROW_KINDS_FILTERED);
boolean enableParallelRead =
config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
final boolean includeSchemaChange = config.get(INCLUDE_SCHEMA_CHANGE);
+ final boolean ghostDdlChange = config.get(GH_OST_DDL_CHANGE);
+ final String ghostTableRegex = config.get(GH_OST_TABLE_REGEX);
if (enableParallelRead) {
validateStartupOptionIfEnableParallel(startupOptions);
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE,
splitSize, 1);
@@ -198,7 +202,9 @@ public class MySqlTableInlongSourceFactory implements
DynamicTableSourceFactory
inlongAudit,
rowKindFiltered,
includeSchemaChange,
- includeIncremental);
+ includeIncremental,
+ ghostDdlChange,
+ ghostTableRegex);
}
@Override
@@ -246,6 +252,8 @@ public class MySqlTableInlongSourceFactory implements
DynamicTableSourceFactory
options.add(AUDIT_KEYS);
options.add(INCLUDE_INCREMENTAL);
options.add(INCLUDE_SCHEMA_CHANGE);
+ options.add(GH_OST_DDL_CHANGE);
+ options.add(GH_OST_TABLE_REGEX);
return options;
}
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
index bd1c06e85..7ade95e11 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
@@ -86,6 +86,8 @@ public class MySqlTableSource implements ScanTableSource,
SupportsReadingMetadat
private final String inlongAudit;
private final boolean includeIncremental;
private final boolean includeSchemaChange;
+ /** Option gh-ost.ddl.change: whether gh-ost ALTER statements are captured. */
+ private final boolean ghostDdlChange;
+ /** Option gh-ost.table.regex: regex recognising gh-ost helper tables. */
+ private final String ghostTableRegex;
//
--------------------------------------------------------------------------------------------
// Mutable attributes
//
--------------------------------------------------------------------------------------------
@@ -133,7 +135,9 @@ public class MySqlTableSource implements ScanTableSource,
SupportsReadingMetadat
String inlongAudit,
String rowKindsFiltered,
boolean includeSchemaChange,
- boolean includeIncremental) {
+ boolean includeIncremental,
+ boolean ghostDdlChange,
+ String ghostTableRegex) {
this.physicalSchema = physicalSchema;
this.port = port;
this.hostname = checkNotNull(hostname);
@@ -167,6 +171,8 @@ public class MySqlTableSource implements ScanTableSource,
SupportsReadingMetadat
this.rowKindsFiltered = rowKindsFiltered;
this.includeIncremental = includeIncremental;
this.includeSchemaChange = includeSchemaChange;
+ this.ghostDdlChange = ghostDdlChange;
+ this.ghostTableRegex = ghostTableRegex;
}
@Override
@@ -230,6 +236,8 @@ public class MySqlTableSource implements ScanTableSource,
SupportsReadingMetadat
.inlongMetric(inlongMetric)
.inlongAudit(inlongAudit)
.includeIncremental(includeIncremental)
+ .ghostDdlChange(ghostDdlChange)
+ .ghostTableRegex(ghostTableRegex)
.build();
return SourceProvider.of(parallelSource);
} else {
@@ -321,7 +329,9 @@ public class MySqlTableSource implements ScanTableSource,
SupportsReadingMetadat
inlongAudit,
rowKindsFiltered,
includeSchemaChange,
- includeIncremental);
+ includeIncremental,
+ ghostDdlChange,
+ ghostTableRegex);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
@@ -361,7 +371,12 @@ public class MySqlTableSource implements ScanTableSource,
SupportsReadingMetadat
&& Objects.equals(metadataKeys, that.metadataKeys)
&& Objects.equals(jdbcProperties, that.jdbcProperties)
&& Objects.equals(inlongMetric, that.inlongMetric)
- && Objects.equals(inlongAudit, that.inlongAudit);
+ && Objects.equals(inlongAudit, that.inlongAudit)
+ && Objects.equals(rowKindsFiltered, that.rowKindsFiltered)
+ && Objects.equals(includeSchemaChange,
that.includeSchemaChange)
+ && Objects.equals(includeIncremental, that.includeIncremental)
+ && Objects.equals(ghostDdlChange, that.ghostDdlChange)
+ && Objects.equals(ghostDdlChange, that.ghostDdlChange);
}
@Override
@@ -392,7 +407,12 @@ public class MySqlTableSource implements ScanTableSource,
SupportsReadingMetadat
scanNewlyAddedTableEnabled,
jdbcProperties,
inlongMetric,
- inlongAudit);
+ inlongAudit,
+ rowKindsFiltered,
+ includeSchemaChange,
+ includeIncremental,
+ ghostDdlChange,
+ ghostTableRegex);
}
@Override
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
index 43598bf93..fddceb58e 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
@@ -56,6 +56,7 @@ public class AllMigrateTest {
option.put("migrate-all", "true");
option.put("include-incremental", "true");
option.put("include-schema-change", "true");
+ option.put("gh-ost.ddl.change", "true");
List<String> tables = new ArrayList(10);
tables.add("test.*");
List<FieldInfo> fields = Collections.singletonList(