This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 0e099d95ac [INLONG-8796][Sort] Add SchemaChangeEventHandler to deal
schema change event by each connector (#8806)
0e099d95ac is described below
commit 0e099d95ac737d39e587f2994baf1e993d57e4d4
Author: e-mhui <[email protected]>
AuthorDate: Mon Aug 28 10:58:14 2023 +0800
[INLONG-8796][Sort] Add SchemaChangeEventHandler to deal schema change
event by each connector (#8806)
---
.../base/relational/JdbcSourceEventDispatcher.java | 22 ++++--------
.../handler/SchemaChangeEventHandler.java | 28 +++++++++++++++
.../inlong/sort/cdc/base/util/RecordUtils.java | 16 ++-------
.../reader/fetch/OracleSourceFetchTaskContext.java | 4 ++-
.../handler/OracleSchemaChangeEventHandler.java | 40 ++++++++++++++++++++++
.../relational/OracleSourceEventDispatcher.java | 7 ++--
6 files changed, 85 insertions(+), 32 deletions(-)
diff --git
a/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/relational/JdbcSourceEventDispatcher.java
b/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/relational/JdbcSourceEventDispatcher.java
index 42fd025b2e..e5cde80b00 100644
---
a/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/relational/JdbcSourceEventDispatcher.java
+++
b/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/relational/JdbcSourceEventDispatcher.java
@@ -17,6 +17,7 @@
package org.apache.inlong.sort.cdc.base.relational;
+import
org.apache.inlong.sort.cdc.base.relational.handler.SchemaChangeEventHandler;
import org.apache.inlong.sort.cdc.base.source.meta.offset.Offset;
import org.apache.inlong.sort.cdc.base.source.meta.split.SourceSplitBase;
import org.apache.inlong.sort.cdc.base.source.meta.wartermark.WatermarkEvent;
@@ -47,11 +48,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Map;
-import static
org.apache.inlong.sort.cdc.base.util.RecordUtils.isMysqlConnector;
-
/**
* A subclass implementation of {@link EventDispatcher}.
*
@@ -81,6 +79,7 @@ public class JdbcSourceEventDispatcher extends
EventDispatcher<TableId> {
public final Schema schemaChangeKeySchema;
public final Schema schemaChangeValueSchema;
public final String topic;
+ private final SchemaChangeEventHandler schemaChangeEventHandler;
public JdbcSourceEventDispatcher(
CommonConnectorConfig connectorConfig,
@@ -90,7 +89,8 @@ public class JdbcSourceEventDispatcher extends
EventDispatcher<TableId> {
DataCollectionFilters.DataCollectionFilter<TableId> filter,
ChangeEventCreator changeEventCreator,
EventMetadataProvider metadataProvider,
- SchemaNameAdjuster schemaNameAdjuster) {
+ SchemaNameAdjuster schemaNameAdjuster,
+ SchemaChangeEventHandler schemaChangeEventHandler) {
super(
connectorConfig,
topicSelector,
@@ -130,6 +130,7 @@ public class JdbcSourceEventDispatcher extends
EventDispatcher<TableId> {
connectorConfig.getSourceInfoStructMaker().schema())
.field(HISTORY_RECORD_FIELD,
Schema.OPTIONAL_STRING_SCHEMA)
.build();
+ this.schemaChangeEventHandler = schemaChangeEventHandler;
}
public ChangeEventQueue<DataChangeEvent> getQueue() {
@@ -193,22 +194,13 @@ public class JdbcSourceEventDispatcher extends
EventDispatcher<TableId> {
}
private Struct schemaChangeRecordValue(SchemaChangeEvent event) throws
IOException {
- Map<String, Object> source = new HashMap<>();
- if (isMysqlConnector(event.getSource())) {
- Struct sourceInfo = event.getSource();
- String fileName =
sourceInfo.getString(BINLOG_FILENAME_OFFSET_KEY);
- Long pos = sourceInfo.getInt64(BINLOG_POSITION_OFFSET_KEY);
- Long serverId = sourceInfo.getInt64(SERVER_ID_KEY);
- source.put(SERVER_ID_KEY, serverId);
- source.put(BINLOG_FILENAME_OFFSET_KEY, fileName);
- source.put(BINLOG_POSITION_OFFSET_KEY, pos);
- }
+ Map<String, Object> source =
schemaChangeEventHandler.parseSource(event);
HistoryRecord historyRecord =
new HistoryRecord(
source,
event.getOffset(),
event.getDatabase(),
- null,
+ event.getSchema(),
event.getDdl(),
event.getTableChanges());
String historyStr =
DOCUMENT_WRITER.write(historyRecord.document());
diff --git
a/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/relational/handler/SchemaChangeEventHandler.java
b/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/relational/handler/SchemaChangeEventHandler.java
new file mode 100644
index 0000000000..85aeac4a5d
--- /dev/null
+++
b/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/relational/handler/SchemaChangeEventHandler.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.base.relational.handler;
+
+import io.debezium.schema.SchemaChangeEvent;
+
+import java.util.Map;
+
+/** Connector-specific hook that extracts the source information of a schema change event. */
+public interface SchemaChangeEventHandler {
+ /** Parses the source information carried by {@code event} into a map. */
+ Map<String, Object> parseSource(SchemaChangeEvent event);
+}
diff --git
a/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/RecordUtils.java
b/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/RecordUtils.java
index 1392b984ed..e63da827a7 100644
---
a/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/RecordUtils.java
+++
b/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/RecordUtils.java
@@ -56,8 +56,7 @@ public class RecordUtils {
.asList("CHAR", "NCHAR", "NVARCHAR2", "NVCHAER", "VARCHAR",
"VARCHAR2", "CLOB", "NCLOB", "XMLType");
private static final List<String> BINARY_TYPE = Arrays.asList("BLOB",
"ROWID");
private static final List<String> BIGINT_TYPE = Arrays.asList("INTERVAL
DAY TO SECOND", "INTERVAL YEAR TO MONTH");
- public static final String MYSQL_SCHEMA_CHANGE_EVENT_KEY_NAME =
"io.debezium.connector.mysql.SchemaChangeKey";
- public static final String ORACLE_SCHEMA_CHANGE_EVENT_KEY_NAME =
"io.debezium.connector.oracle.SchemaChangeKey";
+ public static final String SCHEMA_CHANGE_EVENT_KEY_NAME =
"io.debezium.connector.*.SchemaChangeKey";
public static final String CONNECTOR = "connector";
public static final String MYSQL_CONNECTOR = "mysql";
@@ -145,18 +144,7 @@ public class RecordUtils {
*/
public static boolean isSchemaChangeEvent(SourceRecord sourceRecord) {
Schema keySchema = sourceRecord.keySchema();
- return keySchema != null &&
(MYSQL_SCHEMA_CHANGE_EVENT_KEY_NAME.equalsIgnoreCase(keySchema.name())
- ||
ORACLE_SCHEMA_CHANGE_EVENT_KEY_NAME.equalsIgnoreCase(keySchema.name()));
- }
-
- /**
- * Whether the source belong Mysql Connector
- * @param source
- * @return true if the source belong Mysql Connector
- */
- public static boolean isMysqlConnector(Struct source) {
- String connector = source.getString(CONNECTOR);
- return MYSQL_CONNECTOR.equalsIgnoreCase(connector);
+ return keySchema != null &&
(keySchema.name().matches(SCHEMA_CHANGE_EVENT_KEY_NAME));
}
public static boolean isDdlRecord(Struct value) {
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java
index ffe106b488..0d1bfbd45e 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java
@@ -25,6 +25,7 @@ import
org.apache.inlong.sort.cdc.base.source.meta.split.SourceSplitBase;
import
org.apache.inlong.sort.cdc.base.source.reader.external.JdbcSourceFetchTaskContext;
import org.apache.inlong.sort.cdc.oracle.source.config.OracleSourceConfig;
import org.apache.inlong.sort.cdc.oracle.source.meta.offset.RedoLogOffset;
+import
org.apache.inlong.sort.cdc.oracle.source.reader.handler.OracleSchemaChangeEventHandler;
import
org.apache.inlong.sort.cdc.oracle.source.relational.OracleSourceEventDispatcher;
import org.apache.inlong.sort.cdc.oracle.source.utils.OracleUtils;
@@ -134,7 +135,8 @@ public class OracleSourceFetchTaskContext extends
JdbcSourceFetchTaskContext {
connectorConfig.getTableFilters().dataCollectionFilter(),
DataChangeEvent::new,
metadataProvider,
- schemaNameAdjuster);
+ schemaNameAdjuster,
+ new OracleSchemaChangeEventHandler());
final OracleChangeEventSourceMetricsFactory
changeEventSourceMetricsFactory =
new OracleChangeEventSourceMetricsFactory(
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/handler/OracleSchemaChangeEventHandler.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/handler/OracleSchemaChangeEventHandler.java
new file mode 100644
index 0000000000..d954e450c2
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/handler/OracleSchemaChangeEventHandler.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.oracle.source.reader.handler;
+
+import
org.apache.inlong.sort.cdc.base.relational.handler.SchemaChangeEventHandler;
+
+import io.debezium.schema.SchemaChangeEvent;
+import org.apache.kafka.connect.data.Struct;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static io.debezium.connector.oracle.SourceInfo.SCN_KEY;
+
+public class OracleSchemaChangeEventHandler implements
SchemaChangeEventHandler {
+ /** Returns a new map whose only entry is the SCN string read from the event's source struct. */
+ @Override
+ public Map<String, Object> parseSource(SchemaChangeEvent event) {
+ Map<String, Object> source = new HashMap<>();
+ Struct sourceInfo = event.getSource();
+ String scn = sourceInfo.getString(SCN_KEY); // SCN_KEY is Debezium's Oracle SourceInfo constant
+ source.put(SCN_KEY, scn);
+ return source;
+ }
+}
\ No newline at end of file
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/relational/OracleSourceEventDispatcher.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/relational/OracleSourceEventDispatcher.java
index 2a0b8e77c5..5e7a67f96b 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/relational/OracleSourceEventDispatcher.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/relational/OracleSourceEventDispatcher.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sort.cdc.oracle.source.relational;
import org.apache.inlong.sort.cdc.base.relational.JdbcSourceEventDispatcher;
+import
org.apache.inlong.sort.cdc.base.relational.handler.SchemaChangeEventHandler;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.base.ChangeEventQueue;
@@ -66,7 +67,8 @@ public class OracleSourceEventDispatcher extends
JdbcSourceEventDispatcher {
DataCollectionFilters.DataCollectionFilter<TableId> filter,
ChangeEventCreator changeEventCreator,
EventMetadataProvider metadataProvider,
- SchemaNameAdjuster schemaNameAdjuster) {
+ SchemaNameAdjuster schemaNameAdjuster,
+ SchemaChangeEventHandler schemaChangeEventHandler) {
super(
connectorConfig,
topicSelector,
@@ -75,7 +77,8 @@ public class OracleSourceEventDispatcher extends
JdbcSourceEventDispatcher {
filter,
changeEventCreator,
metadataProvider,
- schemaNameAdjuster);
+ schemaNameAdjuster,
+ schemaChangeEventHandler);
}
@Override