This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 130c0a0b [improve]Improve the hardcode of tableConfig (#449)
130c0a0b is described below
commit 130c0a0b0f3d4dfffe7df5fd9a93a1fb740e22c4
Author: wudongliang <[email protected]>
AuthorDate: Tue Jul 30 10:09:08 2024 +0800
[improve]Improve the hardcode of tableConfig (#449)
---
.../apache/doris/flink/catalog/DorisCatalog.java | 3 +-
.../flink/catalog/doris/DorisSchemaFactory.java | 38 +++-------
.../doris/flink/catalog/doris/DorisSystem.java | 9 +--
.../flink/sink/schema/SQLParserSchemaManager.java | 5 +-
.../serializer/JsonDebeziumSchemaSerializer.java | 21 ++++--
.../jsondebezium/JsonDebeziumChangeContext.java | 18 +++--
.../jsondebezium/JsonDebeziumSchemaChange.java | 2 +
.../JsonDebeziumSchemaChangeImplV2.java | 5 +-
.../jsondebezium/SQLParserSchemaChange.java | 3 +-
.../org/apache/doris/flink/tools/cdc/CdcTools.java | 5 +-
.../apache/doris/flink/tools/cdc/DatabaseSync.java | 19 ++---
.../doris/flink/tools/cdc/DorisTableConfig.java | 84 ++++++++++++++++++++++
.../tools/cdc/mongodb/MongoDBDatabaseSync.java | 2 +-
.../MongoDBJsonDebeziumSchemaSerializer.java | 22 ++++--
.../catalog/doris/DorisSchemaFactoryTest.java | 29 +++-----
.../doris/flink/catalog/doris/DorisSystemTest.java | 4 +-
.../sink/schema/SQLParserSchemaManagerTest.java | 19 ++---
.../TestJsonDebeziumSchemaChangeImplV2.java | 3 +-
.../jsondebezium/TestSQLParserSchemaChange.java | 3 +-
.../flink/tools/cdc/DorisTableConfigTest.java | 46 ++++++++++++
20 files changed, 232 insertions(+), 108 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
index 79d6b2d7..ca7dc114 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
@@ -56,6 +56,7 @@ import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.catalog.doris.TableSchema;
import org.apache.doris.flink.cfg.DorisConnectionOptions;
import org.apache.doris.flink.table.DorisDynamicTableFactory;
+import org.apache.doris.flink.tools.cdc.DorisTableConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -359,7 +360,7 @@ public class DorisCatalog extends AbstractCatalog {
tablePath.getObjectName(),
getCreateDorisColumns(table.getSchema()),
primaryKeys,
- getCreateTableProps(options),
+ new DorisTableConfig(getCreateTableProps(options)),
table.getComment());
dorisSystem.createTable(schema);
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactory.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactory.java
index b621cfdc..3f824a1f 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactory.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactory.java
@@ -21,13 +21,14 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
+import org.apache.doris.flink.tools.cdc.DorisTableConfig;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Objects;
import java.util.regex.Pattern;
/**
@@ -37,14 +38,13 @@ import java.util.regex.Pattern;
* factory
*/
public class DorisSchemaFactory {
- private static Map<String, Integer> tableBucketMap;
public static TableSchema createTableSchema(
String database,
String table,
Map<String, FieldSchema> columnFields,
List<String> pkKeys,
- Map<String, String> tableProperties,
+ DorisTableConfig dorisTableConfig,
String tableComment) {
TableSchema tableSchema = new TableSchema();
tableSchema.setDatabase(database);
@@ -55,14 +55,10 @@ public class DorisSchemaFactory {
tableSchema.setKeys(buildKeys(pkKeys, columnFields));
tableSchema.setTableComment(tableComment);
tableSchema.setDistributeKeys(buildDistributeKeys(pkKeys,
columnFields));
- tableSchema.setProperties(tableProperties);
- if (tableProperties.containsKey("table-buckets")) {
- if (MapUtils.isEmpty(tableBucketMap)) {
- String tableBucketsConfig =
tableProperties.get("table-buckets");
- tableBucketMap = buildTableBucketMap(tableBucketsConfig);
- }
- Integer buckets = parseTableSchemaBuckets(tableBucketMap, table);
- tableSchema.setTableBuckets(buckets);
+ if (Objects.nonNull(dorisTableConfig)) {
+ tableSchema.setProperties(dorisTableConfig.getTableProperties());
+ tableSchema.setTableBuckets(
+
parseTableSchemaBuckets(dorisTableConfig.getTableBuckets(), table));
}
return tableSchema;
}
@@ -92,7 +88,7 @@ public class DorisSchemaFactory {
@VisibleForTesting
public static Integer parseTableSchemaBuckets(
Map<String, Integer> tableBucketsMap, String tableName) {
- if (tableBucketsMap != null) {
+ if (MapUtils.isNotEmpty(tableBucketsMap)) {
// Firstly, if the table name is in the table-buckets map, set the
buckets of the table.
if (tableBucketsMap.containsKey(tableName)) {
return tableBucketsMap.get(tableName);
@@ -107,22 +103,4 @@ public class DorisSchemaFactory {
}
return null;
}
-
- /**
- * Build table bucket Map.
- *
- * @param tableBuckets the string of tableBuckets,
eg:student:10,student_info:20,student.*:30
- * @return The table name and buckets map. The key is table name, the
value is buckets.
- */
- @VisibleForTesting
- public static Map<String, Integer> buildTableBucketMap(String
tableBuckets) {
- Map<String, Integer> tableBucketsMap = new LinkedHashMap<>();
- String[] tableBucketsArray = tableBuckets.split(",");
- for (String tableBucket : tableBucketsArray) {
- String[] tableBucketArray = tableBucket.split(":");
- tableBucketsMap.put(
- tableBucketArray[0].trim(),
Integer.parseInt(tableBucketArray[1].trim()));
- }
- return tableBucketsMap;
- }
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
index 0d33eb9f..be6572d3 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
@@ -50,7 +50,6 @@ import static
org.apache.flink.util.Preconditions.checkArgument;
public class DorisSystem implements Serializable {
private static final long serialVersionUID = 1L;
private static final Logger LOG =
LoggerFactory.getLogger(DorisSystem.class);
- private static final String TABLE_BUCKETS = "table-buckets";
private final JdbcConnectionProvider jdbcConnectionProvider;
private static final List<String> builtinDatabases =
Collections.singletonList("information_schema");
@@ -200,13 +199,7 @@ public class DorisSystem implements Serializable {
}
// append properties
int index = 0;
- int skipProNum = 0;
for (Map.Entry<String, String> entry : properties.entrySet()) {
- // skip table-buckets
- if (entry.getKey().equals(TABLE_BUCKETS)) {
- skipProNum++;
- continue;
- }
if (index == 0) {
sb.append(" PROPERTIES (");
}
@@ -218,7 +211,7 @@ public class DorisSystem implements Serializable {
.append(quoteProperties(entry.getValue()));
index++;
- if (index == (schema.getProperties().size() - skipProNum)) {
+ if (index == (schema.getProperties().size())) {
sb.append(")");
}
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java
index c44f7433..9289c886 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java
@@ -35,6 +35,7 @@ import
org.apache.doris.flink.catalog.doris.DorisSchemaFactory;
import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.catalog.doris.TableSchema;
import
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeUtils;
+import org.apache.doris.flink.tools.cdc.DorisTableConfig;
import org.apache.doris.flink.tools.cdc.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -110,7 +111,7 @@ public class SQLParserSchemaManager implements Serializable
{
SourceConnector sourceConnector,
String ddl,
String dorisTable,
- Map<String, String> tableProperties) {
+ DorisTableConfig dorisTableConfig) {
try {
Statement statement = CCJSqlParserUtil.parse(ddl);
if (statement instanceof CreateTable) {
@@ -144,7 +145,7 @@ public class SQLParserSchemaManager implements Serializable
{
dbTable[1],
columnFields,
pkKeys,
- tableProperties,
+ dorisTableConfig,
extractTableComment(createTable.getTableOptionsStrings()));
} else {
LOG.warn(
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
index c1ed1de1..83dec58b 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
@@ -37,6 +37,7 @@ import
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSc
import
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSchemaChangeImpl;
import
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSchemaChangeImplV2;
import
org.apache.doris.flink.sink.writer.serializer.jsondebezium.SQLParserSchemaChange;
+import org.apache.doris.flink.tools.cdc.DorisTableConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,7 +73,7 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
// <cdc db.schema.table, doris db.table>
private Map<String, String> tableMapping;
// create table properties
- private Map<String, String> tableProperties;
+ private DorisTableConfig dorisTableConfig;
private String targetDatabase;
private String targetTablePrefix;
private String targetTableSuffix;
@@ -120,18 +121,18 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
boolean newSchemaChange,
DorisExecutionOptions executionOptions,
Map<String, String> tableMapping,
- Map<String, String> tableProperties,
+ DorisTableConfig dorisTableConfig,
String targetDatabase,
String targetTablePrefix,
String targetTableSuffix,
SchemaChangeMode schemaChangeMode) {
this(dorisOptions, pattern, sourceTableName, newSchemaChange,
executionOptions);
this.tableMapping = tableMapping;
- this.tableProperties = tableProperties;
this.targetDatabase = targetDatabase;
this.targetTablePrefix = targetTablePrefix;
this.targetTableSuffix = targetTableSuffix;
this.schemaChangeMode = schemaChangeMode;
+ this.dorisTableConfig = dorisTableConfig;
init();
}
@@ -142,7 +143,7 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
tableMapping,
sourceTableName,
targetDatabase,
- tableProperties,
+ dorisTableConfig,
objectMapper,
pattern,
lineDelimiter,
@@ -225,7 +226,7 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
private SchemaChangeMode schemaChangeMode;
private DorisExecutionOptions executionOptions;
private Map<String, String> tableMapping;
- private Map<String, String> tableProperties;
+ private DorisTableConfig dorisTableConfig;
private String targetDatabase;
private String targetTablePrefix = "";
private String targetTableSuffix = "";
@@ -268,8 +269,14 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
return this;
}
+ @Deprecated
public Builder setTableProperties(Map<String, String> tableProperties)
{
- this.tableProperties = tableProperties;
+ this.dorisTableConfig = new DorisTableConfig(tableProperties);
+ return this;
+ }
+
+ public Builder setDorisTableConf(DorisTableConfig dorisTableConfig) {
+ this.dorisTableConfig = dorisTableConfig;
return this;
}
@@ -300,7 +307,7 @@ public class JsonDebeziumSchemaSerializer implements
DorisRecordSerializer<Strin
newSchemaChange,
executionOptions,
tableMapping,
- tableProperties,
+ dorisTableConfig,
targetDatabase,
targetTablePrefix,
targetTableSuffix,
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
index 2a3eebe0..d1326c72 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
@@ -19,9 +19,12 @@ package
org.apache.doris.flink.sink.writer.serializer.jsondebezium;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.tools.cdc.DorisTableConfig;
import java.io.Serializable;
+import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
import java.util.regex.Pattern;
/** Record the context of schema change and data change during serialization.
*/
@@ -33,7 +36,7 @@ public class JsonDebeziumChangeContext implements
Serializable {
private final String sourceTableName;
private final String targetDatabase;
// create table properties
- private final Map<String, String> tableProperties;
+ private final DorisTableConfig dorisTableConfig;
private final ObjectMapper objectMapper;
private final Pattern pattern;
private final String lineDelimiter;
@@ -46,7 +49,7 @@ public class JsonDebeziumChangeContext implements
Serializable {
Map<String, String> tableMapping,
String sourceTableName,
String targetDatabase,
- Map<String, String> tableProperties,
+ DorisTableConfig dorisTableConfig,
ObjectMapper objectMapper,
Pattern pattern,
String lineDelimiter,
@@ -57,7 +60,7 @@ public class JsonDebeziumChangeContext implements
Serializable {
this.tableMapping = tableMapping;
this.sourceTableName = sourceTableName;
this.targetDatabase = targetDatabase;
- this.tableProperties = tableProperties;
+ this.dorisTableConfig = dorisTableConfig;
this.objectMapper = objectMapper;
this.pattern = pattern;
this.lineDelimiter = lineDelimiter;
@@ -82,8 +85,11 @@ public class JsonDebeziumChangeContext implements
Serializable {
return targetDatabase;
}
+ @Deprecated
public Map<String, String> getTableProperties() {
- return tableProperties;
+ return Objects.nonNull(dorisTableConfig)
+ ? dorisTableConfig.getTableProperties()
+ : new HashMap<>();
}
public ObjectMapper getObjectMapper() {
@@ -109,4 +115,8 @@ public class JsonDebeziumChangeContext implements
Serializable {
public String getTargetTableSuffix() {
return targetTableSuffix;
}
+
+ public DorisTableConfig getDorisTableConf() {
+ return dorisTableConfig;
+ }
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
index 15817da6..91be21fa 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
@@ -30,6 +30,7 @@ import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.exception.IllegalArgumentException;
import org.apache.doris.flink.sink.schema.SchemaChangeManager;
import org.apache.doris.flink.sink.writer.EventType;
+import org.apache.doris.flink.tools.cdc.DorisTableConfig;
import org.apache.doris.flink.tools.cdc.SourceConnector;
import org.apache.doris.flink.tools.cdc.SourceSchema;
import org.slf4j.Logger;
@@ -69,6 +70,7 @@ public abstract class JsonDebeziumSchemaChange extends
CdcSchemaChange {
protected String targetDatabase;
protected String targetTablePrefix;
protected String targetTableSuffix;
+ protected DorisTableConfig dorisTableConfig;
public abstract boolean schemaChange(JsonNode recordRoot);
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
index 3f985801..94605105 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
@@ -68,7 +68,6 @@ public class JsonDebeziumSchemaChangeImplV2 extends
JsonDebeziumSchemaChange {
// schemaChange saves table names, field, and field column information
private Map<String, Map<String, FieldSchema>> originFieldSchemaMap = new
LinkedHashMap<>();
// create table properties
- private final Map<String, String> tableProperties;
private final Set<String> filledTables = new HashSet<>();
public JsonDebeziumSchemaChangeImplV2(JsonDebeziumChangeContext
changeContext) {
@@ -78,7 +77,7 @@ public class JsonDebeziumSchemaChangeImplV2 extends
JsonDebeziumSchemaChange {
this.dorisOptions = changeContext.getDorisOptions();
this.schemaChangeManager = new SchemaChangeManager(dorisOptions);
this.targetDatabase = changeContext.getTargetDatabase();
- this.tableProperties = changeContext.getTableProperties();
+ this.dorisTableConfig = changeContext.getDorisTableConf();
this.tableMapping = changeContext.getTableMapping();
this.objectMapper = changeContext.getObjectMapper();
this.targetTablePrefix =
@@ -233,7 +232,7 @@ public class JsonDebeziumSchemaChangeImplV2 extends
JsonDebeziumSchemaChange {
Preconditions.checkArgument(dbTable.length == 2);
return DorisSchemaFactory.createTableSchema(
- dbTable[0], dbTable[1], fields, pkList, tableProperties,
tblComment);
+ dbTable[0], dbTable[1], fields, pkList, dorisTableConfig,
tblComment);
}
private boolean checkSchemaChange(String database, String table, DDLSchema
ddlSchema)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
index 71d4edfe..5bf245ee 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
@@ -44,6 +44,7 @@ public class SQLParserSchemaChange extends
JsonDebeziumSchemaChange {
this.tableMapping = changeContext.getTableMapping();
this.objectMapper = changeContext.getObjectMapper();
this.targetDatabase = changeContext.getTargetDatabase();
+ this.dorisTableConfig = changeContext.getDorisTableConf();
this.targetTablePrefix =
changeContext.getTargetTablePrefix() == null
? ""
@@ -106,7 +107,7 @@ public class SQLParserSchemaChange extends
JsonDebeziumSchemaChange {
String ddl = extractJsonNode(historyRecord, "ddl");
extractSourceConnector(record);
return sqlParserSchemaManager.parseCreateTableStatement(
- sourceConnector, ddl, dorisTable,
changeContext.getTableProperties());
+ sourceConnector, ddl, dorisTable, dorisTableConfig);
}
@VisibleForTesting
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index c91a0734..ec3b4523 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -142,7 +142,8 @@ public class CdcTools {
Preconditions.checkArgument(params.has(DatabaseSyncConfig.SINK_CONF));
Map<String, String> sinkMap = getConfigMap(params,
DatabaseSyncConfig.SINK_CONF);
- Map<String, String> tableMap = getConfigMap(params,
DatabaseSyncConfig.TABLE_CONF);
+ DorisTableConfig tableConfig =
+ new DorisTableConfig(getConfigMap(params,
DatabaseSyncConfig.TABLE_CONF));
Configuration sinkConfig = Configuration.fromMap(sinkMap);
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -158,7 +159,7 @@ public class CdcTools {
.setMultiToOneTarget(multiToOneTarget)
.setIgnoreDefaultValue(ignoreDefaultValue)
.setSinkConfig(sinkConfig)
- .setTableConfig(tableMap)
+ .setTableConfig(tableConfig)
.setCreateTableOnly(createTableOnly)
.setSingleSink(singleSink)
.setIgnoreIncompatible(ignoreIncompatible)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 1e66dd4c..e565d0a7 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -61,7 +61,6 @@ import java.util.regex.Pattern;
public abstract class DatabaseSync {
private static final Logger LOG =
LoggerFactory.getLogger(DatabaseSync.class);
- private static final String LIGHT_SCHEMA_CHANGE = "light_schema_change";
private static final String TABLE_NAME_OPTIONS = "table-name";
protected Configuration config;
@@ -72,7 +71,7 @@ public abstract class DatabaseSync {
protected Pattern includingPattern;
protected Pattern excludingPattern;
protected Map<Pattern, String> multiToOneRulesPattern;
- protected Map<String, String> tableConfig = new HashMap<>();
+ protected DorisTableConfig dorisTableConfig;
protected Configuration sinkConfig;
protected boolean ignoreDefaultValue;
protected boolean ignoreIncompatible;
@@ -110,10 +109,6 @@ public abstract class DatabaseSync {
this.excludingPattern = excludingTables == null ? null :
Pattern.compile(excludingTables);
this.multiToOneRulesPattern = multiToOneRulesParser(multiToOneOrigin,
multiToOneTarget);
this.converter = new TableNameConverter(tablePrefix, tableSuffix,
multiToOneRulesPattern);
- // default enable light schema change
- if (!this.tableConfig.containsKey(LIGHT_SCHEMA_CHANGE)) {
- this.tableConfig.put(LIGHT_SCHEMA_CHANGE, "true");
- }
}
public void build() throws Exception {
@@ -336,7 +331,7 @@ public abstract class DatabaseSync {
.setSchemaChangeMode(schemaChangeMode)
.setExecutionOptions(executionOptions)
.setTableMapping(tableMapping)
- .setTableProperties(tableConfig)
+ .setDorisTableConf(dorisTableConfig)
.setTargetDatabase(database)
.setTargetTablePrefix(tablePrefix)
.setTargetTableSuffix(tableSuffix)
@@ -468,7 +463,7 @@ public abstract class DatabaseSync {
dorisTable,
schema.getFields(),
schema.getPrimaryKeys(),
- tableConfig,
+ dorisTableConfig,
schema.getTableComment());
try {
dorisSystem.createTable(dorisSchema);
@@ -523,13 +518,19 @@ public abstract class DatabaseSync {
return this;
}
+ @Deprecated
public DatabaseSync setTableConfig(Map<String, String> tableConfig) {
if (!CollectionUtil.isNullOrEmpty(tableConfig)) {
- this.tableConfig = tableConfig;
+ this.dorisTableConfig = new DorisTableConfig(tableConfig);
}
return this;
}
+ public DatabaseSync setTableConfig(DorisTableConfig tableConfig) {
+ this.dorisTableConfig = tableConfig;
+ return this;
+ }
+
public DatabaseSync setSinkConfig(Configuration sinkConfig) {
this.sinkConfig = sinkConfig;
return this;
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java
new file mode 100644
index 00000000..912ed698
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+
+public class DorisTableConfig implements Serializable {
+ private static final String LIGHT_SCHEMA_CHANGE = "light_schema_change";
+ // PROPERTIES parameter in doris table creation statement. such as:
replication_num=1.
+ private final Map<String, String> tableProperties;
+ // The specific parameters extracted from --table-conf need to be parsed
and integrated into the
+ // doris table creation statement. such as:
table-buckets="tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50".
+ private Map<String, Integer> tableBuckets;
+
+ // Only for testing
+ @VisibleForTesting
+ public DorisTableConfig() {
+ tableProperties = new HashMap<>();
+ tableBuckets = new HashMap<>();
+ }
+
+ public DorisTableConfig(Map<String, String> tableConfig) {
+ if (Objects.isNull(tableConfig)) {
+ tableConfig = new HashMap<>();
+ }
+ // default enable light schema change
+ if (!tableConfig.containsKey(LIGHT_SCHEMA_CHANGE)) {
+ tableConfig.put(LIGHT_SCHEMA_CHANGE, Boolean.toString(true));
+ }
+ if (tableConfig.containsKey(DatabaseSyncConfig.TABLE_BUCKETS)) {
+ this.tableBuckets =
+
buildTableBucketMap(tableConfig.get(DatabaseSyncConfig.TABLE_BUCKETS));
+ tableConfig.remove(DatabaseSyncConfig.TABLE_BUCKETS);
+ }
+ tableProperties = tableConfig;
+ }
+
+ public Map<String, Integer> getTableBuckets() {
+ return tableBuckets;
+ }
+
+ public Map<String, String> getTableProperties() {
+ return tableProperties;
+ }
+
+ /**
+ * Build table bucket Map.
+ *
+ * @param tableBuckets the string of tableBuckets,
eg:student:10,student_info:20,student.*:30
+ * @return The table name and buckets map. The key is table name, the
value is buckets.
+ */
+ @VisibleForTesting
+ public Map<String, Integer> buildTableBucketMap(String tableBuckets) {
+ Map<String, Integer> tableBucketsMap = new LinkedHashMap<>();
+ String[] tableBucketsArray = tableBuckets.split(",");
+ for (String tableBucket : tableBucketsArray) {
+ String[] tableBucketArray = tableBucket.split(":");
+ tableBucketsMap.put(
+ tableBucketArray[0].trim(),
Integer.parseInt(tableBucketArray[1].trim()));
+ }
+ return tableBucketsMap;
+ }
+}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
index 8e406f21..eac6acc4 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
@@ -208,7 +208,7 @@ public class MongoDBDatabaseSync extends DatabaseSync {
.setDorisOptions(dorisBuilder.build())
.setExecutionOptions(executionOptions)
.setTableMapping(tableMapping)
- .setTableProperties(tableConfig)
+ .setTableConf(dorisTableConfig)
.setTargetDatabase(database)
.build();
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java
index 5c9c1b9e..b7faec2f 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java
@@ -28,6 +28,7 @@ import
org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer;
import
org.apache.doris.flink.sink.writer.serializer.jsondebezium.CdcDataChange;
import
org.apache.doris.flink.sink.writer.serializer.jsondebezium.CdcSchemaChange;
import
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeContext;
+import org.apache.doris.flink.tools.cdc.DorisTableConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,7 +53,7 @@ public class MongoDBJsonDebeziumSchemaSerializer implements
DorisRecordSerialize
// <cdc db.schema.table, doris db.table>
private Map<String, String> tableMapping;
// create table properties
- private Map<String, String> tableProperties;
+ private DorisTableConfig dorisTableConfig;
private String targetDatabase;
private CdcDataChange dataChange;
@@ -67,7 +68,7 @@ public class MongoDBJsonDebeziumSchemaSerializer implements
DorisRecordSerialize
String sourceTableName,
DorisExecutionOptions executionOptions,
Map<String, String> tableMapping,
- Map<String, String> tableProperties,
+ DorisTableConfig dorisTableConfig,
String targetDatabase,
String targetTablePrefix,
String targetTableSuffix) {
@@ -79,7 +80,7 @@ public class MongoDBJsonDebeziumSchemaSerializer implements
DorisRecordSerialize
JsonNodeFactory jsonNodeFactory =
JsonNodeFactory.withExactBigDecimals(true);
this.objectMapper.setNodeFactory(jsonNodeFactory);
this.tableMapping = tableMapping;
- this.tableProperties = tableProperties;
+ this.dorisTableConfig = dorisTableConfig;
this.targetDatabase = targetDatabase;
this.targetTablePrefix = targetTablePrefix;
this.targetTableSuffix = targetTableSuffix;
@@ -100,7 +101,7 @@ public class MongoDBJsonDebeziumSchemaSerializer implements
DorisRecordSerialize
tableMapping,
sourceTableName,
targetDatabase,
- tableProperties,
+ dorisTableConfig,
objectMapper,
pattern,
lineDelimiter,
@@ -138,7 +139,7 @@ public class MongoDBJsonDebeziumSchemaSerializer implements
DorisRecordSerialize
private String sourceTableName;
private DorisExecutionOptions executionOptions;
private Map<String, String> tableMapping;
- private Map<String, String> tableProperties;
+ private DorisTableConfig dorisTableConfig;
private String targetDatabase;
private String targetTablePrefix = "";
private String targetTableSuffix = "";
@@ -172,9 +173,16 @@ public class MongoDBJsonDebeziumSchemaSerializer
implements DorisRecordSerialize
return this;
}
+ @Deprecated
public MongoDBJsonDebeziumSchemaSerializer.Builder setTableProperties(
Map<String, String> tableProperties) {
- this.tableProperties = tableProperties;
+ this.dorisTableConfig = new DorisTableConfig(tableProperties);
+ return this;
+ }
+
+ public MongoDBJsonDebeziumSchemaSerializer.Builder setTableConf(
+ DorisTableConfig dorisTableConfig) {
+ this.dorisTableConfig = dorisTableConfig;
return this;
}
@@ -191,7 +199,7 @@ public class MongoDBJsonDebeziumSchemaSerializer implements
DorisRecordSerialize
sourceTableName,
executionOptions,
tableMapping,
- tableProperties,
+ dorisTableConfig,
targetDatabase,
targetTablePrefix,
targetTableSuffix);
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactoryTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactoryTest.java
index 29e8d461..1bc1f115 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactoryTest.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactoryTest.java
@@ -19,6 +19,7 @@ package org.apache.doris.flink.catalog.doris;
import org.apache.flink.util.Preconditions;
+import org.apache.doris.flink.tools.cdc.DorisTableConfig;
import org.junit.Assert;
import org.junit.Test;
@@ -28,8 +29,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static org.junit.Assert.assertEquals;
-
public class DorisSchemaFactoryTest {
@Test
@@ -65,10 +64,10 @@ public class DorisSchemaFactoryTest {
dbTable[1],
columnFields,
pkKeys,
- tableProperties,
+ new DorisTableConfig(tableProperties),
tableComment);
Assert.assertEquals(
- "TableSchema{database='doris', table='create_tab',
tableComment='auto_tab_comment', fields={name=FieldSchema{name='name',
typeString='VARVHAR(100)', defaultValue='null', comment='Name_test'},
id=FieldSchema{name='id', typeString='INT', defaultValue='100',
comment='int_test'}, age=FieldSchema{name='age', typeString='INT',
defaultValue='null', comment='null'}, email=FieldSchema{name='email',
typeString='VARCHAR(100)', defaultValue='[email protected]', comment='e'}},
keys=emai [...]
+ "TableSchema{database='doris', table='create_tab',
tableComment='auto_tab_comment', fields={name=FieldSchema{name='name',
typeString='VARVHAR(100)', defaultValue='null', comment='Name_test'},
id=FieldSchema{name='id', typeString='INT', defaultValue='100',
comment='int_test'}, age=FieldSchema{name='age', typeString='INT',
defaultValue='null', comment='null'}, email=FieldSchema{name='email',
typeString='VARCHAR(100)', defaultValue='[email protected]', comment='e'}},
keys=emai [...]
tableSchema.toString());
}
@@ -86,6 +85,7 @@ public class DorisSchemaFactoryTest {
List<String> pkKeys = Collections.singletonList("email");
Map<String, String> tableProperties = new HashMap<>();
tableProperties.put("table-buckets", "create_tab:40, create_taba:10,
tabs:12");
+ tableProperties.put("replication_num", "2");
String tableComment = "auto_tab_comment";
TableSchema tableSchema =
DorisSchemaFactory.createTableSchema(
@@ -93,11 +93,10 @@ public class DorisSchemaFactoryTest {
dbTable[1],
columnFields,
pkKeys,
- tableProperties,
+ new DorisTableConfig(tableProperties),
tableComment);
-
Assert.assertEquals(
- "TableSchema{database='doris', table='create_tab',
tableComment='auto_tab_comment', fields={name=FieldSchema{name='name',
typeString='VARVHAR(100)', defaultValue='null', comment='Name_test'},
id=FieldSchema{name='id', typeString='INT', defaultValue='100',
comment='int_test'}, age=FieldSchema{name='age', typeString='INT',
defaultValue='null', comment='null'}, email=FieldSchema{name='email',
typeString='VARCHAR(100)', defaultValue='[email protected]', comment='e'}},
keys=emai [...]
+ "TableSchema{database='doris', table='create_tab',
tableComment='auto_tab_comment', fields={name=FieldSchema{name='name',
typeString='VARVHAR(100)', defaultValue='null', comment='Name_test'},
id=FieldSchema{name='id', typeString='INT', defaultValue='100',
comment='int_test'}, age=FieldSchema{name='age', typeString='INT',
defaultValue='null', comment='null'}, email=FieldSchema{name='email',
typeString='VARCHAR(100)', defaultValue='[email protected]', comment='e'}},
keys=emai [...]
tableSchema.toString());
}
@@ -113,6 +112,7 @@ public class DorisSchemaFactoryTest {
columnFields.put("age", new FieldSchema("age", "INT", null, null));
columnFields.put("email", new FieldSchema("email", "VARCHAR(100)",
"[email protected]", "e"));
Map<String, String> tableProperties = new HashMap<>();
+ tableProperties.put("replication_num", "1");
String tableComment = "auto_tab_comment";
TableSchema tableSchema =
DorisSchemaFactory.createTableSchema(
@@ -120,21 +120,10 @@ public class DorisSchemaFactoryTest {
dbTable[1],
columnFields,
new ArrayList<>(),
- tableProperties,
+ new DorisTableConfig(tableProperties),
tableComment);
Assert.assertEquals(
- "TableSchema{database='doris', table='dup_tab',
tableComment='auto_tab_comment', fields={name=FieldSchema{name='name',
typeString='VARVHAR(100)', defaultValue='null', comment='Name_test'},
id=FieldSchema{name='id', typeString='INT', defaultValue='100',
comment='int_test'}, age=FieldSchema{name='age', typeString='INT',
defaultValue='null', comment='null'}, email=FieldSchema{name='email',
typeString='VARCHAR(100)', defaultValue='[email protected]', comment='e'}},
keys=name, m [...]
+ "TableSchema{database='doris', table='dup_tab',
tableComment='auto_tab_comment', fields={name=FieldSchema{name='name',
typeString='VARVHAR(100)', defaultValue='null', comment='Name_test'},
id=FieldSchema{name='id', typeString='INT', defaultValue='100',
comment='int_test'}, age=FieldSchema{name='age', typeString='INT',
defaultValue='null', comment='null'}, email=FieldSchema{name='email',
typeString='VARCHAR(100)', defaultValue='[email protected]', comment='e'}},
keys=name, m [...]
tableSchema.toString());
}
-
- @Test
- public void buildTableBucketMapTest() {
- String tableBuckets = "tbl1:10,tbl2 : 20, a.* :30,b.*:40,.*:50";
- Map<String, Integer> tableBucketsMap =
DorisSchemaFactory.buildTableBucketMap(tableBuckets);
- assertEquals(10, tableBucketsMap.get("tbl1").intValue());
- assertEquals(20, tableBucketsMap.get("tbl2").intValue());
- assertEquals(30, tableBucketsMap.get("a.*").intValue());
- assertEquals(40, tableBucketsMap.get("b.*").intValue());
- assertEquals(50, tableBucketsMap.get(".*").intValue());
- }
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/doris/DorisSystemTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/doris/DorisSystemTest.java
index df0321f6..65e1240d 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/doris/DorisSystemTest.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/doris/DorisSystemTest.java
@@ -80,11 +80,11 @@ public class DorisSystemTest {
schema.setFields(map);
schema.setTableBuckets(1);
Map<String, String> properties = new HashMap<>();
- properties.put("table-buckets", "1");
+ properties.put("replication_num", "3");
schema.setProperties(properties);
String createTableDDL = DorisSystem.buildCreateTableDDL(schema);
String except =
- "CREATE TABLE IF NOT EXISTS `db`.`table`(`name` VARCHAR(65533)
DEFAULT 'zhangsan' COMMENT '' ) DISTRIBUTED BY HASH(`name`) BUCKETS 1";
+ "CREATE TABLE IF NOT EXISTS `db`.`table`(`name` VARCHAR(65533)
DEFAULT 'zhangsan' COMMENT '' ) DISTRIBUTED BY HASH(`name`) BUCKETS 1
PROPERTIES ('replication_num'='3')";
Assert.assertEquals(createTableDDL, except);
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java
index d101d1e8..d8cb7c3f 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java
@@ -18,6 +18,7 @@
package org.apache.doris.flink.sink.schema;
import org.apache.doris.flink.catalog.doris.TableSchema;
+import org.apache.doris.flink.tools.cdc.DorisTableConfig;
import org.apache.doris.flink.tools.cdc.SourceConnector;
import org.junit.Assert;
import org.junit.Before;
@@ -222,7 +223,7 @@ public class SQLParserSchemaManagerTest {
+ ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
COLLATE=utf8mb4_0900_ai_ci";
TableSchema tableSchema =
schemaManager.parseCreateTableStatement(
- SourceConnector.MYSQL, ddl, dorisTable, new
HashMap<>());
+ SourceConnector.MYSQL, ddl, dorisTable, null);
String expected =
"TableSchema{database='doris', table='auto_tab',
tableComment='null', fields={`id`=FieldSchema{name='`id`', typeString='INT',
defaultValue='10000', comment='id_test'},
`create_time`=FieldSchema{name='`create_time`', typeString='DATETIMEV2(3)',
defaultValue='CURRENT_TIMESTAMP', comment='null'},
`c1`=FieldSchema{name='`c1`', typeString='INT', defaultValue='999',
comment='null'}, `decimal_type`=FieldSchema{name='`decimal_type`',
typeString='DECIMALV3(9,3)', defaultValue='1.0 [...]
@@ -236,7 +237,7 @@ public class SQLParserSchemaManagerTest {
"CREATE TABLE test_sink_unique ( id INT NOT NULL, name
VARCHAR(100) NOT NULL, age INT, email VARCHAR(100), UNIQUE (email)
)";
TableSchema tableSchema =
schemaManager.parseCreateTableStatement(
- SourceConnector.MYSQL, ddl, dorisTable, new
HashMap<>());
+ SourceConnector.MYSQL, ddl, dorisTable, null);
String expected =
"TableSchema{database='doris', table='auto_uni_tab',
tableComment='null', fields={id=FieldSchema{name='id', typeString='INT',
defaultValue='null', comment='null'}, name=FieldSchema{name='name',
typeString='VARCHAR(300)', defaultValue='null', comment='null'},
age=FieldSchema{name='age', typeString='INT', defaultValue='null',
comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(300)',
defaultValue='null', comment='null'}}, keys=email, model=UNIQUE, distribut [...]
@@ -255,7 +256,7 @@ public class SQLParserSchemaManagerTest {
+ ")";
TableSchema tableSchema =
schemaManager.parseCreateTableStatement(
- SourceConnector.MYSQL, ddl, dorisTable, new
HashMap<>());
+ SourceConnector.MYSQL, ddl, dorisTable, null);
String expected =
"TableSchema{database='doris', table='auto_duptab',
tableComment='null', fields={id=FieldSchema{name='id', typeString='INT',
defaultValue='null', comment='null'}, name=FieldSchema{name='name',
typeString='VARCHAR(150)', defaultValue='null', comment='null'},
age=FieldSchema{name='age', typeString='INT', defaultValue='null',
comment='null'}, address=FieldSchema{name='address', typeString='VARCHAR(765)',
defaultValue='null', comment='null'}}, keys=id, model=DUPLICATE, distri [...]
@@ -284,7 +285,7 @@ public class SQLParserSchemaManagerTest {
+ ");";
TableSchema tableSchema =
schemaManager.parseCreateTableStatement(
- SourceConnector.ORACLE, ddl, dorisTable, new
HashMap<>());
+ SourceConnector.ORACLE, ddl, dorisTable, null);
String expected =
"TableSchema{database='doris', table='auto_tab',
tableComment='null', fields={employee_id=FieldSchema{name='employee_id',
typeString='BIGINT', defaultValue='null', comment='null'},
first_name=FieldSchema{name='first_name', typeString='VARCHAR(150)',
defaultValue='null', comment='null'}, last_name=FieldSchema{name='last_name',
typeString='VARCHAR(150)', defaultValue='null', comment='null'},
email=FieldSchema{name='email', typeString='VARCHAR(300)', defaultValue='null',
com [...]
@@ -310,7 +311,7 @@ public class SQLParserSchemaManagerTest {
+ ");";
TableSchema tableSchema =
schemaManager.parseCreateTableStatement(
- SourceConnector.ORACLE, ddl, dorisTable, new
HashMap<>());
+ SourceConnector.ORACLE, ddl, dorisTable, null);
String expected =
"TableSchema{database='doris', table='auto_tab',
tableComment='null', fields={employee_id=FieldSchema{name='employee_id',
typeString='BIGINT', defaultValue='null', comment='null'},
first_name=FieldSchema{name='first_name', typeString='VARCHAR(150)',
defaultValue='null', comment='null'}, last_name=FieldSchema{name='last_name',
typeString='VARCHAR(150)', defaultValue='null', comment='null'},
email=FieldSchema{name='email', typeString='VARCHAR(300)', defaultValue='null',
com [...]
@@ -335,10 +336,12 @@ public class SQLParserSchemaManagerTest {
+ ");";
TableSchema tableSchema =
schemaManager.parseCreateTableStatement(
- SourceConnector.ORACLE, ddl, dorisTable, new
HashMap<>());
-
+ SourceConnector.ORACLE,
+ ddl,
+ dorisTable,
+ new DorisTableConfig(new HashMap<>()));
String expected =
- "TableSchema{database='doris', table='auto_tab',
tableComment='null', fields={order_id=FieldSchema{name='order_id',
typeString='BIGINT', defaultValue='null', comment='null'},
customer_id=FieldSchema{name='customer_id', typeString='BIGINT',
defaultValue='null', comment='null'}, order_date=FieldSchema{name='order_date',
typeString='DATETIMEV2', defaultValue='SYSDATE', comment='null'},
status=FieldSchema{name='status', typeString='VARCHAR(60)',
defaultValue='null', comment=' [...]
+ "TableSchema{database='doris', table='auto_tab',
tableComment='null', fields={order_id=FieldSchema{name='order_id',
typeString='BIGINT', defaultValue='null', comment='null'},
customer_id=FieldSchema{name='customer_id', typeString='BIGINT',
defaultValue='null', comment='null'}, order_date=FieldSchema{name='order_date',
typeString='DATETIMEV2', defaultValue='SYSDATE', comment='null'},
status=FieldSchema{name='status', typeString='VARCHAR(60)',
defaultValue='null', comment=' [...]
Assert.assertEquals(expected, tableSchema.toString());
}
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
index a3c24d15..68239618 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
@@ -26,6 +26,7 @@ import org.apache.doris.flink.catalog.doris.TableSchema;
import org.apache.doris.flink.exception.DorisRuntimeException;
import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.rest.models.Schema;
+import org.apache.doris.flink.tools.cdc.DorisTableConfig;
import org.apache.doris.flink.tools.cdc.SourceConnector;
import org.junit.After;
import org.junit.Assert;
@@ -76,7 +77,7 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends
TestJsonDebeziumChangeBa
tableMapping,
sourceTableName,
targetDatabase,
- tableProperties,
+ new DorisTableConfig(tableProperties),
objectMapper,
null,
lineDelimiter,
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java
index aa1333da..d3194f81 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java
@@ -25,7 +25,6 @@ import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
public class TestSQLParserSchemaChange extends TestJsonDebeziumChangeBase {
@@ -41,7 +40,7 @@ public class TestSQLParserSchemaChange extends
TestJsonDebeziumChangeBase {
tableMapping,
null,
null,
- new HashMap<>(),
+ null,
objectMapper,
null,
lineDelimiter,
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DorisTableConfigTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DorisTableConfigTest.java
new file mode 100644
index 00000000..1a82e1c6
--- /dev/null
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DorisTableConfigTest.java
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class DorisTableConfigTest {
+
+ private DorisTableConfig dorisTableConfig;
+
+ @Before
+ public void init() {
+ dorisTableConfig = new DorisTableConfig();
+ }
+
+ @Test
+ public void buildTableBucketMapTest() {
+ String tableBuckets = "tbl1:10,tbl2 : 20, a.* :30,b.*:40,.*:50";
+ Map<String, Integer> tableBucketsMap =
dorisTableConfig.buildTableBucketMap(tableBuckets);
+ assertEquals(10, tableBucketsMap.get("tbl1").intValue());
+ assertEquals(20, tableBucketsMap.get("tbl2").intValue());
+ assertEquals(30, tableBucketsMap.get("a.*").intValue());
+ assertEquals(40, tableBucketsMap.get("b.*").intValue());
+ assertEquals(50, tableBucketsMap.get(".*").intValue());
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]