This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 64fe57a [improve] adjust the code framework related to CDC. (#319)
64fe57a is described below
commit 64fe57a22e867e8f7e56de8832a4420cd4147802
Author: bingquanzhao <[email protected]>
AuthorDate: Thu Feb 29 14:52:41 2024 +0800
[improve] adjust the code framework related to CDC. (#319)
---
.../serializer/jsondebezium/CdcDataChange.java | 40 ++++++++++++
.../serializer/jsondebezium/CdcSchemaChange.java} | 28 +++++----
.../jsondebezium/JsonDebeziumDataChange.java | 9 +--
.../jsondebezium/JsonDebeziumSchemaChange.java | 6 +-
.../apache/doris/flink/tools/cdc/DatabaseSync.java | 39 ++++++++----
.../doris/flink/tools/cdc/JdbcSourceSchema.java | 71 ++++++++++++++++++++++
.../flink/tools/cdc/ParsingProcessFunction.java | 12 ++--
.../apache/doris/flink/tools/cdc/SourceSchema.java | 52 +++-------------
.../doris/flink/tools/cdc/mysql/MysqlSchema.java | 4 +-
.../doris/flink/tools/cdc/oracle/OracleSchema.java | 4 +-
.../flink/tools/cdc/postgres/PostgresSchema.java | 4 +-
.../flink/tools/cdc/sqlserver/SqlServerSchema.java | 4 +-
12 files changed, 185 insertions(+), 88 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcDataChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcDataChange.java
new file mode 100644
index 0000000..c344aae
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcDataChange.java
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.doris.flink.sink.writer.ChangeEvent;
+import org.apache.doris.flink.sink.writer.serializer.DorisRecord;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * When a CDC connector captures data changes from the source database, you need to inherit this
+ * class to synchronize those data changes to Doris. Supports data messages serialized
+ * to JSON.
+ */
+public abstract class CdcDataChange implements ChangeEvent {
+
+ protected abstract DorisRecord serialize(String record, JsonNode recordRoot, String op)
+ throws IOException;
+
+ protected abstract Map<String, Object> extractBeforeRow(JsonNode record);
+
+ protected abstract Map<String, Object> extractAfterRow(JsonNode record);
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcSchemaChange.java
similarity index 51%
copy from flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java
copy to flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcSchemaChange.java
index bf0eee7..858a5ef 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcSchemaChange.java
@@ -15,21 +15,25 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.flink.tools.cdc.mysql;
+package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
-import org.apache.doris.flink.tools.cdc.SourceSchema;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.doris.flink.sink.writer.ChangeEvent;
-import java.sql.DatabaseMetaData;
+import java.io.IOException;
-public class MysqlSchema extends SourceSchema {
+/**
+ * When a CDC connector captures schema changes from the source database, you need to inherit
+ * this class to synchronize those changes to the Doris schema. Supports data messages serialized
+ * to JSON.
+ */
+public abstract class CdcSchemaChange implements ChangeEvent {
- public MysqlSchema(
- DatabaseMetaData metaData, String databaseName, String tableName, String tableComment)
- throws Exception {
- super(metaData, databaseName, null, tableName, tableComment);
- }
+ protected abstract String extractDatabase(JsonNode record);
- public String convertToDorisType(String fieldType, Integer precision, Integer scale) {
- return MysqlType.toDorisType(fieldType, precision, scale);
- }
+ protected abstract String extractTable(JsonNode record);
+
+ public abstract boolean schemaChange(JsonNode recordRoot) throws IOException;
+
+ protected abstract String getCdcTableIdentifier(JsonNode record);
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java
index 67aef02..5075adf 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.doris.flink.cfg.DorisOptions;
-import org.apache.doris.flink.sink.writer.ChangeEvent;
import org.apache.doris.flink.sink.writer.serializer.DorisRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,7 +40,7 @@ import static org.apache.doris.flink.sink.util.DeleteOperation.addDeleteSign;
* into doris through stream load.<br>
* Supported data changes include: read, insert, update, delete.
*/
-public class JsonDebeziumDataChange implements ChangeEvent {
+public class JsonDebeziumDataChange extends CdcDataChange {
private static final Logger LOG =
LoggerFactory.getLogger(JsonDebeziumDataChange.class);
private static final String OP_READ = "r"; // snapshot read
@@ -122,11 +121,13 @@ public class JsonDebeziumDataChange implements ChangeEvent {
return updateRow.toString().getBytes(StandardCharsets.UTF_8);
}
- private Map<String, Object> extractBeforeRow(JsonNode record) {
+ @Override
+ protected Map<String, Object> extractBeforeRow(JsonNode record) {
return extractRow(record.get("before"));
}
- private Map<String, Object> extractAfterRow(JsonNode record) {
+ @Override
+ protected Map<String, Object> extractAfterRow(JsonNode record) {
return extractRow(record.get("after"));
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
index f449857..ccb2046 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
@@ -27,7 +27,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.NullNode;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.sink.schema.SchemaChangeManager;
-import org.apache.doris.flink.sink.writer.ChangeEvent;
import org.apache.doris.flink.tools.cdc.SourceSchema;
import java.util.Map;
@@ -43,7 +42,7 @@ import java.util.regex.Pattern;
* comment synchronization, supports multi-column changes, and supports column name rename. Need to
* be enabled by configuring use-new-schema-change.
*/
-public abstract class JsonDebeziumSchemaChange implements ChangeEvent {
+public abstract class JsonDebeziumSchemaChange extends CdcSchemaChange {
protected static String addDropDDLRegex =
"ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s]+))?.*";
protected Pattern addDropDDLPattern;
@@ -69,6 +68,7 @@ public abstract class JsonDebeziumSchemaChange implements ChangeEvent {
return sourceTableName.equals(dbTbl);
}
+ @Override
protected String extractDatabase(JsonNode record) {
if (record.get("source").has("schema")) {
// compatible with schema
@@ -78,6 +78,7 @@ public abstract class JsonDebeziumSchemaChange implements ChangeEvent {
}
}
+ @Override
protected String extractTable(JsonNode record) {
return extractJsonNode(record.get("source"), "table");
}
@@ -102,6 +103,7 @@ public abstract class JsonDebeziumSchemaChange implements ChangeEvent {
}
@VisibleForTesting
+ @Override
public String getCdcTableIdentifier(JsonNode record) {
String db = extractJsonNode(record.get("source"), "db");
String schema = extractJsonNode(record.get("source"), "schema");
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 2aa09b6..8627a47 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -35,6 +35,7 @@ import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.WriteMode;
+import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer;
import org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer;
import org.apache.doris.flink.table.DorisConfigOptions;
import org.slf4j.Logger;
@@ -159,7 +160,7 @@ public abstract class DatabaseSync {
streamSource.sinkTo(buildDorisSink());
} else {
SingleOutputStreamOperator<Void> parsedStream =
- streamSource.process(new ParsingProcessFunction(converter));
+ streamSource.process(buildProcessFunction());
for (String table : dorisTables) {
OutputTag<String> recordOutputTag =
ParsingProcessFunction.createRecordOutputTag(table);
@@ -200,16 +201,26 @@ public abstract class DatabaseSync {
return buildDorisSink(null);
}
+ public ParsingProcessFunction buildProcessFunction() {
+ return new ParsingProcessFunction(converter);
+ }
+
/** create doris sink. */
public DorisSink<String> buildDorisSink(String table) {
String fenodes = sinkConfig.getString(DorisConfigOptions.FENODES);
String benodes = sinkConfig.getString(DorisConfigOptions.BENODES);
String user = sinkConfig.getString(DorisConfigOptions.USERNAME);
String passwd = sinkConfig.getString(DorisConfigOptions.PASSWORD, "");
+ String jdbcUrl = sinkConfig.getString(DorisConfigOptions.JDBC_URL);
DorisSink.Builder<String> builder = DorisSink.builder();
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
- dorisBuilder.setFenodes(fenodes).setBenodes(benodes).setUsername(user).setPassword(passwd);
+ dorisBuilder
+ .setJdbcUrl(jdbcUrl)
+ .setFenodes(fenodes)
+ .setBenodes(benodes)
+ .setUsername(user)
+ .setPassword(passwd);
sinkConfig
.getOptional(DorisConfigOptions.AUTO_REDIRECT)
.ifPresent(dorisBuilder::setAutoRedirect);
@@ -284,21 +295,23 @@ public abstract class DatabaseSync {
DorisExecutionOptions executionOptions = executionBuilder.build();
builder.setDorisReadOptions(DorisReadOptions.builder().build())
.setDorisExecutionOptions(executionOptions)
- .setSerializer(
- JsonDebeziumSchemaSerializer.builder()
- .setDorisOptions(dorisBuilder.build())
- .setNewSchemaChange(newSchemaChange)
- .setExecutionOptions(executionOptions)
- .setTableMapping(tableMapping)
- .setTableProperties(tableConfig)
- .setTargetDatabase(database)
- .setTargetTablePrefix(tablePrefix)
- .setTargetTableSuffix(tableSuffix)
- .build())
+ .setSerializer(buildSchemaSerializer(dorisBuilder, executionOptions))
.setDorisOptions(dorisBuilder.build());
return builder.build();
}
+ public DorisRecordSerializer<String> buildSchemaSerializer(
+ DorisOptions.Builder dorisBuilder, DorisExecutionOptions executionOptions) {
+ return JsonDebeziumSchemaSerializer.builder()
+ .setDorisOptions(dorisBuilder.build())
+ .setNewSchemaChange(newSchemaChange)
+ .setExecutionOptions(executionOptions)
+ .setTableMapping(tableMapping)
+ .setTableProperties(tableConfig)
+ .setTargetDatabase(database)
+ .build();
+ }
+
/** Filter table that need to be synchronized. */
protected boolean isSyncNeeded(String tableName) {
boolean sync = true;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java
new file mode 100644
index 0000000..86d6336
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc;
+
+import org.apache.doris.flink.catalog.doris.FieldSchema;
+
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+
+/**
+ * JdbcSourceSchema is a subclass of SourceSchema that builds table metadata (columns and primary
+ * keys) for JDBC-accessible source databases.
+ */
+public abstract class JdbcSourceSchema extends SourceSchema {
+
+ public JdbcSourceSchema(
+ DatabaseMetaData metaData,
+ String databaseName,
+ String schemaName,
+ String tableName,
+ String tableComment)
+ throws Exception {
+ super(databaseName, schemaName, tableName, tableComment);
+ fields = new LinkedHashMap<>();
+ try (ResultSet rs = metaData.getColumns(databaseName, schemaName, tableName, null)) {
+ while (rs.next()) {
+ String fieldName = rs.getString("COLUMN_NAME");
+ String comment = rs.getString("REMARKS");
+ String fieldType = rs.getString("TYPE_NAME");
+ Integer precision = rs.getInt("COLUMN_SIZE");
+
+ if (rs.wasNull()) {
+ precision = null;
+ }
+ Integer scale = rs.getInt("DECIMAL_DIGITS");
+ if (rs.wasNull()) {
+ scale = null;
+ }
+ String dorisTypeStr = convertToDorisType(fieldType, precision, scale);
+ fields.put(fieldName, new FieldSchema(fieldName, dorisTypeStr, comment));
+ }
+ }
+
+ primaryKeys = new ArrayList<>();
+ try (ResultSet rs = metaData.getPrimaryKeys(databaseName, schemaName, tableName)) {
+ while (rs.next()) {
+ String fieldName = rs.getString("COLUMN_NAME");
+ primaryKeys.add(fieldName);
+ }
+ }
+ }
+
+ public abstract String convertToDorisType(String fieldType, Integer precision, Integer scale);
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java
index 95b687c..787d0ae 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java
@@ -29,7 +29,7 @@ import java.util.HashMap;
import java.util.Map;
public class ParsingProcessFunction extends ProcessFunction<String, Void> {
- private ObjectMapper objectMapper = new ObjectMapper();
+ protected ObjectMapper objectMapper = new ObjectMapper();
private transient Map<String, OutputTag<String>> recordOutputTags;
private DatabaseSync.TableNameConverter converter;
@@ -46,13 +46,17 @@ public class ParsingProcessFunction extends ProcessFunction<String, Void> {
public void processElement(
String record, ProcessFunction<String, Void>.Context context, Collector<Void> collector)
throws Exception {
- JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);
- String tableName = extractJsonNode(recordRoot.get("source"), "table");
+ String tableName = getRecordTableName(record);
String dorisName = converter.convert(tableName);
context.output(getRecordOutputTag(dorisName), record);
}
- private String extractJsonNode(JsonNode record, String key) {
+ protected String getRecordTableName(String record) throws Exception {
+ JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);
+ return extractJsonNode(recordRoot.get("source"), "table");
+ }
+
+ protected String extractJsonNode(JsonNode record, String key) {
return record != null && record.get(key) != null ? record.get(key).asText() : null;
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
index c525202..e09eb00 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
@@ -23,8 +23,6 @@ import org.apache.doris.flink.catalog.doris.DataModel;
import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.catalog.doris.TableSchema;
-import java.sql.DatabaseMetaData;
-import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
@@ -33,59 +31,23 @@ import java.util.Map;
import java.util.StringJoiner;
public abstract class SourceSchema {
- private final String databaseName;
- private final String schemaName;
- private final String tableName;
- private final String tableComment;
- private final LinkedHashMap<String, FieldSchema> fields;
- public final List<String> primaryKeys;
+ protected final String databaseName;
+ protected final String schemaName;
+ protected final String tableName;
+ protected final String tableComment;
+ protected LinkedHashMap<String, FieldSchema> fields;
+ public List<String> primaryKeys;
public DataModel model = DataModel.UNIQUE;
public SourceSchema(
- DatabaseMetaData metaData,
- String databaseName,
- String schemaName,
- String tableName,
- String tableComment)
+ String databaseName, String schemaName, String tableName, String tableComment)
throws Exception {
this.databaseName = databaseName;
this.schemaName = schemaName;
this.tableName = tableName;
this.tableComment = tableComment;
-
- fields = new LinkedHashMap<>();
- try (ResultSet rs = metaData.getColumns(databaseName, schemaName, tableName, null)) {
- while (rs.next()) {
- String fieldName = rs.getString("COLUMN_NAME");
- String comment = rs.getString("REMARKS");
- String fieldType = rs.getString("TYPE_NAME");
- String defaultValue = rs.getString("COLUMN_DEF");
- Integer precision = rs.getInt("COLUMN_SIZE");
- if (rs.wasNull()) {
- precision = null;
- }
-
- Integer scale = rs.getInt("DECIMAL_DIGITS");
- if (rs.wasNull()) {
- scale = null;
- }
- String dorisTypeStr = convertToDorisType(fieldType, precision, scale);
- fields.put(
- fieldName, new FieldSchema(fieldName, dorisTypeStr, defaultValue, comment));
- }
- }
-
- primaryKeys = new ArrayList<>();
- try (ResultSet rs = metaData.getPrimaryKeys(databaseName, schemaName, tableName)) {
- while (rs.next()) {
- String fieldName = rs.getString("COLUMN_NAME");
- primaryKeys.add(fieldName);
- }
- }
}
- public abstract String convertToDorisType(String fieldType, Integer precision, Integer scale);
-
public String getTableIdentifier() {
return getString(databaseName, schemaName, tableName);
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java
index bf0eee7..f84ca94 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlSchema.java
@@ -17,11 +17,11 @@
package org.apache.doris.flink.tools.cdc.mysql;
-import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.apache.doris.flink.tools.cdc.JdbcSourceSchema;
import java.sql.DatabaseMetaData;
-public class MysqlSchema extends SourceSchema {
+public class MysqlSchema extends JdbcSourceSchema {
public MysqlSchema(
DatabaseMetaData metaData, String databaseName, String tableName, String tableComment)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java
index c085754..f843b6d 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleSchema.java
@@ -17,11 +17,11 @@
package org.apache.doris.flink.tools.cdc.oracle;
-import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.apache.doris.flink.tools.cdc.JdbcSourceSchema;
import java.sql.DatabaseMetaData;
-public class OracleSchema extends SourceSchema {
+public class OracleSchema extends JdbcSourceSchema {
public OracleSchema(
DatabaseMetaData metaData,
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresSchema.java
index ecb6edf..3208116 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresSchema.java
@@ -17,11 +17,11 @@
package org.apache.doris.flink.tools.cdc.postgres;
-import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.apache.doris.flink.tools.cdc.JdbcSourceSchema;
import java.sql.DatabaseMetaData;
-public class PostgresSchema extends SourceSchema {
+public class PostgresSchema extends JdbcSourceSchema {
public PostgresSchema(
DatabaseMetaData metaData,
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerSchema.java
index ce060ea..6d5ab9a 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerSchema.java
@@ -17,11 +17,11 @@
package org.apache.doris.flink.tools.cdc.sqlserver;
-import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.apache.doris.flink.tools.cdc.JdbcSourceSchema;
import java.sql.DatabaseMetaData;
-public class SqlServerSchema extends SourceSchema {
+public class SqlServerSchema extends JdbcSourceSchema {
public SqlServerSchema(
DatabaseMetaData metaData,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org