EMsnap commented on code in PR #7750:
URL: https://github.com/apache/inlong/pull/7750#discussion_r1173266509


##########
inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java:
##########
@@ -219,12 +239,73 @@ private boolean shouldOutputRenameDdl(SourceRecord 
element, TableId tableId) {
                     }
                 }
             }
-        } catch (Exception e) {
-            LOG.error("parse ddl error {}", element, e);
+        } catch (JSQLParserException e) {
+            LOG.error("parse ddl error {}", historyRecord, e);
         }
         return false;
     }
 
+    /**
+     * after setting `gh-ost.ddl.change=true`, the alter ddl statements 
generated by gh-ost
+     * will be captured and stored in the state.
+     */
+    private void collectGhostDdl(SourceRecord element, MySqlSplitState 
splitState, HistoryRecord historyRecord) {
+        String ddl = historyRecord.document().getString(Fields.DDL_STATEMENTS);
+        if (StringUtils.isBlank(ddl)) {
+            return;
+        }
+        String tableName = 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableName(element);
+        Pattern compile = Pattern.compile(this.ghostTableRegex);
+        Matcher matcher = compile.matcher(tableName);
+        if (matcher.find()) {
+            tableName = matcher.group(1);
+            String dbName = 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getDbName(element);
+            TableId tableId = 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableId(
+                    dbName,
+                    tableName);
+            MySqlBinlogSplitState mySqlBinlogSplitState = 
splitState.asBinlogSplitState();
+            if (ddl.toUpperCase().startsWith(DDL_OP_ALTER)
+                    && 
mySqlBinlogSplitState.getTableSchemas().containsKey(tableId)) {
+                String matchTableInSqlRegex = ghostTableRegex;
+                if (matchTableInSqlRegex.startsWith(CARET) && 
matchTableInSqlRegex.endsWith(DOLLAR)) {
+                    matchTableInSqlRegex = matchTableInSqlRegex.substring(1, 
matchTableInSqlRegex.length() - 1);
+                }
+                mySqlBinlogSplitState.recordTableDdl(
+                        tableId,
+                        ddl.replace(GHOST_TAG, "")
+                                .replaceAll("\\s+", " ")
+                                .replaceAll(matchTableInSqlRegex, tableName));
+            }
+        }
+    }
+
+    /**
+     * if gh-ost output the 'rename table`_a_gho` to `a`' ddl statement (where 
`a` is the captured table
+     * and `_a_gho` is the gh-ost table), its tableChanges won't be empty, and 
the tableName will contain
+     * both the captured table and the gh-ost generated table.
+     * We should retrieve the alter statements generated by gh-ost from the 
state and update the `source.table`
+     * and `historyRecord` values of the element.
+     */
+    private void updateGhostDdlElement(SourceRecord element, MySqlSplitState 
splitState, HistoryRecord historyRecord) {
+        String tableNames = 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableName(element);
+        for (String tableName : tableNames.split(",")) {
+            Pattern compile = Pattern.compile(ghostTableRegex);
+            Matcher matcher = compile.matcher(tableName);
+            if (matcher.find()) {
+                tableName = matcher.group(1);
+                String dbName = 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getDbName(element);
+                TableId tableId = 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableId(

Review Comment:
   Ditto: import `RecordUtils` here as well instead of using the fully qualified name.



##########
inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java:
##########
@@ -219,12 +239,73 @@ private boolean shouldOutputRenameDdl(SourceRecord 
element, TableId tableId) {
                     }
                 }
             }
-        } catch (Exception e) {
-            LOG.error("parse ddl error {}", element, e);
+        } catch (JSQLParserException e) {
+            LOG.error("parse ddl error {}", historyRecord, e);
         }
         return false;
     }
 
+    /**
+     * after setting `gh-ost.ddl.change=true`, the alter ddl statements 
generated by gh-ost
+     * will be captured and stored in the state.
+     */
+    private void collectGhostDdl(SourceRecord element, MySqlSplitState 
splitState, HistoryRecord historyRecord) {
+        String ddl = historyRecord.document().getString(Fields.DDL_STATEMENTS);
+        if (StringUtils.isBlank(ddl)) {
+            return;
+        }
+        String tableName = 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableName(element);
+        Pattern compile = Pattern.compile(this.ghostTableRegex);
+        Matcher matcher = compile.matcher(tableName);
+        if (matcher.find()) {
+            tableName = matcher.group(1);
+            String dbName = 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getDbName(element);
+            TableId tableId = 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableId(
+                    dbName,
+                    tableName);
+            MySqlBinlogSplitState mySqlBinlogSplitState = 
splitState.asBinlogSplitState();
+            if (ddl.toUpperCase().startsWith(DDL_OP_ALTER)
+                    && 
mySqlBinlogSplitState.getTableSchemas().containsKey(tableId)) {
+                String matchTableInSqlRegex = ghostTableRegex;
+                if (matchTableInSqlRegex.startsWith(CARET) && 
matchTableInSqlRegex.endsWith(DOLLAR)) {
+                    matchTableInSqlRegex = matchTableInSqlRegex.substring(1, 
matchTableInSqlRegex.length() - 1);
+                }
+                mySqlBinlogSplitState.recordTableDdl(
+                        tableId,
+                        ddl.replace(GHOST_TAG, "")
+                                .replaceAll("\\s+", " ")
+                                .replaceAll(matchTableInSqlRegex, tableName));
+            }
+        }
+    }
+
+    /**
+     * if gh-ost output the 'rename table`_a_gho` to `a`' ddl statement (where 
`a` is the captured table
+     * and `_a_gho` is the gh-ost table), its tableChanges won't be empty, and 
the tableName will contain
+     * both the captured table and the gh-ost generated table.
+     * We should retrieve the alter statements generated by gh-ost from the 
state and update the `source.table`
+     * and `historyRecord` values of the element.
+     */
+    private void updateGhostDdlElement(SourceRecord element, MySqlSplitState 
splitState, HistoryRecord historyRecord) {
+        String tableNames = 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableName(element);
+        for (String tableName : tableNames.split(",")) {
+            Pattern compile = Pattern.compile(ghostTableRegex);
+            Matcher matcher = compile.matcher(tableName);
+            if (matcher.find()) {
+                tableName = matcher.group(1);
+                String dbName = 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getDbName(element);

Review Comment:
   Ditto: import `RecordUtils` here as well instead of using the fully qualified name.



##########
inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java:
##########
@@ -219,12 +239,73 @@ private boolean shouldOutputRenameDdl(SourceRecord 
element, TableId tableId) {
                     }
                 }
             }
-        } catch (Exception e) {
-            LOG.error("parse ddl error {}", element, e);
+        } catch (JSQLParserException e) {
+            LOG.error("parse ddl error {}", historyRecord, e);
         }
         return false;
     }
 
+    /**
+     * after setting `gh-ost.ddl.change=true`, the alter ddl statements 
generated by gh-ost
+     * will be captured and stored in the state.
+     */
+    private void collectGhostDdl(SourceRecord element, MySqlSplitState 
splitState, HistoryRecord historyRecord) {
+        String ddl = historyRecord.document().getString(Fields.DDL_STATEMENTS);
+        if (StringUtils.isBlank(ddl)) {
+            return;
+        }
+        String tableName = 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableName(element);
+        Pattern compile = Pattern.compile(this.ghostTableRegex);
+        Matcher matcher = compile.matcher(tableName);
+        if (matcher.find()) {
+            tableName = matcher.group(1);
+            String dbName = 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getDbName(element);
+            TableId tableId = 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableId(
+                    dbName,
+                    tableName);
+            MySqlBinlogSplitState mySqlBinlogSplitState = 
splitState.asBinlogSplitState();
+            if (ddl.toUpperCase().startsWith(DDL_OP_ALTER)
+                    && 
mySqlBinlogSplitState.getTableSchemas().containsKey(tableId)) {
+                String matchTableInSqlRegex = ghostTableRegex;
+                if (matchTableInSqlRegex.startsWith(CARET) && 
matchTableInSqlRegex.endsWith(DOLLAR)) {
+                    matchTableInSqlRegex = matchTableInSqlRegex.substring(1, 
matchTableInSqlRegex.length() - 1);
+                }
+                mySqlBinlogSplitState.recordTableDdl(
+                        tableId,
+                        ddl.replace(GHOST_TAG, "")
+                                .replaceAll("\\s+", " ")
+                                .replaceAll(matchTableInSqlRegex, tableName));
+            }
+        }
+    }
+
+    /**
+     * if gh-ost output the 'rename table`_a_gho` to `a`' ddl statement (where 
`a` is the captured table
+     * and `_a_gho` is the gh-ost table), its tableChanges won't be empty, and 
the tableName will contain
+     * both the captured table and the gh-ost generated table.
+     * We should retrieve the alter statements generated by gh-ost from the 
state and update the `source.table`
+     * and `historyRecord` values of the element.
+     */
+    private void updateGhostDdlElement(SourceRecord element, MySqlSplitState 
splitState, HistoryRecord historyRecord) {
+        String tableNames = 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableName(element);

Review Comment:
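   The method can be imported: import `RecordUtils` (or statically import `getTableName`) instead of calling it through the fully qualified name `org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils`.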



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to