This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 9e43b8f95 [INLONG-7825][Sort] Fix cannot output ddl statement of captured tables (#7863)
9e43b8f95 is described below
commit 9e43b8f952aea3087565ba17145e9c741581cb82
Author: emhui <[email protected]>
AuthorDate: Mon Apr 17 12:44:13 2023 +0800
[INLONG-7825][Sort] Fix cannot output ddl statement of captured tables (#7863)
---
.../main/java/org/apache/inlong/sort/base/Constants.java | 2 --
.../sort/cdc/mysql/source/reader/MySqlRecordEmitter.java | 14 ++++----------
2 files changed, 4 insertions(+), 12 deletions(-)
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 99fc91610..702329e6c 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -136,8 +136,6 @@ public final class Constants {
public static final String DDL_FIELD_NAME = "ddl";
- public static final String DDL_OP_DROP = "DROP";
-
public static final ConfigOption<String> INLONG_METRIC =
ConfigOptions.key("inlong.metric.labels")
.stringType()
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
index 3ad89632f..94f7b2c60 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
@@ -25,7 +25,6 @@ import io.debezium.relational.ColumnFilterMode;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.relational.history.HistoryRecord;
-import io.debezium.relational.history.HistoryRecord.Fields;
import io.debezium.relational.history.TableChanges;
import io.debezium.relational.history.TableChanges.TableChange;
import org.apache.flink.api.connector.source.SourceOutput;
@@ -47,7 +46,6 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
-import static org.apache.inlong.sort.base.Constants.DDL_OP_DROP;
import static org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getBinlogPosition;
import static org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getFetchTimestamp;
import static org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getHistoryRecord;
@@ -126,15 +124,11 @@ public final class MySqlRecordEmitter<T>
}
}
- // for drop table ddl, there's no table change events
if (tableChanges.isEmpty()) {
- String ddl = historyRecord.document().getString(Fields.DDL_STATEMENTS);
- if (ddl.toUpperCase().startsWith(DDL_OP_DROP)) {
- TableId tableId = RecordUtils.getTableId(element);
- // if this table is one of the captured tables, output the ddl element
- if (splitState.getMySQLSplit().getTableSchemas().containsKey(tableId)) {
- outputDdlElement(element, output, splitState, null);
- }
+ TableId tableId = RecordUtils.getTableId(element);
+ // if this table is one of the captured tables, output the ddl element
+ if (splitState.getMySQLSplit().getTableSchemas().containsKey(tableId)) {
+ outputDdlElement(element, output, splitState, null);
}
}