This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e43b8f95 [INLONG-7825][Sort] Fix cannot output ddl statement of 
captured tables (#7863)
9e43b8f95 is described below

commit 9e43b8f952aea3087565ba17145e9c741581cb82
Author: emhui <[email protected]>
AuthorDate: Mon Apr 17 12:44:13 2023 +0800

    [INLONG-7825][Sort] Fix cannot output ddl statement of captured tables 
(#7863)
---
 .../main/java/org/apache/inlong/sort/base/Constants.java   |  2 --
 .../sort/cdc/mysql/source/reader/MySqlRecordEmitter.java   | 14 ++++----------
 2 files changed, 4 insertions(+), 12 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 99fc91610..702329e6c 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -136,8 +136,6 @@ public final class Constants {
 
     public static final String DDL_FIELD_NAME = "ddl";
 
-    public static final String DDL_OP_DROP = "DROP";
-
     public static final ConfigOption<String> INLONG_METRIC =
             ConfigOptions.key("inlong.metric.labels")
                     .stringType()
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
index 3ad89632f..94f7b2c60 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
@@ -25,7 +25,6 @@ import io.debezium.relational.ColumnFilterMode;
 import io.debezium.relational.TableId;
 import io.debezium.relational.Tables;
 import io.debezium.relational.history.HistoryRecord;
-import io.debezium.relational.history.HistoryRecord.Fields;
 import io.debezium.relational.history.TableChanges;
 import io.debezium.relational.history.TableChanges.TableChange;
 import org.apache.flink.api.connector.source.SourceOutput;
@@ -47,7 +46,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
-import static org.apache.inlong.sort.base.Constants.DDL_OP_DROP;
 import static 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getBinlogPosition;
 import static 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getFetchTimestamp;
 import static 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getHistoryRecord;
@@ -126,15 +124,11 @@ public final class MySqlRecordEmitter<T>
                 }
             }
 
-            // for drop table ddl, there's no table change events
             if (tableChanges.isEmpty()) {
-                String ddl = 
historyRecord.document().getString(Fields.DDL_STATEMENTS);
-                if (ddl.toUpperCase().startsWith(DDL_OP_DROP)) {
-                    TableId tableId = RecordUtils.getTableId(element);
-                    // if this table is one of the captured tables, output the 
ddl element
-                    if 
(splitState.getMySQLSplit().getTableSchemas().containsKey(tableId)) {
-                        outputDdlElement(element, output, splitState, null);
-                    }
+                TableId tableId = RecordUtils.getTableId(element);
+                // if this table is one of the captured tables, output the ddl 
element
+                if 
(splitState.getMySQLSplit().getTableSchemas().containsKey(tableId)) {
+                    outputDdlElement(element, output, splitState, null);
                 }
             }
 

Reply via email to