This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/cdc-multiple-table by this
push:
new bc9cd6bf6 [Improve] Improve StarRocks Auto Create Table (#4208)
bc9cd6bf6 is described below
commit bc9cd6bf6950142dc3c9059eb5bdaa1299779296
Author: Hisoka <[email protected]>
AuthorDate: Fri Feb 24 15:54:36 2023 +0800
[Improve] Improve StarRocks Auto Create Table (#4208)
---
.../starrocks/sink/StarRocksSaveModeUtil.java | 56 +++++++++++++++---
.../seatunnel/starrocks/sink/StarRocksSink.java | 11 ++--
.../starrocks/sink/StarRocksSinkFactory.java | 2 +-
.../starrocks/util/CreateTableParser.java | 67 ++++++++++++++++++++++
4 files changed, 121 insertions(+), 15 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
index 93be4e53f..15e71cf2a 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
@@ -25,28 +25,68 @@ import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SqlType;
+import
org.apache.seatunnel.connectors.seatunnel.starrocks.util.CreateTableParser;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
import java.util.stream.Collectors;
public class StarRocksSaveModeUtil {
+ // SQL types that getDefaultValue checks when deciding whether to emit a DEFAULT clause.
+ // Judging by the name, these are types whose columns cannot carry a default value in
+ // StarRocks DDL (TODO confirm against the StarRocks documentation).
+ private static final List<SqlType> UNSUPPORTED_DEFAULT_VALUE_TYPES =
Arrays.asList(SqlType.ARRAY,
+ SqlType.MAP, SqlType.ROW, SqlType.MULTIPLE_ROW, SqlType.BOOLEAN);
+
+    /**
+     * Renders the final {@code CREATE TABLE} statement from a user supplied template.
+     *
+     * <p>Bare column names listed in the template are first expanded into full column
+     * definitions, then the {@code ${...}} placeholders named by {@code SaveModeConstants} are
+     * substituted. Columns that already appear in the template are left out of the generated
+     * field list so they are not declared twice.
+     *
+     * @param template    create-table SQL template containing the placeholders
+     * @param database    value substituted for the database placeholder
+     * @param table       value substituted for the table-name placeholder
+     * @param tableSchema schema supplying the columns and the primary key
+     * @return the template with every placeholder occurrence replaced
+     */
public static String fillingCreateSql(String template, String database,
String table, TableSchema tableSchema) {
String primaryKey =
tableSchema.getPrimaryKey().getColumnNames().stream().map(r -> "`" + r +
"`").collect(Collectors.joining(","));
- String rowTypeFields =
tableSchema.getColumns().stream().map(StarRocksSaveModeUtil::columnToStarrocksType)
+
+        // Columns the user already mentioned in the template, keyed by column name.
+        Map<String, CreateTableParser.ColumnInfo> columnInTemplate = CreateTableParser.getColumnList(template);
+        template = mergeColumnInTemplate(columnInTemplate, tableSchema, template);
+
+        String rowTypeFields = tableSchema.getColumns().stream()
+            .filter(column -> !columnInTemplate.containsKey(column.getName()))
+            .map(StarRocksSaveModeUtil::columnToStarrocksType)
.collect(Collectors.joining(",\n"));
- return template.replaceAll(String.format("${%s}",
SaveModeConstants.DATABASE), database)
- .replaceAll(String.format("${%s}", SaveModeConstants.TABLE_NAME),
table)
- .replaceAll(String.format("${%s}",
SaveModeConstants.ROWTYPE_FIELDS), rowTypeFields)
- .replaceAll(String.format("${%s}",
SaveModeConstants.ROWTYPE_PRIMARY_KEY), primaryKey);
+        // String.replace substitutes literally. With replaceAll the values are regex replacement
+        // templates, so a '$' or '\' in a name or in a generated column default would throw or
+        // be silently rewritten.
+        return template.replace("${" + SaveModeConstants.DATABASE + "}", database)
+            .replace("${" + SaveModeConstants.TABLE_NAME + "}", table)
+            .replace("${" + SaveModeConstants.ROWTYPE_FIELDS + "}", rowTypeFields)
+            .replace("${" + SaveModeConstants.ROWTYPE_PRIMARY_KEY + "}", primaryKey);
}
- static String columnToStarrocksType(Column column) {
+ /**
+ * Formats one column as a column-definition fragment: the back-quoted column name, the
+ * type text returned by {@code dataTypeToStarrocksType}, {@code NULL} or {@code NOT NULL},
+ * and the text returned by {@code getDefaultValue}.
+ *
+ * @param column column to render; passed through {@code checkNotNull} first
+ * @return the formatted column definition
+ */
+ private static String columnToStarrocksType(Column column) {
checkNotNull(column, "The column is required.");
return String.format("`%s` %s %s %s", column.getName(),
dataTypeToStarrocksType(column.getDataType()),
- column.isNullable() ? "NULL" : "NOT NULL",
column.getDefaultValue() == null ? "" : column.getDefaultValue().toString());
+ column.isNullable() ? "NULL" : "NOT NULL",
getDefaultValue(column));
+ }
+
+    /**
+     * Expands bare column names in the template into full column definitions.
+     *
+     * <p>A template column qualifies when the parser recorded no extra text for it (its
+     * {@code info} is empty) and the schema contains a column of the same name. The name is
+     * then replaced by the output of {@code columnToStarrocksType}. {@code ColumnInfo.getIndex()}
+     * is treated as the offset just past the column name in the original template.
+     *
+     * @param columnInTemplate columns found in the template, keyed by column name
+     * @param tableSchema      schema providing the full column metadata
+     * @param template         the create-table SQL template
+     * @return the template with every qualifying bare column name expanded
+     */
+    private static String mergeColumnInTemplate(Map<String, CreateTableParser.ColumnInfo> columnInTemplate,
+                                                TableSchema tableSchema,
+                                                String template) {
+        Map<String, Column> columnMap = tableSchema.getColumns().stream()
+            .collect(Collectors.toMap(Column::getName, Function.identity()));
+
+        // The running offset below is only correct when replacements are applied from left to
+        // right. The incoming map gives no ordering guarantee, so order by template position.
+        List<CreateTableParser.ColumnInfo> orderedColumns = columnInTemplate.values().stream()
+            .sorted((left, right) -> Integer.compare(left.getIndex(), right.getIndex()))
+            .collect(Collectors.toList());
+
+        // Net growth of the template caused by the replacements made so far.
+        int offset = 0;
+        for (CreateTableParser.ColumnInfo columnInfo : orderedColumns) {
+            Column column = columnMap.get(columnInfo.getName());
+            if (!StringUtils.isEmpty(columnInfo.getInfo()) || column == null) {
+                // Either the template already defines the column, or the schema does not know it.
+                continue;
+            }
+            String newCol = columnToStarrocksType(column);
+            int nameLength = columnInfo.getName().length();
+            int end = columnInfo.getIndex() + offset;
+            template = template.substring(0, end - nameLength) + newCol + template.substring(end);
+            offset += newCol.length() - nameLength;
+        }
+        return template;
+    }
+
+    /**
+     * Builds the {@code DEFAULT '...'} clause for a column.
+     *
+     * <p>No clause is produced when the column has no default value or when its SQL type is
+     * listed in {@code UNSUPPORTED_DEFAULT_VALUE_TYPES}. The previous {@code ||} condition
+     * dereferenced a null default for those types and emitted a clause for them otherwise.
+     *
+     * @param column column whose default value is rendered
+     * @return the DEFAULT clause, or an empty string when none applies
+     */
+    private static String getDefaultValue(Column column) {
+        Object defaultValue = column.getDefaultValue();
+        if (defaultValue == null
+            || UNSUPPORTED_DEFAULT_VALUE_TYPES.contains(column.getDataType().getSqlType())) {
+            return "";
+        }
+        return "DEFAULT '" + defaultValue + "'";
}
- static String dataTypeToStarrocksType(SeaTunnelDataType<?> dataType) {
+ private static String dataTypeToStarrocksType(SeaTunnelDataType<?>
dataType) {
checkNotNull(dataType, "The SeaTunnel's data type is required.");
switch (dataType.getSqlType()) {
case NULL:
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
index add3967ca..f30b8d13b 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
@@ -52,12 +52,15 @@ public class StarRocksSink extends
AbstractSimpleSink<SeaTunnelRow, Void> implem
private SinkConfig sinkConfig;
private DataSaveMode dataSaveMode;
+ private CatalogTable catalogTable;
+
+ /**
+ * Creates the sink from a catalog table instead of a pre-computed row type.
+ *
+ * @param dataSaveMode save mode stored on the sink
+ * @param sinkConfig sink configuration stored on the sink
+ * @param catalogTable catalog table kept on the sink; its table schema also supplies the
+ * row type via {@code toPhysicalRowDataType()}
+ */
public StarRocksSink(DataSaveMode dataSaveMode,
SinkConfig sinkConfig,
- SeaTunnelRowType seaTunnelRowType) {
+ CatalogTable catalogTable) {
this.dataSaveMode = dataSaveMode;
this.sinkConfig = sinkConfig;
- this.seaTunnelRowType = seaTunnelRowType;
+ this.seaTunnelRowType =
catalogTable.getTableSchema().toPhysicalRowDataType();
+ this.catalogTable = catalogTable;
}
@Override
@@ -69,8 +72,6 @@ public class StarRocksSink extends
AbstractSimpleSink<SeaTunnelRow, Void> implem
public void prepare(Config pluginConfig) throws PrepareFailException {
ConfigValidator.of(ReadonlyConfig.fromConfig(pluginConfig))
.validate(new StarRocksCatalogFactory().optionRule());
- // TODO get catalog Table
- CatalogTable catalogTable = null;
sinkConfig = SinkConfig.of(ReadonlyConfig.fromConfig(pluginConfig));
if (StringUtils.isEmpty(sinkConfig.getTable())) {
sinkConfig.setTable(catalogTable.getTableId().getTableName());
@@ -84,8 +85,6 @@ public class StarRocksSink extends
AbstractSimpleSink<SeaTunnelRow, Void> implem
if (!starRocksCatalog.databaseExists(sinkConfig.getDatabase())) {
starRocksCatalog.createDatabase(TablePath.of(sinkConfig.getDatabase(), ""),
true);
}
- // TODO get catalog Table
- CatalogTable catalogTable = null;
if
(!starRocksCatalog.tableExists(TablePath.of(sinkConfig.getDatabase(),
sinkConfig.getTable()))) {
starRocksCatalog.createTable(StarRocksSaveModeUtil.fillingCreateSql(template,
sinkConfig.getDatabase(), sinkConfig.getTable(),
catalogTable.getTableSchema()));
}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
index 5bbd57142..54220278b 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
@@ -60,6 +60,6 @@ public class StarRocksSinkFactory implements TableSinkFactory
{
}
return () -> new StarRocksSink(DataSaveMode.KEEP_SCHEMA_AND_DATA,
sinkConfig,
- catalogTable.getTableSchema().toPhysicalRowDataType());
+ catalogTable);
}
}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/util/CreateTableParser.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/util/CreateTableParser.java
new file mode 100644
index 000000000..4a8e6db40
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/util/CreateTableParser.java
@@ -0,0 +1,67 @@
+package org.apache.seatunnel.connectors.seatunnel.starrocks.util;
+
+import lombok.Getter;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Lightweight scanner for the column list of a {@code CREATE TABLE} statement or template.
+ *
+ * <p>It splits the text after the first {@code (} on top-level commas and records, for every
+ * entry that starts with a (possibly back-quoted) word, the column name, the remaining
+ * definition text and where the entry ends in the original SQL. Quoted strings and comments
+ * are not interpreted.
+ */
+public class CreateTableParser {
+
+    // Group 1: column name, optionally wrapped in back-quotes. Group 2: everything after it.
+    private static final Pattern COLUMN_PATTERN = Pattern.compile("`?(\\w+)`?\\s*([\\w|\\W]*)");
+
+    /**
+     * Extracts the columns declared in the column list of {@code createTableSql}.
+     *
+     * @param createTableSql the create-table statement or template to scan
+     * @return columns keyed by name, in the order they appear in the SQL
+     */
+    public static Map<String, ColumnInfo> getColumnList(String createTableSql) {
+        // Insertion-ordered so callers that iterate the map see columns in template order.
+        Map<String, ColumnInfo> columns = new java.util.LinkedHashMap<>();
+        StringBuilder columnBuilder = new StringBuilder();
+        int startIndex = createTableSql.indexOf("(");
+        String columnSection = createTableSql.substring(startIndex + 1);
+
+        // Depth of parentheses opened inside the column list, e.g. DECIMAL(10, 2).
+        int depth = 0;
+        for (int i = 0; i < columnSection.length(); i++) {
+            char c = columnSection.charAt(i);
+            if (c == '(') {
+                depth++;
+                columnBuilder.append(c);
+            } else if (depth > 0) {
+                if (c == ')') {
+                    depth--;
+                }
+                columnBuilder.append(c);
+            } else if (c == ',' || c == ')') {
+                // columnSection starts one character after startIndex, so this delimiter sits
+                // at startIndex + 1 + i in the original SQL.
+                parseColumn(columnBuilder.toString(), columns, startIndex + 1 + i);
+                columnBuilder.setLength(0);
+                if (c == ')') {
+                    // End of the column list; the trailing clauses are not columns.
+                    break;
+                }
+            } else {
+                columnBuilder.append(c);
+            }
+        }
+        return columns;
+    }
+
+    /**
+     * Parses one comma-separated entry of the column list and registers it when it is a column.
+     *
+     * @param columnString   raw entry text, including surrounding whitespace
+     * @param columnList     map receiving the parsed column
+     * @param delimiterIndex offset in the original SQL of the delimiter that ended the entry
+     */
+    private static void parseColumn(String columnString, Map<String, ColumnInfo> columnList, int delimiterIndex) {
+        Matcher matcher = COLUMN_PATTERN.matcher(columnString.trim());
+        if (!matcher.matches()) {
+            return;
+        }
+        String columnName = matcher.group(1);
+        String otherInfo = matcher.group(2).trim();
+        String normalized = (columnName + " " + otherInfo).toUpperCase();
+        if (normalized.contains("PRIMARY KEY") || normalized.contains("CREATE TABLE")) {
+            return;
+        }
+        // Skip whitespace between the definition and its delimiter, so the index points just
+        // past the definition text (same whitespace notion as String.trim()).
+        int trailing = 0;
+        while (trailing < columnString.length()
+            && columnString.charAt(columnString.length() - 1 - trailing) <= ' ') {
+            trailing++;
+        }
+        columnList.put(columnName, new ColumnInfo(columnName, otherInfo, delimiterIndex - trailing));
+    }
+
+    /** One entry of the column list as found in the SQL text. */
+    @Getter
+    public static final class ColumnInfo {
+
+        public ColumnInfo(String name, String info, int index) {
+            this.name = name;
+            this.info = info;
+            this.index = index;
+        }
+
+        // Column name without back-quotes.
+        String name;
+        // Definition text following the name, trimmed; empty for a bare column name.
+        String info;
+        // Offset in the original SQL just past the end of this entry's text.
+        int index;
+    }
+
+}