This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 64fe57a  [improve] adjust the code framework related to CDC. (#319)
64fe57a is described below

commit 64fe57a22e867e8f7e56de8832a4420cd4147802
Author: bingquanzhao <[email protected]>
AuthorDate: Thu Feb 29 14:52:41 2024 +0800

    [improve] adjust the code framework related to CDC. (#319)
---
 .../serializer/jsondebezium/CdcDataChange.java     | 40 ++++++++++++
 .../serializer/jsondebezium/CdcSchemaChange.java}  | 28 +++++----
 .../jsondebezium/JsonDebeziumDataChange.java       |  9 +--
 .../jsondebezium/JsonDebeziumSchemaChange.java     |  6 +-
 .../apache/doris/flink/tools/cdc/DatabaseSync.java | 39 ++++++++----
 .../doris/flink/tools/cdc/JdbcSourceSchema.java    | 71 ++++++++++++++++++++++
 .../flink/tools/cdc/ParsingProcessFunction.java    | 12 ++--
 .../apache/doris/flink/tools/cdc/SourceSchema.java | 52 +++-------------
 .../doris/flink/tools/cdc/mysql/MysqlSchema.java   |  4 +-
 .../doris/flink/tools/cdc/oracle/OracleSchema.java |  4 +-
 .../flink/tools/cdc/postgres/PostgresSchema.java   |  4 +-
 .../flink/tools/cdc/sqlserver/SqlServerSchema.java |  4 +-
 12 files changed, 185 insertions(+), 88 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcDataChange.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcDataChange.java
new file mode 100644
index 0000000..c344aae
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcDataChange.java
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.doris.flink.sink.writer.ChangeEvent;
+import org.apache.doris.flink.sink.writer.serializer.DorisRecord;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * When cdc connector captures data changes from the source database you need 
to inherit this class
+ * to complete the synchronized data changes to Doris schema. Supports data 
messages serialized to
+ * json
+ */
+public abstract class CdcDataChange implements ChangeEvent {
+
+    protected abstract DorisRecord serialize(String record, JsonNode 
recordRoot, String op)
+            throws IOException;
+
+    protected abstract Map<String, Object> extractBeforeRow(JsonNode record);
+
+    protected abstract Map<String, Object> extractAfterRow(JsonNode record);
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcSchemaChange.java
similarity index 51%
copy from 
flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java
copy to 
flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcSchemaChange.java
index bf0eee7..858a5ef 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcSchemaChange.java
@@ -15,21 +15,25 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.flink.tools.cdc.mysql;
+package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
 
-import org.apache.doris.flink.tools.cdc.SourceSchema;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.doris.flink.sink.writer.ChangeEvent;
 
-import java.sql.DatabaseMetaData;
+import java.io.IOException;
 
-public class MysqlSchema extends SourceSchema {
+/**
+ * When cdc connector captures data changes about source database schema 
changes, you need to
+ * inherit this class to complete the synchronized changes to Doris schema. 
Supports data messages
+ * serialized to json
+ */
+public abstract class CdcSchemaChange implements ChangeEvent {
 
-    public MysqlSchema(
-            DatabaseMetaData metaData, String databaseName, String tableName, 
String tableComment)
-            throws Exception {
-        super(metaData, databaseName, null, tableName, tableComment);
-    }
+    protected abstract String extractDatabase(JsonNode record);
 
-    public String convertToDorisType(String fieldType, Integer precision, 
Integer scale) {
-        return MysqlType.toDorisType(fieldType, precision, scale);
-    }
+    protected abstract String extractTable(JsonNode record);
+
+    public abstract boolean schemaChange(JsonNode recordRoot) throws 
IOException;
+
+    protected abstract String getCdcTableIdentifier(JsonNode record);
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java
index 67aef02..5075adf 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.doris.flink.cfg.DorisOptions;
-import org.apache.doris.flink.sink.writer.ChangeEvent;
 import org.apache.doris.flink.sink.writer.serializer.DorisRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,7 +40,7 @@ import static 
org.apache.doris.flink.sink.util.DeleteOperation.addDeleteSign;
  * into doris through stream load.<br>
  * Supported data changes include: read, insert, update, delete.
  */
-public class JsonDebeziumDataChange implements ChangeEvent {
+public class JsonDebeziumDataChange extends CdcDataChange {
     private static final Logger LOG = 
LoggerFactory.getLogger(JsonDebeziumDataChange.class);
 
     private static final String OP_READ = "r"; // snapshot read
@@ -122,11 +121,13 @@ public class JsonDebeziumDataChange implements 
ChangeEvent {
         return updateRow.toString().getBytes(StandardCharsets.UTF_8);
     }
 
-    private Map<String, Object> extractBeforeRow(JsonNode record) {
+    @Override
+    protected Map<String, Object> extractBeforeRow(JsonNode record) {
         return extractRow(record.get("before"));
     }
 
-    private Map<String, Object> extractAfterRow(JsonNode record) {
+    @Override
+    protected Map<String, Object> extractAfterRow(JsonNode record) {
         return extractRow(record.get("after"));
     }
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
index f449857..ccb2046 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
@@ -27,7 +27,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.NullNode;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.sink.schema.SchemaChangeManager;
-import org.apache.doris.flink.sink.writer.ChangeEvent;
 import org.apache.doris.flink.tools.cdc.SourceSchema;
 
 import java.util.Map;
@@ -43,7 +42,7 @@ import java.util.regex.Pattern;
  * comment synchronization, supports multi-column changes, and supports column 
name rename. Need to
  * be enabled by configuring use-new-schema-change.
  */
-public abstract class JsonDebeziumSchemaChange implements ChangeEvent {
+public abstract class JsonDebeziumSchemaChange extends CdcSchemaChange {
     protected static String addDropDDLRegex =
             
"ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s]+))?.*";
     protected Pattern addDropDDLPattern;
@@ -69,6 +68,7 @@ public abstract class JsonDebeziumSchemaChange implements 
ChangeEvent {
         return sourceTableName.equals(dbTbl);
     }
 
+    @Override
     protected String extractDatabase(JsonNode record) {
         if (record.get("source").has("schema")) {
             // compatible with schema
@@ -78,6 +78,7 @@ public abstract class JsonDebeziumSchemaChange implements 
ChangeEvent {
         }
     }
 
+    @Override
     protected String extractTable(JsonNode record) {
         return extractJsonNode(record.get("source"), "table");
     }
@@ -102,6 +103,7 @@ public abstract class JsonDebeziumSchemaChange implements 
ChangeEvent {
     }
 
     @VisibleForTesting
+    @Override
     public String getCdcTableIdentifier(JsonNode record) {
         String db = extractJsonNode(record.get("source"), "db");
         String schema = extractJsonNode(record.get("source"), "schema");
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 2aa09b6..8627a47 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -35,6 +35,7 @@ import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.sink.DorisSink;
 import org.apache.doris.flink.sink.writer.WriteMode;
+import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer;
 import 
org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer;
 import org.apache.doris.flink.table.DorisConfigOptions;
 import org.slf4j.Logger;
@@ -159,7 +160,7 @@ public abstract class DatabaseSync {
             streamSource.sinkTo(buildDorisSink());
         } else {
             SingleOutputStreamOperator<Void> parsedStream =
-                    streamSource.process(new 
ParsingProcessFunction(converter));
+                    streamSource.process(buildProcessFunction());
             for (String table : dorisTables) {
                 OutputTag<String> recordOutputTag =
                         ParsingProcessFunction.createRecordOutputTag(table);
@@ -200,16 +201,26 @@ public abstract class DatabaseSync {
         return buildDorisSink(null);
     }
 
+    public ParsingProcessFunction buildProcessFunction() {
+        return new ParsingProcessFunction(converter);
+    }
+
     /** create doris sink. */
     public DorisSink<String> buildDorisSink(String table) {
         String fenodes = sinkConfig.getString(DorisConfigOptions.FENODES);
         String benodes = sinkConfig.getString(DorisConfigOptions.BENODES);
         String user = sinkConfig.getString(DorisConfigOptions.USERNAME);
         String passwd = sinkConfig.getString(DorisConfigOptions.PASSWORD, "");
+        String jdbcUrl = sinkConfig.getString(DorisConfigOptions.JDBC_URL);
 
         DorisSink.Builder<String> builder = DorisSink.builder();
         DorisOptions.Builder dorisBuilder = DorisOptions.builder();
-        
dorisBuilder.setFenodes(fenodes).setBenodes(benodes).setUsername(user).setPassword(passwd);
+        dorisBuilder
+                .setJdbcUrl(jdbcUrl)
+                .setFenodes(fenodes)
+                .setBenodes(benodes)
+                .setUsername(user)
+                .setPassword(passwd);
         sinkConfig
                 .getOptional(DorisConfigOptions.AUTO_REDIRECT)
                 .ifPresent(dorisBuilder::setAutoRedirect);
@@ -284,21 +295,23 @@ public abstract class DatabaseSync {
         DorisExecutionOptions executionOptions = executionBuilder.build();
         builder.setDorisReadOptions(DorisReadOptions.builder().build())
                 .setDorisExecutionOptions(executionOptions)
-                .setSerializer(
-                        JsonDebeziumSchemaSerializer.builder()
-                                .setDorisOptions(dorisBuilder.build())
-                                .setNewSchemaChange(newSchemaChange)
-                                .setExecutionOptions(executionOptions)
-                                .setTableMapping(tableMapping)
-                                .setTableProperties(tableConfig)
-                                .setTargetDatabase(database)
-                                .setTargetTablePrefix(tablePrefix)
-                                .setTargetTableSuffix(tableSuffix)
-                                .build())
+                .setSerializer(buildSchemaSerializer(dorisBuilder, 
executionOptions))
                 .setDorisOptions(dorisBuilder.build());
         return builder.build();
     }
 
+    public DorisRecordSerializer<String> buildSchemaSerializer(
+            DorisOptions.Builder dorisBuilder, DorisExecutionOptions 
executionOptions) {
+        return JsonDebeziumSchemaSerializer.builder()
+                .setDorisOptions(dorisBuilder.build())
+                .setNewSchemaChange(newSchemaChange)
+                .setExecutionOptions(executionOptions)
+                .setTableMapping(tableMapping)
+                .setTableProperties(tableConfig)
+                .setTargetDatabase(database)
+                .build();
+    }
+
     /** Filter table that need to be synchronized. */
     protected boolean isSyncNeeded(String tableName) {
         boolean sync = true;
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java
new file mode 100644
index 0000000..86d6336
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc;
+
+import org.apache.doris.flink.catalog.doris.FieldSchema;
+
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+
+/**
+ * JdbcSourceSchema is a subclass of SourceSchema, used to build metadata 
about jdbc-related
+ * databases.
+ */
+public abstract class JdbcSourceSchema extends SourceSchema {
+
+    public JdbcSourceSchema(
+            DatabaseMetaData metaData,
+            String databaseName,
+            String schemaName,
+            String tableName,
+            String tableComment)
+            throws Exception {
+        super(databaseName, schemaName, tableName, tableComment);
+        fields = new LinkedHashMap<>();
+        try (ResultSet rs = metaData.getColumns(databaseName, schemaName, 
tableName, null)) {
+            while (rs.next()) {
+                String fieldName = rs.getString("COLUMN_NAME");
+                String comment = rs.getString("REMARKS");
+                String fieldType = rs.getString("TYPE_NAME");
+                Integer precision = rs.getInt("COLUMN_SIZE");
+
+                if (rs.wasNull()) {
+                    precision = null;
+                }
+                Integer scale = rs.getInt("DECIMAL_DIGITS");
+                if (rs.wasNull()) {
+                    scale = null;
+                }
+                String dorisTypeStr = convertToDorisType(fieldType, precision, 
scale);
+                fields.put(fieldName, new FieldSchema(fieldName, dorisTypeStr, 
comment));
+            }
+        }
+
+        primaryKeys = new ArrayList<>();
+        try (ResultSet rs = metaData.getPrimaryKeys(databaseName, schemaName, 
tableName)) {
+            while (rs.next()) {
+                String fieldName = rs.getString("COLUMN_NAME");
+                primaryKeys.add(fieldName);
+            }
+        }
+    }
+
+    public abstract String convertToDorisType(String fieldType, Integer 
precision, Integer scale);
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java
index 95b687c..787d0ae 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java
@@ -29,7 +29,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 public class ParsingProcessFunction extends ProcessFunction<String, Void> {
-    private ObjectMapper objectMapper = new ObjectMapper();
+    protected ObjectMapper objectMapper = new ObjectMapper();
     private transient Map<String, OutputTag<String>> recordOutputTags;
     private DatabaseSync.TableNameConverter converter;
 
@@ -46,13 +46,17 @@ public class ParsingProcessFunction extends 
ProcessFunction<String, Void> {
     public void processElement(
             String record, ProcessFunction<String, Void>.Context context, 
Collector<Void> collector)
             throws Exception {
-        JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);
-        String tableName = extractJsonNode(recordRoot.get("source"), "table");
+        String tableName = getRecordTableName(record);
         String dorisName = converter.convert(tableName);
         context.output(getRecordOutputTag(dorisName), record);
     }
 
-    private String extractJsonNode(JsonNode record, String key) {
+    protected String getRecordTableName(String record) throws Exception {
+        JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);
+        return extractJsonNode(recordRoot.get("source"), "table");
+    }
+
+    protected String extractJsonNode(JsonNode record, String key) {
         return record != null && record.get(key) != null ? 
record.get(key).asText() : null;
     }
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
index c525202..e09eb00 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
@@ -23,8 +23,6 @@ import org.apache.doris.flink.catalog.doris.DataModel;
 import org.apache.doris.flink.catalog.doris.FieldSchema;
 import org.apache.doris.flink.catalog.doris.TableSchema;
 
-import java.sql.DatabaseMetaData;
-import java.sql.ResultSet;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedHashMap;
@@ -33,59 +31,23 @@ import java.util.Map;
 import java.util.StringJoiner;
 
 public abstract class SourceSchema {
-    private final String databaseName;
-    private final String schemaName;
-    private final String tableName;
-    private final String tableComment;
-    private final LinkedHashMap<String, FieldSchema> fields;
-    public final List<String> primaryKeys;
+    protected final String databaseName;
+    protected final String schemaName;
+    protected final String tableName;
+    protected final String tableComment;
+    protected LinkedHashMap<String, FieldSchema> fields;
+    public List<String> primaryKeys;
     public DataModel model = DataModel.UNIQUE;
 
     public SourceSchema(
-            DatabaseMetaData metaData,
-            String databaseName,
-            String schemaName,
-            String tableName,
-            String tableComment)
+            String databaseName, String schemaName, String tableName, String 
tableComment)
             throws Exception {
         this.databaseName = databaseName;
         this.schemaName = schemaName;
         this.tableName = tableName;
         this.tableComment = tableComment;
-
-        fields = new LinkedHashMap<>();
-        try (ResultSet rs = metaData.getColumns(databaseName, schemaName, 
tableName, null)) {
-            while (rs.next()) {
-                String fieldName = rs.getString("COLUMN_NAME");
-                String comment = rs.getString("REMARKS");
-                String fieldType = rs.getString("TYPE_NAME");
-                String defaultValue = rs.getString("COLUMN_DEF");
-                Integer precision = rs.getInt("COLUMN_SIZE");
-                if (rs.wasNull()) {
-                    precision = null;
-                }
-
-                Integer scale = rs.getInt("DECIMAL_DIGITS");
-                if (rs.wasNull()) {
-                    scale = null;
-                }
-                String dorisTypeStr = convertToDorisType(fieldType, precision, 
scale);
-                fields.put(
-                        fieldName, new FieldSchema(fieldName, dorisTypeStr, 
defaultValue, comment));
-            }
-        }
-
-        primaryKeys = new ArrayList<>();
-        try (ResultSet rs = metaData.getPrimaryKeys(databaseName, schemaName, 
tableName)) {
-            while (rs.next()) {
-                String fieldName = rs.getString("COLUMN_NAME");
-                primaryKeys.add(fieldName);
-            }
-        }
     }
 
-    public abstract String convertToDorisType(String fieldType, Integer 
precision, Integer scale);
-
     public String getTableIdentifier() {
         return getString(databaseName, schemaName, tableName);
     }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java
index bf0eee7..f84ca94 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java
@@ -17,11 +17,11 @@
 
 package org.apache.doris.flink.tools.cdc.mysql;
 
-import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.apache.doris.flink.tools.cdc.JdbcSourceSchema;
 
 import java.sql.DatabaseMetaData;
 
-public class MysqlSchema extends SourceSchema {
+public class MysqlSchema extends JdbcSourceSchema {
 
     public MysqlSchema(
             DatabaseMetaData metaData, String databaseName, String tableName, 
String tableComment)
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java
index c085754..f843b6d 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java
@@ -17,11 +17,11 @@
 
 package org.apache.doris.flink.tools.cdc.oracle;
 
-import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.apache.doris.flink.tools.cdc.JdbcSourceSchema;
 
 import java.sql.DatabaseMetaData;
 
-public class OracleSchema extends SourceSchema {
+public class OracleSchema extends JdbcSourceSchema {
 
     public OracleSchema(
             DatabaseMetaData metaData,
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresSchema.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresSchema.java
index ecb6edf..3208116 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresSchema.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresSchema.java
@@ -17,11 +17,11 @@
 
 package org.apache.doris.flink.tools.cdc.postgres;
 
-import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.apache.doris.flink.tools.cdc.JdbcSourceSchema;
 
 import java.sql.DatabaseMetaData;
 
-public class PostgresSchema extends SourceSchema {
+public class PostgresSchema extends JdbcSourceSchema {
 
     public PostgresSchema(
             DatabaseMetaData metaData,
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerSchema.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerSchema.java
index ce060ea..6d5ab9a 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerSchema.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerSchema.java
@@ -17,11 +17,11 @@
 
 package org.apache.doris.flink.tools.cdc.sqlserver;
 
-import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.apache.doris.flink.tools.cdc.JdbcSourceSchema;
 
 import java.sql.DatabaseMetaData;
 
-public class SqlServerSchema extends SourceSchema {
+public class SqlServerSchema extends JdbcSourceSchema {
 
     public SqlServerSchema(
             DatabaseMetaData metaData,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to