This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e099d95ac [INLONG-8796][Sort] Add SchemaChangeEventHandler to deal 
schema change event by each connector (#8806)
0e099d95ac is described below

commit 0e099d95ac737d39e587f2994baf1e993d57e4d4
Author: e-mhui <[email protected]>
AuthorDate: Mon Aug 28 10:58:14 2023 +0800

    [INLONG-8796][Sort] Add SchemaChangeEventHandler to deal schema change 
event by each connector (#8806)
---
 .../base/relational/JdbcSourceEventDispatcher.java | 22 ++++--------
 .../handler/SchemaChangeEventHandler.java          | 28 +++++++++++++++
 .../inlong/sort/cdc/base/util/RecordUtils.java     | 16 ++-------
 .../reader/fetch/OracleSourceFetchTaskContext.java |  4 ++-
 .../handler/OracleSchemaChangeEventHandler.java    | 40 ++++++++++++++++++++++
 .../relational/OracleSourceEventDispatcher.java    |  7 ++--
 6 files changed, 85 insertions(+), 32 deletions(-)

diff --git 
a/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/relational/JdbcSourceEventDispatcher.java
 
b/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/relational/JdbcSourceEventDispatcher.java
index 42fd025b2e..e5cde80b00 100644
--- 
a/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/relational/JdbcSourceEventDispatcher.java
+++ 
b/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/relational/JdbcSourceEventDispatcher.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sort.cdc.base.relational;
 
+import 
org.apache.inlong.sort.cdc.base.relational.handler.SchemaChangeEventHandler;
 import org.apache.inlong.sort.cdc.base.source.meta.offset.Offset;
 import org.apache.inlong.sort.cdc.base.source.meta.split.SourceSplitBase;
 import org.apache.inlong.sort.cdc.base.source.meta.wartermark.WatermarkEvent;
@@ -47,11 +48,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Map;
 
-import static 
org.apache.inlong.sort.cdc.base.util.RecordUtils.isMysqlConnector;
-
 /**
  * A subclass implementation of {@link EventDispatcher}.
  *
@@ -81,6 +79,7 @@ public class JdbcSourceEventDispatcher extends 
EventDispatcher<TableId> {
     public final Schema schemaChangeKeySchema;
     public final Schema schemaChangeValueSchema;
     public final String topic;
+    private final SchemaChangeEventHandler schemaChangeEventHandler;
 
     public JdbcSourceEventDispatcher(
             CommonConnectorConfig connectorConfig,
@@ -90,7 +89,8 @@ public class JdbcSourceEventDispatcher extends 
EventDispatcher<TableId> {
             DataCollectionFilters.DataCollectionFilter<TableId> filter,
             ChangeEventCreator changeEventCreator,
             EventMetadataProvider metadataProvider,
-            SchemaNameAdjuster schemaNameAdjuster) {
+            SchemaNameAdjuster schemaNameAdjuster,
+            SchemaChangeEventHandler schemaChangeEventHandler) {
         super(
                 connectorConfig,
                 topicSelector,
@@ -130,6 +130,7 @@ public class JdbcSourceEventDispatcher extends 
EventDispatcher<TableId> {
                                 
connectorConfig.getSourceInfoStructMaker().schema())
                         .field(HISTORY_RECORD_FIELD, 
Schema.OPTIONAL_STRING_SCHEMA)
                         .build();
+        this.schemaChangeEventHandler = schemaChangeEventHandler;
     }
 
     public ChangeEventQueue<DataChangeEvent> getQueue() {
@@ -193,22 +194,13 @@ public class JdbcSourceEventDispatcher extends 
EventDispatcher<TableId> {
         }
 
         private Struct schemaChangeRecordValue(SchemaChangeEvent event) throws 
IOException {
-            Map<String, Object> source = new HashMap<>();
-            if (isMysqlConnector(event.getSource())) {
-                Struct sourceInfo = event.getSource();
-                String fileName = 
sourceInfo.getString(BINLOG_FILENAME_OFFSET_KEY);
-                Long pos = sourceInfo.getInt64(BINLOG_POSITION_OFFSET_KEY);
-                Long serverId = sourceInfo.getInt64(SERVER_ID_KEY);
-                source.put(SERVER_ID_KEY, serverId);
-                source.put(BINLOG_FILENAME_OFFSET_KEY, fileName);
-                source.put(BINLOG_POSITION_OFFSET_KEY, pos);
-            }
+            Map<String, Object> source = 
schemaChangeEventHandler.parseSource(event);
             HistoryRecord historyRecord =
                     new HistoryRecord(
                             source,
                             event.getOffset(),
                             event.getDatabase(),
-                            null,
+                            event.getSchema(),
                             event.getDdl(),
                             event.getTableChanges());
             String historyStr = 
DOCUMENT_WRITER.write(historyRecord.document());
diff --git 
a/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/relational/handler/SchemaChangeEventHandler.java
 
b/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/relational/handler/SchemaChangeEventHandler.java
new file mode 100644
index 0000000000..85aeac4a5d
--- /dev/null
+++ 
b/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/relational/handler/SchemaChangeEventHandler.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.base.relational.handler;
+
+import io.debezium.schema.SchemaChangeEvent;
+
+import java.util.Map;
+
+/** SchemaChangeEvent Handler */
+public interface SchemaChangeEventHandler {
+
+    Map<String, Object> parseSource(SchemaChangeEvent event);
+}
diff --git 
a/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/RecordUtils.java
 
b/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/RecordUtils.java
index 1392b984ed..e63da827a7 100644
--- 
a/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/RecordUtils.java
+++ 
b/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/RecordUtils.java
@@ -56,8 +56,7 @@ public class RecordUtils {
             .asList("CHAR", "NCHAR", "NVARCHAR2", "NVCHAER", "VARCHAR", 
"VARCHAR2", "CLOB", "NCLOB", "XMLType");
     private static final List<String> BINARY_TYPE = Arrays.asList("BLOB", 
"ROWID");
     private static final List<String> BIGINT_TYPE = Arrays.asList("INTERVAL 
DAY TO SECOND", "INTERVAL YEAR TO MONTH");
-    public static final String MYSQL_SCHEMA_CHANGE_EVENT_KEY_NAME = 
"io.debezium.connector.mysql.SchemaChangeKey";
-    public static final String ORACLE_SCHEMA_CHANGE_EVENT_KEY_NAME = 
"io.debezium.connector.oracle.SchemaChangeKey";
+    public static final String SCHEMA_CHANGE_EVENT_KEY_NAME = 
"io.debezium.connector.*.SchemaChangeKey";
     public static final String CONNECTOR = "connector";
     public static final String MYSQL_CONNECTOR = "mysql";
 
@@ -145,18 +144,7 @@ public class RecordUtils {
      */
     public static boolean isSchemaChangeEvent(SourceRecord sourceRecord) {
         Schema keySchema = sourceRecord.keySchema();
-        return keySchema != null && 
(MYSQL_SCHEMA_CHANGE_EVENT_KEY_NAME.equalsIgnoreCase(keySchema.name())
-                || 
ORACLE_SCHEMA_CHANGE_EVENT_KEY_NAME.equalsIgnoreCase(keySchema.name()));
-    }
-
-    /**
-     * Whether the source belong Mysql Connector
-     * @param source
-     * @return true if the source belong Mysql Connector
-     */
-    public static boolean isMysqlConnector(Struct source) {
-        String connector = source.getString(CONNECTOR);
-        return MYSQL_CONNECTOR.equalsIgnoreCase(connector);
+        return keySchema != null && 
(keySchema.name().matches(SCHEMA_CHANGE_EVENT_KEY_NAME));
     }
 
     public static boolean isDdlRecord(Struct value) {
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java
index ffe106b488..0d1bfbd45e 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java
@@ -25,6 +25,7 @@ import 
org.apache.inlong.sort.cdc.base.source.meta.split.SourceSplitBase;
 import 
org.apache.inlong.sort.cdc.base.source.reader.external.JdbcSourceFetchTaskContext;
 import org.apache.inlong.sort.cdc.oracle.source.config.OracleSourceConfig;
 import org.apache.inlong.sort.cdc.oracle.source.meta.offset.RedoLogOffset;
+import 
org.apache.inlong.sort.cdc.oracle.source.reader.handler.OracleSchemaChangeEventHandler;
 import 
org.apache.inlong.sort.cdc.oracle.source.relational.OracleSourceEventDispatcher;
 import org.apache.inlong.sort.cdc.oracle.source.utils.OracleUtils;
 
@@ -134,7 +135,8 @@ public class OracleSourceFetchTaskContext extends 
JdbcSourceFetchTaskContext {
                         
connectorConfig.getTableFilters().dataCollectionFilter(),
                         DataChangeEvent::new,
                         metadataProvider,
-                        schemaNameAdjuster);
+                        schemaNameAdjuster,
+                        new OracleSchemaChangeEventHandler());
 
         final OracleChangeEventSourceMetricsFactory 
changeEventSourceMetricsFactory =
                 new OracleChangeEventSourceMetricsFactory(
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/handler/OracleSchemaChangeEventHandler.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/handler/OracleSchemaChangeEventHandler.java
new file mode 100644
index 0000000000..d954e450c2
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/handler/OracleSchemaChangeEventHandler.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.oracle.source.reader.handler;
+
+import 
org.apache.inlong.sort.cdc.base.relational.handler.SchemaChangeEventHandler;
+
+import io.debezium.schema.SchemaChangeEvent;
+import org.apache.kafka.connect.data.Struct;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static io.debezium.connector.oracle.SourceInfo.SCN_KEY;
+
+public class OracleSchemaChangeEventHandler implements 
SchemaChangeEventHandler {
+
+    @Override
+    public Map<String, Object> parseSource(SchemaChangeEvent event) {
+        Map<String, Object> source = new HashMap<>();
+        Struct sourceInfo = event.getSource();
+        String scn = sourceInfo.getString(SCN_KEY);
+        source.put(SCN_KEY, scn);
+        return source;
+    }
+}
\ No newline at end of file
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/relational/OracleSourceEventDispatcher.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/relational/OracleSourceEventDispatcher.java
index 2a0b8e77c5..5e7a67f96b 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/relational/OracleSourceEventDispatcher.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/relational/OracleSourceEventDispatcher.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sort.cdc.oracle.source.relational;
 
 import org.apache.inlong.sort.cdc.base.relational.JdbcSourceEventDispatcher;
+import 
org.apache.inlong.sort.cdc.base.relational.handler.SchemaChangeEventHandler;
 
 import io.debezium.config.CommonConnectorConfig;
 import io.debezium.connector.base.ChangeEventQueue;
@@ -66,7 +67,8 @@ public class OracleSourceEventDispatcher extends 
JdbcSourceEventDispatcher {
             DataCollectionFilters.DataCollectionFilter<TableId> filter,
             ChangeEventCreator changeEventCreator,
             EventMetadataProvider metadataProvider,
-            SchemaNameAdjuster schemaNameAdjuster) {
+            SchemaNameAdjuster schemaNameAdjuster,
+            SchemaChangeEventHandler schemaChangeEventHandler) {
         super(
                 connectorConfig,
                 topicSelector,
@@ -75,7 +77,8 @@ public class OracleSourceEventDispatcher extends 
JdbcSourceEventDispatcher {
                 filter,
                 changeEventCreator,
                 metadataProvider,
-                schemaNameAdjuster);
+                schemaNameAdjuster,
+                schemaChangeEventHandler);
     }
 
     @Override

Reply via email to