This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 0eb3aa7 [Feature](CDC) Add auto create table (#248)
0eb3aa7 is described below
commit 0eb3aa7075befc7478e014bbadd754af17deaf75
Author: wudi <[email protected]>
AuthorDate: Thu Nov 30 17:53:00 2023 +0800
[Feature](CDC) Add auto create table (#248)
---
.../apache/doris/flink/sink/writer/EventType.java} | 25 +---
.../serializer/JsonDebeziumSchemaSerializer.java | 147 ++++++++++++++++-----
.../doris/flink/table/DorisConfigOptions.java | 2 +-
.../org/apache/doris/flink/tools/cdc/CdcTools.java | 1 +
.../apache/doris/flink/tools/cdc/DatabaseSync.java | 27 +++-
.../flink/tools/cdc/mysql/MysqlDatabaseSync.java | 10 +-
.../flink/tools/cdc/oracle/OracleDatabaseSync.java | 10 +-
.../tools/cdc/postgres/PostgresDatabaseSync.java | 10 +-
.../tools/cdc/sqlserver/SqlServerDatabaseSync.java | 10 +-
.../writer/TestJsonDebeziumSchemaSerializer.java | 31 ++++-
.../doris/flink/tools/cdc/DatabaseSyncTest.java | 16 +++
11 files changed, 225 insertions(+), 64 deletions(-)
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/EventType.java
similarity index 51%
copy from
flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
copy to
flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/EventType.java
index daab90b..d26bf27 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/EventType.java
@@ -14,27 +14,10 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.flink.tools.cdc;
-import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
-import org.junit.Test;
-import java.util.Arrays;
+package org.apache.doris.flink.sink.writer;
-/**
- * Unit tests for the {@link DatabaseSync}.
- **/
-public class DatabaseSyncTest {
- @Test
- public void multiToOneRulesParserTest() throws Exception{
- String[][] testCase = {
- {"a_.*|b_.*","a|b"} // Normal condition
-// ,{"a_.*|b_.*","a|b|c"} // Unequal length
-// ,{"",""} // Null value
-// ,{"***....","a"} // Abnormal regular expression
- };
- DatabaseSync databaseSync = new MysqlDatabaseSync();
- Arrays.stream(testCase).forEach(arr->{
- databaseSync.multiToOneRulesParser(arr[0], arr[1]);
- });
- }
+public enum EventType {
+ ALTER,
+ CREATE
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
index 0c4309b..29d9cee 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
@@ -26,12 +26,14 @@ import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.NullNode;
import org.apache.commons.collections.CollectionUtils;
import org.apache.doris.flink.catalog.doris.FieldSchema;
+import org.apache.doris.flink.catalog.doris.TableSchema;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.exception.IllegalArgumentException;
import org.apache.doris.flink.sink.schema.SchemaChangeHelper;
import org.apache.doris.flink.sink.schema.SchemaChangeHelper.DDLSchema;
import org.apache.doris.flink.sink.schema.SchemaChangeManager;
+import org.apache.doris.flink.sink.writer.EventType;
import org.apache.doris.flink.tools.cdc.SourceConnector;
import org.apache.doris.flink.tools.cdc.SourceSchema;
import org.apache.doris.flink.tools.cdc.mysql.MysqlType;
@@ -41,6 +43,7 @@ import
org.apache.doris.flink.tools.cdc.sqlserver.SqlServerType;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,6 +51,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@@ -67,7 +71,6 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
private static final String OP_CREATE = "c"; // insert
private static final String OP_UPDATE = "u"; // update
private static final String OP_DELETE = "d"; // delete
-
public static final String EXECUTE_DDL = "ALTER TABLE %s %s COLUMN %s %s";
// alter table tbl add cloumn aca int
private static final String addDropDDLRegex
=
"ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s]+))?.*";
@@ -214,25 +217,40 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
return false;
}
- // db,table
- Tuple2<String, String> tuple = getDorisTableTuple(recordRoot);
- if(tuple == null){
+ EventType eventType = extractEventType(recordRoot);
+ if(eventType == null){
return false;
}
-
- List<String> ddlSqlList = extractDDLList(recordRoot);
- if (CollectionUtils.isEmpty(ddlSqlList)) {
- LOG.info("ddl can not do schema change:{}", recordRoot);
- return false;
- }
-
- List<DDLSchema> ddlSchemas = SchemaChangeHelper.getDdlSchemas();
- for (int i = 0; i < ddlSqlList.size(); i++) {
- DDLSchema ddlSchema = ddlSchemas.get(i);
- String ddlSql = ddlSqlList.get(i);
- boolean doSchemaChange = checkSchemaChange(tuple.f0, tuple.f1,
ddlSchema);
- status = doSchemaChange && schemaChangeManager.execute(ddlSql,
tuple.f0);
- LOG.info("schema change status:{}, ddl:{}", status, ddlSql);
+ if(eventType.equals(EventType.CREATE)){
+ TableSchema tableSchema = extractCreateTableSchema(recordRoot);
+ status = schemaChangeManager.createTable(tableSchema);
+ if(status){
+ String cdcTbl = getCdcTableIdentifier(recordRoot);
+ String dorisTbl = getCreateTableIdentifier(recordRoot);
+ tableMapping.put(cdcTbl, dorisTbl);
+ LOG.info("create table ddl status: {}", status);
+ }
+ } else if (eventType.equals(EventType.ALTER)){
+ // db,table
+ Tuple2<String, String> tuple = getDorisTableTuple(recordRoot);
+ if(tuple == null){
+ return false;
+ }
+ List<String> ddlSqlList = extractDDLList(recordRoot);
+ if (CollectionUtils.isEmpty(ddlSqlList)) {
+ LOG.info("ddl can not do schema change:{}", recordRoot);
+ return false;
+ }
+ List<DDLSchema> ddlSchemas =
SchemaChangeHelper.getDdlSchemas();
+ for (int i = 0; i < ddlSqlList.size(); i++) {
+ DDLSchema ddlSchema = ddlSchemas.get(i);
+ String ddlSql = ddlSqlList.get(i);
+ boolean doSchemaChange = checkSchemaChange(tuple.f0,
tuple.f1, ddlSchema);
+ status = doSchemaChange &&
schemaChangeManager.execute(ddlSql, tuple.f0);
+ LOG.info("schema change status:{}, ddl:{}", status,
ddlSql);
+ }
+ } else{
+ LOG.info("Unsupported event type {}", eventType);
}
} catch (Exception ex) {
LOG.warn("schema change error :", ex);
@@ -240,18 +258,26 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
return status;
}
+ protected JsonNode extractTableChange(JsonNode record) throws
JsonProcessingException {
+ JsonNode historyRecord = extractHistoryRecord(record);
+ JsonNode tableChanges = historyRecord.get("tableChanges");
+ if(!Objects.isNull(tableChanges)){
+ JsonNode tableChange = tableChanges.get(0);
+ return tableChange;
+ }
+ return null;
+ }
+
+ /**
+ * Parse Alter Event
+ */
@VisibleForTesting
- public List<String> extractDDLList(JsonNode record) throws
JsonProcessingException {
+ public List<String> extractDDLList(JsonNode record) throws IOException{
String dorisTable = getDorisTableIdentifier(record);
JsonNode historyRecord = extractHistoryRecord(record);
- JsonNode tableChanges = historyRecord.get("tableChanges");
String ddl = extractJsonNode(historyRecord, "ddl");
- if (Objects.isNull(tableChanges) || Objects.isNull(ddl)) {
- return new ArrayList<>();
- }
- LOG.debug("received debezium ddl :{}", ddl);
- JsonNode tableChange = tableChanges.get(0);
- if (Objects.isNull(tableChange) ||
!tableChange.get("type").asText().equals("ALTER")) {
+ JsonNode tableChange = extractTableChange(record);
+ if (Objects.isNull(tableChange) || Objects.isNull(ddl)) {
return null;
}
@@ -285,6 +311,47 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
return SchemaChangeHelper.generateDDLSql(dorisTable);
}
+ @VisibleForTesting
+ public TableSchema extractCreateTableSchema(JsonNode record) throws
JsonProcessingException {
+ String dorisTable = getCreateTableIdentifier(record);
+ JsonNode tableChange = extractTableChange(record);
+ JsonNode pkColumns =
tableChange.get("table").get("primaryKeyColumnNames");
+ JsonNode columns = tableChange.get("table").get("columns");
+ String tblComment = tableChange.get("table").get("comment").asText();
+ Map<String, FieldSchema> field = new LinkedHashMap<>();
+ for (JsonNode column : columns) {
+ buildFieldSchema(field, column);
+ }
+ List<String> pkList = new ArrayList<>();
+ for(JsonNode column : pkColumns){
+ String fieldName = column.asText();
+ pkList.add(fieldName);
+ }
+
+ TableSchema tableSchema = new TableSchema();
+ tableSchema.setFields(field);
+ tableSchema.setKeys(pkList);
+ tableSchema.setDistributeKeys(buildDistributeKeys(pkList, field));
+ tableSchema.setTableComment(tblComment);
+
+ String[] split = dorisTable.split("\\.");
+ Preconditions.checkArgument(split.length == 2);
+ tableSchema.setDatabase(split[0]);
+ tableSchema.setTable(split[1]);
+ return tableSchema;
+ }
+
+ private List<String> buildDistributeKeys(List<String> primaryKeys,
Map<String, FieldSchema> fields) {
+ if (!CollectionUtil.isNullOrEmpty(primaryKeys)) {
+ return primaryKeys;
+ }
+ if(!fields.isEmpty()){
+ Map.Entry<String, FieldSchema> firstField =
fields.entrySet().iterator().next();
+ return Collections.singletonList(firstField.getKey());
+ }
+ return new ArrayList<>();
+ }
+
@VisibleForTesting
public void setOriginFieldSchemaMap(Map<String, FieldSchema>
originFieldSchemaMap) {
this.originFieldSchemaMap = originFieldSchemaMap;
@@ -334,6 +401,12 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
return SourceSchema.getString(db, schema, table);
}
+ public String getCreateTableIdentifier(JsonNode record){
+ String db = extractJsonNode(record.get("source"), "db");
+ String table = extractJsonNode(record.get("source"), "table");
+ return db + "." + table;
+ }
+
public String getDorisTableIdentifier(String cdcTableIdentifier){
if(!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())){
return dorisOptions.getTableIdentifier();
@@ -405,6 +478,23 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
return extractJsonNode(record.get("source"), "table");
}
+ /**
+ * Parse event type
+ */
+ protected EventType extractEventType(JsonNode record) throws
JsonProcessingException {
+ JsonNode tableChange = extractTableChange(record);
+ if(tableChange == null || tableChange.get("type") == null){
+ return null;
+ }
+ String type = tableChange.get("type").asText();
+ if(EventType.ALTER.toString().equalsIgnoreCase(type)){
+ return EventType.ALTER;
+ }else if(EventType.CREATE.toString().equalsIgnoreCase(type)){
+ return EventType.CREATE;
+ }
+ return null;
+ }
+
private String extractJsonNode(JsonNode record, String key) {
return record != null && record.get(key) != null &&
!(record.get(key) instanceof NullNode) ?
record.get(key).asText() : null;
@@ -425,7 +515,7 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
}
private JsonNode extractHistoryRecord(JsonNode record) throws
JsonProcessingException {
- if (record.has("historyRecord")) {
+ if (record != null && record.has("historyRecord")) {
return objectMapper.readTree(record.get("historyRecord").asText());
}
// The ddl passed by some scenes will not be included in the
historyRecord, such as DebeziumSourceFunction
@@ -452,8 +542,6 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
return null;
}
-
-
@VisibleForTesting
public void fillOriginSchema(JsonNode columns) {
if (Objects.nonNull(originFieldSchemaMap)) {
@@ -623,5 +711,4 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
return type;
}
-
}
\ No newline at end of file
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index af01ea5..546dc6e 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -89,7 +89,7 @@ public class DorisConfigOptions {
.defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT)
.withDescription("");
public static final ConfigOption<Integer> DORIS_DESERIALIZE_QUEUE_SIZE =
ConfigOptions
- .key("doris.request.retriesdoris.deserialize.queue.size")
+ .key("doris.deserialize.queue.size")
.intType()
.defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT)
.withDescription("");
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index d05fa15..fe35ee3 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -129,6 +129,7 @@ public class CdcTools {
.setTableConfig(tableMap)
.setCreateTableOnly(createTableOnly)
.setNewSchemaChange(useNewSchemaChange)
+ .setSingleSink(singleSink)
.create();
databaseSync.build();
if(StringUtils.isNullOrWhitespaceOnly(jobName)){
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index bf26214..02ab034 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -45,6 +45,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
public abstract class DatabaseSync {
private static final Logger LOG =
LoggerFactory.getLogger(DatabaseSync.class);
@@ -83,6 +84,11 @@ public abstract class DatabaseSync {
public abstract DataStreamSource<String>
buildCdcSource(StreamExecutionEnvironment env);
+ /**
+ * Get the prefix that qualifies each entry of the table list, for example the
database name for MySQL or the schema name for Oracle.
+ */
+ public abstract String getTableListPrefix();
+
public DatabaseSync() throws SQLException {
registerDriver();
}
@@ -132,8 +138,7 @@ public abstract class DatabaseSync {
System.out.println("Create table finished.");
System.exit(0);
}
-
- config.setString(TABLE_NAME_OPTIONS, "(" + String.join("|",
syncTables) + ")");
+ config.setString(TABLE_NAME_OPTIONS, getSyncTableList(syncTables));
DataStreamSource<String> streamSource = buildCdcSource(env);
if(singleSink){
streamSource.sinkTo(buildDorisSink());
@@ -256,6 +261,24 @@ public abstract class DatabaseSync {
LOG.debug("table {} is synchronized? {}", tableName, sync);
return sync;
}
+
+ protected String getSyncTableList(List<String> syncTables){
+ if(!singleSink){
+ return syncTables.stream()
+ .map(v-> getTableListPrefix() + "\\." + v)
+ .collect(Collectors.joining("|"));
+ }else{
+ // combine the including-table pattern with a negative lookahead built from the excluding-table pattern
+ String includingPattern = String.format("(%s)\\.(%s)",
getTableListPrefix(), includingTables);
+ if (StringUtils.isNullOrWhitespaceOnly(excludingTables)) {
+ return includingPattern;
+ }else{
+ String excludingPattern = String.format("?!(%s\\.(%s))$",
getTableListPrefix(), excludingTables);
+ return String.format("(%s)(%s)", includingPattern,
excludingPattern);
+ }
+ }
+ }
+
/**
* Filter table that many tables merge to one
*/
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
index 22e49aa..a3e01d3 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
@@ -122,7 +122,9 @@ public class MysqlDatabaseSync extends DatabaseSync {
.username(config.get(MySqlSourceOptions.USERNAME))
.password(config.get(MySqlSourceOptions.PASSWORD))
.databaseList(databaseName)
- .tableList(databaseName + "." + tableName);
+ .tableList(tableName)
+ // enable scanning of newly added tables by default
+ .scanNewlyAddedTableEnabled(true);
config.getOptional(MySqlSourceOptions.SERVER_ID).ifPresent(sourceBuilder::serverId);
config
@@ -215,6 +217,12 @@ public class MysqlDatabaseSync extends DatabaseSync {
mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
}
+ @Override
+ public String getTableListPrefix() {
+ String databaseName = config.get(MySqlSourceOptions.DATABASE_NAME);
+ return databaseName;
+ }
+
/**
* set chunkkeyColumn,eg: db.table1:column1,db.table2:column2
* @param sourceBuilder
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
index 6e27eb6..92c1f95 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
@@ -174,7 +174,7 @@ public class OracleDatabaseSync extends DatabaseSync {
.port(port)
.databaseList(databaseName)
.schemaList(schemaName)
- .tableList(schemaName + "." + tableName)
+ .tableList(tableName)
.username(username)
.password(password)
.includeSchemaChanges(true)
@@ -199,7 +199,7 @@ public class OracleDatabaseSync extends DatabaseSync {
.password(password)
.database(databaseName)
.schemaList(schemaName)
- .tableList(schemaName + "." + tableName)
+ .tableList(tableName)
.debeziumProperties(debeziumProperties)
.startupOptions(startupOptions)
.deserializer(schema)
@@ -207,4 +207,10 @@ public class OracleDatabaseSync extends DatabaseSync {
return env.addSource(oracleSource, "Oracle Source");
}
}
+
+ @Override
+ public String getTableListPrefix() {
+ String schemaName = config.get(OracleSourceOptions.SCHEMA_NAME);
+ return schemaName;
+ }
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
index b8c9ad1..04b4127 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
@@ -159,7 +159,7 @@ public class PostgresDatabaseSync extends DatabaseSync {
.port(port)
.database(databaseName)
.schemaList(schemaName)
- .tableList(schemaName + "." + tableName)
+ .tableList(tableName)
.username(username)
.password(password)
.deserializer(schema)
@@ -185,7 +185,7 @@ public class PostgresDatabaseSync extends DatabaseSync {
.port(port)
.database(databaseName)
.schemaList(schemaName)
- .tableList(schemaName + "." + tableName)
+ .tableList(tableName)
.username(username)
.password(password)
.debeziumProperties(debeziumProperties)
@@ -196,4 +196,10 @@ public class PostgresDatabaseSync extends DatabaseSync {
return env.addSource(postgresSource, "Postgres Source");
}
}
+
+ @Override
+ public String getTableListPrefix() {
+ String schemaName = config.get(PostgresSourceOptions.SCHEMA_NAME);
+ return schemaName;
+ }
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
index fb25212..10db3c1 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
@@ -153,7 +153,7 @@ public class SqlServerDatabaseSync extends DatabaseSync {
.hostname(hostname)
.port(port)
.databaseList(databaseName)
- .tableList(schemaName + "." + tableName)
+ .tableList(tableName)
.username(username)
.password(password)
.startupOptions(startupOptions)
@@ -175,7 +175,7 @@ public class SqlServerDatabaseSync extends DatabaseSync {
.hostname(hostname)
.port(port)
.database(databaseName)
- .tableList(schemaName + "." + tableName)
+ .tableList(tableName)
.username(username)
.password(password)
.debeziumProperties(debeziumProperties)
@@ -185,4 +185,10 @@ public class SqlServerDatabaseSync extends DatabaseSync {
return env.addSource(sqlServerSource, "SqlServer Source");
}
}
+
+ @Override
+ public String getTableListPrefix() {
+ String schemaName = config.get(JdbcSourceOptions.SCHEMA_NAME);
+ return schemaName;
+ }
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
index de549ef..32aedab 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
@@ -20,8 +20,9 @@ package org.apache.doris.flink.sink.writer;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
-
+import org.apache.commons.collections.CollectionUtils;
import org.apache.doris.flink.catalog.doris.FieldSchema;
+import org.apache.doris.flink.catalog.doris.TableSchema;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
@@ -29,9 +30,8 @@ import org.apache.doris.flink.exception.DorisException;
import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.rest.models.Field;
import org.apache.doris.flink.rest.models.Schema;
-
-import org.apache.commons.collections.CollectionUtils;
import
org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer;
+import org.apache.doris.flink.tools.cdc.SourceConnector;
import org.apache.flink.shaded.guava30.com.google.common.collect.Maps;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -198,8 +198,10 @@ public class TestJsonDebeziumSchemaSerializer {
String record
=
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"oracle\",\"name\":\"oracle_logminer\",\"ts_ms\":1696945825065,\"snapshot\":\"true\",\"db\":\"HELOWIN\",\"sequence\":null,\"schema\":\"ADMIN\",\"table\":\"PERSONS\",\"txId\":null,\"scn\":\"1199617\",\"commit_scn\":null,\"lcr_position\":null,\"rs_id\":null,\"ssn\":0,\"redo_thread\":null},\"databaseName\":\"HELOWIN\",\"schemaName\":\"ADMIN\",\"ddl\":\"\\n
CREATE TABLE \\\"ADMIN\\\".\\\"PERSONS\\\" \\n (\\t\\\"ID\ [...]
JsonNode recordRoot = objectMapper.readTree(record);
+ serializer.setSourceConnector("oracle");
List<String> ddlSQLList = serializer.extractDDLList(recordRoot);
Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList));
+ serializer.setSourceConnector("mysql");
}
@Test
@@ -419,4 +421,27 @@ public class TestJsonDebeziumSchemaSerializer {
dorisOptions.setTableIdentifier(tmp);
}
+ @Test
+ @Ignore
+ public void testAutoCreateTable() throws Exception {
+ String record
+ = "{ \"source\":{ \"version\":\"1.9.7.Final\",
\"connector\":\"oracle\", \"name\":\"oracle_logminer\",
\"ts_ms\":1696945825065, \"snapshot\":\"true\",
\"db\":\"TESTDB\", \"sequence\":null, \"schema\":\"ADMIN\",
\"table\":\"PERSONS\", \"txId\":null, \"scn\":\"1199617\",
\"commit_scn\":null, \"lcr_position\":null, \"rs_id\":null,
\"ssn\":0, \"redo_thread\":null [...]
+ JsonNode recordRoot = objectMapper.readTree(record);
+ dorisOptions = DorisOptions.builder().setFenodes("127.0.0.1:8030")
+ .setTableIdentifier("")
+ .setUsername("root")
+ .setPassword("").build();
+ serializer =
JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build();
+ serializer.setSourceConnector(SourceConnector.ORACLE.connectorName);
+ TableSchema tableSchema =
serializer.extractCreateTableSchema(recordRoot);
+ Assert.assertEquals("TESTDB", tableSchema.getDatabase());
+ Assert.assertEquals("PERSONS", tableSchema.getTable());
+ Assert.assertArrayEquals(new String[]{"ID"},
tableSchema.getKeys().toArray());
+ Assert.assertEquals(3, tableSchema.getFields().size());
+ Assert.assertEquals("ID", tableSchema.getFields().get("ID").getName());
+ Assert.assertEquals("NAME4",
tableSchema.getFields().get("NAME4").getName());
+ Assert.assertEquals("age4",
tableSchema.getFields().get("age4").getName());
+ serializer.setSourceConnector(SourceConnector.MYSQL.connectorName);
+ }
+
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
index daab90b..fb2c8d6 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
@@ -17,7 +17,10 @@
package org.apache.doris.flink.tools.cdc;
import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Assert;
import org.junit.Test;
+
import java.util.Arrays;
/**
@@ -37,4 +40,17 @@ public class DatabaseSyncTest {
databaseSync.multiToOneRulesParser(arr[0], arr[1]);
});
}
+
+ @Test
+ public void getSyncTableListTest() throws Exception{
+ DatabaseSync databaseSync = new MysqlDatabaseSync();
+ databaseSync.setSingleSink(false);
+ databaseSync.setIncludingTables("tbl_1|tbl_2");
+ Configuration config = new Configuration();
+ config.setString("database-name", "db");
+ config.setString("table-name", "tbl.*");
+ databaseSync.setConfig(config);
+ String syncTableList =
databaseSync.getSyncTableList(Arrays.asList("tbl_1", "tbl_2"));
+ Assert.assertEquals("db\\.tbl_1|db\\.tbl_2", syncTableList);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]