This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 35a9a852 [Improve](cdc) support partition table for auto create table (#520)
35a9a852 is described below
commit 35a9a85298746d88a11c55dfae836928c2a5db55
Author: wudi <[email protected]>
AuthorDate: Mon Dec 2 10:11:04 2024 +0800
[Improve](cdc) support partition table for auto create table (#520)
---
.../flink/catalog/doris/DorisSchemaFactory.java | 50 ++++++++++++++++--
.../doris/flink/catalog/doris/TableSchema.java | 14 ++++-
.../org/apache/doris/flink/tools/cdc/CdcTools.java | 7 ++-
.../apache/doris/flink/tools/cdc/DatabaseSync.java | 5 +-
.../doris/flink/tools/cdc/DorisTableConfig.java | 32 ++++++++++++
.../doris/flink/container/AbstractE2EService.java | 4 ++
.../flink/container/e2e/Mysql2DorisE2ECase.java | 55 ++++++++++++++++++++
.../flink/tools/cdc/DorisTableConfigTest.java | 12 +++++
.../e2e/mysql2doris/testMySQL2DorisCreateTable.txt | 5 ++
.../testMySQL2DorisCreateTable_init.sql | 59 ++++++++++++++++++++++
10 files changed, 234 insertions(+), 9 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactory.java
index 9693d433..dd42f803 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactory.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactory.java
@@ -23,6 +23,7 @@ import org.apache.flink.util.StringUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.ObjectUtils;
import org.apache.doris.flink.exception.CreateTableException;
import org.apache.doris.flink.tools.cdc.DorisTableConfig;
@@ -63,6 +64,10 @@ public class DorisSchemaFactory {
tableSchema.setProperties(dorisTableConfig.getTableProperties());
tableSchema.setTableBuckets(
parseTableSchemaBuckets(dorisTableConfig.getTableBuckets(), table));
+ if (ObjectUtils.isNotEmpty(dorisTableConfig.getTablePartitions())
+ &&
dorisTableConfig.getTablePartitions().containsKey(table)) {
+
tableSchema.setPartitionInfo(dorisTableConfig.getTablePartitions().get(table));
+ }
}
return tableSchema;
}
@@ -123,16 +128,29 @@ public class DorisSchemaFactory {
throw new CreateTableException("key " + key + " not found in
column list");
}
FieldSchema field = fields.get(key);
- buildColumn(sb, field, true);
+ buildColumn(sb, field, true, false);
+ }
+
+ // append partition column, auto partition column must be in keys
+ if (schema.getPartitionInfo() != null) {
+ String partitionCol = schema.getPartitionInfo().f0;
+ FieldSchema field = fields.get(partitionCol);
+ buildColumn(sb, field, true, true);
}
// append values
for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) {
+ // skip key column
if (keys.contains(entry.getKey())) {
continue;
}
+ // skip partition column
+ if (schema.getPartitionInfo() != null
+ && entry.getKey().equals(schema.getPartitionInfo().f0)) {
+ continue;
+ }
FieldSchema field = entry.getValue();
- buildColumn(sb, field, false);
+ buildColumn(sb, field, false, false);
}
sb = sb.deleteCharAt(sb.length() - 1);
sb.append(" ) ");
@@ -140,8 +158,13 @@ public class DorisSchemaFactory {
if (DataModel.UNIQUE.equals(schema.getModel())) {
sb.append(schema.getModel().name())
.append(" KEY(")
- .append(String.join(",", identifier(schema.getKeys())))
- .append(")");
+ .append(String.join(",", identifier(schema.getKeys())));
+
+ if (schema.getPartitionInfo() != null) {
+
sb.append(",").append(identifier(schema.getPartitionInfo().f0));
+ }
+
+ sb.append(")");
}
// append table comment
@@ -149,6 +172,16 @@ public class DorisSchemaFactory {
sb.append(" COMMENT
'").append(quoteComment(schema.getTableComment())).append("' ");
}
+ // append partition info if exists
+ if (schema.getPartitionInfo() != null) {
+ sb.append(" AUTO PARTITION BY RANGE ")
+ .append(
+ String.format(
+ "(date_trunc(`%s`, '%s'))",
+ schema.getPartitionInfo().f0,
schema.getPartitionInfo().f1))
+ .append("()");
+ }
+
// append distribute key
sb.append(" DISTRIBUTED BY HASH(")
.append(String.join(",",
identifier(schema.getDistributeKeys())))
@@ -165,6 +198,7 @@ public class DorisSchemaFactory {
} else {
sb.append(" BUCKETS AUTO ");
}
+
// append properties
int index = 0;
for (Map.Entry<String, String> entry : properties.entrySet()) {
@@ -186,13 +220,19 @@ public class DorisSchemaFactory {
return sb.toString();
}
- private static void buildColumn(StringBuilder sql, FieldSchema field,
boolean isKey) {
+ private static void buildColumn(
+ StringBuilder sql, FieldSchema field, boolean isKey, boolean
autoPartitionCol) {
String fieldType = field.getTypeString();
if (isKey && DorisType.STRING.equals(fieldType)) {
fieldType = String.format("%s(%s)", DorisType.VARCHAR, 65533);
}
sql.append(identifier(field.getName())).append(" ").append(fieldType);
+ // auto partition need set partition-column not null
+ if (autoPartitionCol) {
+ sql.append(" NOT NULL ");
+ }
+
if (field.getDefaultValue() != null) {
sql.append(" DEFAULT " +
quoteDefaultValue(field.getDefaultValue()));
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java
index 3a47a044..f7617ab1 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java
@@ -17,6 +17,8 @@
package org.apache.doris.flink.catalog.doris;
+import org.apache.flink.api.java.tuple.Tuple2;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -32,9 +34,11 @@ public class TableSchema {
private DataModel model = DataModel.DUPLICATE;
private List<String> distributeKeys = new ArrayList<>();
private Map<String, String> properties = new HashMap<>();
-
private Integer tableBuckets;
+ // Currently only supports auto partition, eg: DATE_TRUNC(column,interval)
+ private Tuple2<String, String> partitionInfo;
+
public String getDatabase() {
return database;
}
@@ -107,6 +111,14 @@ public class TableSchema {
return tableBuckets;
}
+ public Tuple2<String, String> getPartitionInfo() {
+ return partitionInfo;
+ }
+
+ public void setPartitionInfo(Tuple2<String, String> partitionInfo) {
+ this.partitionInfo = partitionInfo;
+ }
+
@Override
public String toString() {
return "TableSchema{"
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index 61beea19..e512d790 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -172,7 +172,12 @@ public class CdcTools {
.setIgnoreIncompatible(ignoreIncompatible)
.setSchemaChangeMode(schemaChangeMode)
.create();
- databaseSync.build();
+
+ boolean needExecute = databaseSync.build();
+ if (!needExecute) {
+ // create table only
+ return;
+ }
if (StringUtils.isNullOrWhitespaceOnly(jobName)) {
jobName =
String.format(
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 7cd29506..dea4422c 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -113,7 +113,7 @@ public abstract class DatabaseSync {
this.converter = new TableNameConverter(tablePrefix, tableSuffix,
multiToOneRulesPattern);
}
- public void build() throws Exception {
+ public boolean build() throws Exception {
DorisConnectionOptions options = getDorisConnectionOptions();
DorisSystem dorisSystem = new DorisSystem(options);
@@ -156,7 +156,7 @@ public abstract class DatabaseSync {
}
if (createTableOnly) {
System.out.println("Create table finished.");
- System.exit(0);
+ return false;
}
LOG.info("table mapping: {}", tableMapping);
config.setString(TABLE_NAME_OPTIONS, getSyncTableList(syncTables));
@@ -181,6 +181,7 @@ public abstract class DatabaseSync {
.uid(uidName);
}
}
+ return true;
}
/**
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java
index 6318fc8a..6f5d929e 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java
@@ -18,6 +18,7 @@
package org.apache.doris.flink.tools.cdc;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
import java.io.Serializable;
import java.util.HashMap;
@@ -30,11 +31,14 @@ public class DorisTableConfig implements Serializable {
// PROPERTIES parameter in doris table creation statement. such as:
replication_num=1.
public static final String REPLICATION_NUM = "replication_num";
public static final String TABLE_BUCKETS = "table-buckets";
+ public static final String TABLE_PARTITIONS = "table-partitions";
private final Map<String, String> tableProperties;
// The specific parameters extracted from --table-conf need to be parsed
and integrated into the
// doris table creation statement. such as:
table-buckets="tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50".
private Map<String, Integer> tableBuckets;
+ // table:partitionColumn:interval
+ private Map<String, Tuple2<String, String>> tablePartitions;
// Only for testing
@VisibleForTesting
@@ -55,6 +59,11 @@ public class DorisTableConfig implements Serializable {
this.tableBuckets =
buildTableBucketMap(tableConfig.get(TABLE_BUCKETS));
tableConfig.remove(TABLE_BUCKETS);
}
+ if (tableConfig.containsKey(TABLE_PARTITIONS)) {
+ this.tablePartitions =
buildTablePartitionMap(tableConfig.get(TABLE_PARTITIONS));
+ tableConfig.remove(TABLE_PARTITIONS);
+ }
+
tableProperties = tableConfig;
}
@@ -66,6 +75,10 @@ public class DorisTableConfig implements Serializable {
return tableProperties;
}
+ public Map<String, Tuple2<String, String>> getTablePartitions() {
+ return tablePartitions;
+ }
+
/**
* Build table bucket Map.
*
@@ -83,4 +96,23 @@ public class DorisTableConfig implements Serializable {
}
return tableBucketsMap;
}
+
+ /**
+ * Build table partition Map.
+ *
+ * @param tablePartitions the string of tablePartitions,
+ * eg:tbl1:dt_column:month,tb2:dt_column:day
+ * @return The table name and buckets map. The key is table name, the
value is partition column
+ * and interval.
+ */
+ @VisibleForTesting
+ public Map<String, Tuple2<String, String>> buildTablePartitionMap(String
tablePartitions) {
+ Map<String, Tuple2<String, String>> tablePartitionMap = new
LinkedHashMap<>();
+ String[] tablePartitionArray = tablePartitions.split(",");
+ for (String tablePartition : tablePartitionArray) {
+ String[] tp = tablePartition.split(":");
+ tablePartitionMap.put(tp[0].trim(), Tuple2.of(tp[1].trim(),
tp[2].trim()));
+ }
+ return tablePartitionMap;
+ }
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractE2EService.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractE2EService.java
index ec536ee6..cd3965a3 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractE2EService.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractE2EService.java
@@ -32,6 +32,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -95,6 +96,9 @@ public abstract class AbstractE2EService extends AbstractContainerTestBase {
LOG.info("{} e2e job will submit to start. ", jobName);
CdcTools.setStreamExecutionEnvironmentForTesting(configFlinkEnvironment());
CdcTools.main(args);
+ if (Arrays.asList(args).contains("--create-table-only")) {
+ return;
+ }
jobClient = CdcTools.getJobClient();
if (Objects.isNull(jobClient)) {
LOG.warn("Failed get flink job client. jobName={}", jobName);
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java
index 938aa218..fe715f62 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java
@@ -21,11 +21,14 @@ import org.apache.doris.flink.container.AbstractE2EService;
import org.apache.doris.flink.container.ContainerUtils;
import org.apache.doris.flink.tools.cdc.DatabaseSyncConfig;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.sql.ResultSet;
+import java.sql.Statement;
import java.util.Arrays;
import java.util.List;
@@ -381,6 +384,58 @@ public class Mysql2DorisE2ECase extends AbstractE2EService {
cancelE2EJob(jobName);
}
+ @Test
+ public void testMySQL2DorisCreateTableOnly() throws Exception {
+ String jobName = "testMySQL2DorisCreateTableOnly";
+ initEnvironment(jobName,
"container/e2e/mysql2doris/testMySQL2DorisCreateTable_init.sql");
+ startMysql2DorisJob(jobName,
"container/e2e/mysql2doris/testMySQL2DorisCreateTable.txt");
+
+ String createTblSQL = getCreateTableSQL(DATABASE, "create_tbl_uniq");
+ Assert.assertTrue(createTblSQL.contains("UNIQUE KEY(`id`)"));
+ Assert.assertTrue(createTblSQL.contains("BUCKETS 10"));
+
+ createTblSQL = getCreateTableSQL(DATABASE, "create_tbl_dup");
+ Assert.assertTrue(createTblSQL.contains("DUPLICATE KEY(`id`,
`name`)"));
+ Assert.assertTrue(createTblSQL.contains("BUCKETS AUTO"));
+
+ createTblSQL = getCreateTableSQL(DATABASE,
"create_tbl_from_uniqindex");
+ Assert.assertTrue(createTblSQL.contains("UNIQUE KEY(`name`)"));
+ Assert.assertTrue(createTblSQL.contains("BUCKETS 30"));
+
+ createTblSQL = getCreateTableSQL(DATABASE,
"create_tbl_from_uniqindex2");
+ Assert.assertTrue(
+ createTblSQL.contains("UNIQUE KEY(`id`, `name`)")
+ || createTblSQL.contains("UNIQUE KEY(`id`, `age`)"));
+ Assert.assertTrue(createTblSQL.contains("BUCKETS 30"));
+
+ createTblSQL = getCreateTableSQL(DATABASE,
"create_tbl_from_multiindex");
+ Assert.assertTrue(createTblSQL.contains("UNIQUE KEY(`id`)"));
+ Assert.assertTrue(createTblSQL.contains("BUCKETS AUTO"));
+
+ /*
+ The auto partition behavior of doris 2.1.0 to 2.1.4 has changed,
temporarily skipped
+ createTblSQL = getCreateTableSQL(DATABASE, "create_tbl_part_uniq");
+ Assert.assertTrue(createTblSQL.contains("UNIQUE KEY(`id`,
`create_dtime`)"));
+ Assert.assertTrue(createTblSQL.contains("BUCKETS AUTO"));
+
+ createTblSQL = getCreateTableSQL(DATABASE, "create_tbl_part_dup");
+ Assert.assertTrue(createTblSQL.contains("DUPLICATE KEY(`id`,
`create_dtime`, `name`)"));
+ Assert.assertTrue(createTblSQL.contains("BUCKETS AUTO"));
+ */
+ }
+
+ private String getCreateTableSQL(String database, String table) throws
Exception {
+ Statement statement = getDorisQueryConnection().createStatement();
+ ResultSet resultSet =
+ statement.executeQuery(String.format("SHOW CREATE TABLE
%s.%s", database, table));
+ while (resultSet.next()) {
+ String createTblSql = resultSet.getString(2);
+ LOG.info("Create table sql: {}", createTblSql.replace("\n", ""));
+ return createTblSql;
+ }
+ throw new RuntimeException("Table not exist " + table);
+ }
+
@After
public void close() {
try {
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DorisTableConfigTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DorisTableConfigTest.java
index 1a82e1c6..804e5dbd 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DorisTableConfigTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DorisTableConfigTest.java
@@ -17,6 +17,8 @@
package org.apache.doris.flink.tools.cdc;
+import org.apache.flink.api.java.tuple.Tuple2;
+
import org.junit.Before;
import org.junit.Test;
@@ -43,4 +45,14 @@ public class DorisTableConfigTest {
assertEquals(40, tableBucketsMap.get("b.*").intValue());
assertEquals(50, tableBucketsMap.get(".*").intValue());
}
+
+ @Test
+ public void buildTablePartitionMapTest() {
+ String tablePartitions =
"tbl1:dt_col_d:day,tbl2:dt_col_w:week,tbl3:dt_col_m:month";
+ Map<String, Tuple2<String, String>> tablePartitionMap =
+ dorisTableConfig.buildTablePartitionMap(tablePartitions);
+ assertEquals(Tuple2.of("dt_col_d", "day"),
tablePartitionMap.get("tbl1"));
+ assertEquals(Tuple2.of("dt_col_w", "week"),
tablePartitionMap.get("tbl2"));
+ assertEquals(Tuple2.of("dt_col_m", "month"),
tablePartitionMap.get("tbl3"));
+ }
}
diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable.txt b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable.txt
new file mode 100644
index 00000000..632c3735
--- /dev/null
+++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable.txt
@@ -0,0 +1,5 @@
+mysql-sync-database
+ --including-tables "create_tbl_.*"
+ --create-table-only
+ --table-conf
table-buckets=create_tbl_uniq:10,create_tbl_from_uniqindex.*:30
+ --table-conf replication_num=1
\ No newline at end of file
diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable_init.sql b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable_init.sql
new file mode 100644
index 00000000..cc3c16a6
--- /dev/null
+++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable_init.sql
@@ -0,0 +1,59 @@
+CREATE DATABASE if NOT EXISTS test_e2e_mysql;
+DROP TABLE IF EXISTS test_e2e_mysql.create_tbl_uniq;
+CREATE TABLE test_e2e_mysql.create_tbl_uniq (
+`id` int NOT NULL,
+`name` varchar(255) DEFAULT NULL,
+`age` bigint DEFAULT NULL,
+PRIMARY KEY (`id`) USING BTREE
+);
+
+DROP TABLE IF EXISTS test_e2e_mysql.create_tbl_dup;
+CREATE TABLE test_e2e_mysql.create_tbl_dup (
+`id` int NOT NULL,
+`name` varchar(255) DEFAULT NULL,
+`age` bigint DEFAULT NULL
+);
+
+DROP TABLE IF EXISTS test_e2e_mysql.create_tbl_from_uniqindex;
+CREATE TABLE test_e2e_mysql.create_tbl_from_uniqindex (
+`id` int NOT NULL,
+`name` varchar(255) DEFAULT NULL,
+`age` bigint DEFAULT NULL,
+UNIQUE KEY `uniq` (`name`)
+);
+
+DROP TABLE IF EXISTS test_e2e_mysql.create_tbl_from_uniqindex2;
+CREATE TABLE test_e2e_mysql.create_tbl_from_uniqindex2 (
+`id` int DEFAULT NULL,
+`name` varchar(255) DEFAULT NULL,
+`age` int DEFAULT NULL,
+UNIQUE KEY `idname_uniq` (`id`,`name`),
+UNIQUE KEY `idage_uniq` (`id`,`age`)
+);
+
+DROP TABLE IF EXISTS test_e2e_mysql.create_tbl_from_multiindex;
+CREATE TABLE test_e2e_mysql.create_tbl_from_multiindex (
+`id` int DEFAULT NULL,
+`name` varchar(255) DEFAULT NULL,
+`age` int DEFAULT NULL,
+UNIQUE KEY `uniq` (`id`),
+KEY `normal` (`name`)
+);
+
+-- for auto partition table
+DROP TABLE IF EXISTS test_e2e_mysql.create_tbl_part_uniq;
+CREATE TABLE test_e2e_mysql.create_tbl_part_uniq (
+`id` int NOT NULL,
+`name` varchar(255) DEFAULT NULL,
+`age` int DEFAULT NULL,
+`create_dtime` datetime DEFAULT NULL,
+PRIMARY KEY (`id`) USING BTREE
+);
+
+DROP TABLE IF EXISTS test_e2e_mysql.create_tbl_part_dup;
+CREATE TABLE test_e2e_mysql.create_tbl_part_dup (
+`id` int NOT NULL,
+`name` varchar(255) DEFAULT NULL,
+`age` int DEFAULT NULL,
+`create_dtime` datetime DEFAULT NULL
+);
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]