This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/cdc-multiple-table by this push:
     new bc9cd6bf6 [Improve] Improve StarRocks Auto Create Table (#4208)
bc9cd6bf6 is described below

commit bc9cd6bf6950142dc3c9059eb5bdaa1299779296
Author: Hisoka <[email protected]>
AuthorDate: Fri Feb 24 15:54:36 2023 +0800

    [Improve] Improve StarRocks Auto Create Table (#4208)
---
 .../starrocks/sink/StarRocksSaveModeUtil.java      | 56 +++++++++++++++---
 .../seatunnel/starrocks/sink/StarRocksSink.java    | 11 ++--
 .../starrocks/sink/StarRocksSinkFactory.java       |  2 +-
 .../starrocks/util/CreateTableParser.java          | 67 ++++++++++++++++++++++
 4 files changed, 121 insertions(+), 15 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
index 93be4e53f..15e71cf2a 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
@@ -25,28 +25,68 @@ import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SqlType;
+import 
org.apache.seatunnel.connectors.seatunnel.starrocks.util.CreateTableParser;
 
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public class StarRocksSaveModeUtil {
 
+    private static final List<SqlType> UNSUPPORTED_DEFAULT_VALUE_TYPES = 
Arrays.asList(SqlType.ARRAY,
+        SqlType.MAP, SqlType.ROW, SqlType.MULTIPLE_ROW, SqlType.BOOLEAN);
+
     public static String fillingCreateSql(String template, String database, 
String table, TableSchema tableSchema) {
         String primaryKey = 
tableSchema.getPrimaryKey().getColumnNames().stream().map(r -> "`" + r + 
"`").collect(Collectors.joining(","));
-        String rowTypeFields = 
tableSchema.getColumns().stream().map(StarRocksSaveModeUtil::columnToStarrocksType)
+
+        Map<String, CreateTableParser.ColumnInfo> columnInTemplate = 
CreateTableParser.getColumnList(template);
+        template = mergeColumnInTemplate(columnInTemplate, tableSchema, 
template);
+
+        String rowTypeFields = tableSchema.getColumns().stream().filter(column 
-> 
!columnInTemplate.containsKey(column.getName())).map(StarRocksSaveModeUtil::columnToStarrocksType)
             .collect(Collectors.joining(",\n"));
-        return template.replaceAll(String.format("${%s}", 
SaveModeConstants.DATABASE), database)
-            .replaceAll(String.format("${%s}", SaveModeConstants.TABLE_NAME), 
table)
-            .replaceAll(String.format("${%s}", 
SaveModeConstants.ROWTYPE_FIELDS), rowTypeFields)
-            .replaceAll(String.format("${%s}", 
SaveModeConstants.ROWTYPE_PRIMARY_KEY), primaryKey);
+        return template.replaceAll(String.format("\\$\\{%s\\}", 
SaveModeConstants.DATABASE), database)
+            .replaceAll(String.format("\\$\\{%s\\}", 
SaveModeConstants.TABLE_NAME), table)
+            .replaceAll(String.format("\\$\\{%s\\}", 
SaveModeConstants.ROWTYPE_FIELDS), rowTypeFields)
+            .replaceAll(String.format("\\$\\{%s\\}", 
SaveModeConstants.ROWTYPE_PRIMARY_KEY), primaryKey);
     }
 
-    static String columnToStarrocksType(Column column) {
+    private static String columnToStarrocksType(Column column) {
         checkNotNull(column, "The column is required.");
         return String.format("`%s` %s %s %s", column.getName(), 
dataTypeToStarrocksType(column.getDataType()),
-            column.isNullable() ? "NULL" : "NOT NULL", 
column.getDefaultValue() == null ? "" : column.getDefaultValue().toString());
+            column.isNullable() ? "NULL" : "NOT NULL", 
getDefaultValue(column));
+    }
+
+    private static String mergeColumnInTemplate(Map<String, 
CreateTableParser.ColumnInfo> columnInTemplate, TableSchema tableSchema, String 
template) {
+        int offset = 0;
+        Map<String, Column> columnMap = 
tableSchema.getColumns().stream().collect(Collectors.toMap(Column::getName, 
Function.identity()));
+        for (String col : columnInTemplate.keySet()) {
+            CreateTableParser.ColumnInfo columnInfo = 
columnInTemplate.get(col);
+            if (StringUtils.isEmpty(columnInfo.getInfo())) {
+                if (columnMap.containsKey(col)) {
+                    Column column = columnMap.get(col);
+                    String newCol = columnToStarrocksType(column);
+                    template = template.substring(0, columnInfo.getIndex() + 
offset - columnInfo.getName().length()) +
+                        newCol + template.substring(offset + 
columnInfo.getIndex());
+                    offset += newCol.length() - columnInfo.getName().length();
+                }
+            }
+        }
+        return template;
+    }
+
+    private static String getDefaultValue(Column column) {
+        if (column.getDefaultValue() != null || 
UNSUPPORTED_DEFAULT_VALUE_TYPES.contains(column.getDataType().getSqlType())) {
+            return "DEFAULT '" + column.getDefaultValue().toString() + "'";
+        }
+        return "";
     }
 
-    static String dataTypeToStarrocksType(SeaTunnelDataType<?> dataType) {
+    private static String dataTypeToStarrocksType(SeaTunnelDataType<?> 
dataType) {
         checkNotNull(dataType, "The SeaTunnel's data type is required.");
         switch (dataType.getSqlType()) {
             case NULL:
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
index add3967ca..f30b8d13b 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
@@ -52,12 +52,15 @@ public class StarRocksSink extends AbstractSimpleSink<SeaTunnelRow, Void> implem
     private SinkConfig sinkConfig;
     private DataSaveMode dataSaveMode;
 
+    private CatalogTable catalogTable;
+
     public StarRocksSink(DataSaveMode dataSaveMode,
                          SinkConfig sinkConfig,
-                         SeaTunnelRowType seaTunnelRowType) {
+                         CatalogTable catalogTable) {
         this.dataSaveMode = dataSaveMode;
         this.sinkConfig = sinkConfig;
-        this.seaTunnelRowType = seaTunnelRowType;
+        this.seaTunnelRowType = 
catalogTable.getTableSchema().toPhysicalRowDataType();
+        this.catalogTable = catalogTable;
     }
 
     @Override
@@ -69,8 +72,6 @@ public class StarRocksSink extends AbstractSimpleSink<SeaTunnelRow, Void> implem
     public void prepare(Config pluginConfig) throws PrepareFailException {
         ConfigValidator.of(ReadonlyConfig.fromConfig(pluginConfig))
             .validate(new StarRocksCatalogFactory().optionRule());
-        // TODO get catalog Table
-        CatalogTable catalogTable = null;
         sinkConfig = SinkConfig.of(ReadonlyConfig.fromConfig(pluginConfig));
         if (StringUtils.isEmpty(sinkConfig.getTable())) {
             sinkConfig.setTable(catalogTable.getTableId().getTableName());
@@ -84,8 +85,6 @@ public class StarRocksSink extends AbstractSimpleSink<SeaTunnelRow, Void> implem
         if (!starRocksCatalog.databaseExists(sinkConfig.getDatabase())) {
             
starRocksCatalog.createDatabase(TablePath.of(sinkConfig.getDatabase(), ""), 
true);
         }
-        // TODO get catalog Table
-        CatalogTable catalogTable = null;
         if 
(!starRocksCatalog.tableExists(TablePath.of(sinkConfig.getDatabase(), 
sinkConfig.getTable()))) {
             
starRocksCatalog.createTable(StarRocksSaveModeUtil.fillingCreateSql(template, 
sinkConfig.getDatabase(), sinkConfig.getTable(), 
catalogTable.getTableSchema()));
         }
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
index 5bbd57142..54220278b 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
@@ -60,6 +60,6 @@ public class StarRocksSinkFactory implements TableSinkFactory {
         }
         return () -> new StarRocksSink(DataSaveMode.KEEP_SCHEMA_AND_DATA,
             sinkConfig,
-            catalogTable.getTableSchema().toPhysicalRowDataType());
+            catalogTable);
     }
 }
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/util/CreateTableParser.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/util/CreateTableParser.java
new file mode 100644
index 000000000..4a8e6db40
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/util/CreateTableParser.java
@@ -0,0 +1,67 @@
+package org.apache.seatunnel.connectors.seatunnel.starrocks.util;
+
+import lombok.Getter;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class CreateTableParser {
+
+    private static final Pattern COLUMN_PATTERN = 
Pattern.compile("`?(\\w+)`?\\s*([\\w|\\W]*)");
+
+    public static Map<String, ColumnInfo> getColumnList(String createTableSql) 
{
+        Map<String, ColumnInfo> columns = new HashMap<>();
+        StringBuilder columnBuilder = new StringBuilder();
+        int startIndex = createTableSql.indexOf("(");
+        createTableSql = createTableSql.substring(startIndex + 1);
+
+        boolean insideParentheses = false;
+        for (int i = 0; i < createTableSql.length(); i++) {
+            char c = createTableSql.charAt(i);
+            if (c == '(') {
+                insideParentheses = true;
+                columnBuilder.append(c);
+            } else if ((c == ',' || c == ')') && !insideParentheses) {
+                parseColumn(columnBuilder.toString(), columns, startIndex + i);
+                columnBuilder.setLength(0);
+            } else if (c == ')') {
+                insideParentheses = false;
+                columnBuilder.append(c);
+            } else {
+                columnBuilder.append(c);
+            }
+        }
+        return columns;
+    }
+
+    private static void parseColumn(String columnString, Map<String, 
ColumnInfo> columnList, int index) {
+        Matcher matcher = COLUMN_PATTERN.matcher(columnString.trim());
+        if (matcher.matches()) {
+            String columnName = matcher.group(1);
+            String otherInfo = matcher.group(2).trim();
+            StringBuilder columnBuilder = new 
StringBuilder(columnName).append(" ").append(otherInfo);
+            if (columnBuilder.toString().toUpperCase().contains("PRIMARY KEY") 
||
+                columnBuilder.toString().toUpperCase().contains("CREATE 
TABLE")) {
+                return;
+            }
+            columnList.put(columnName, new ColumnInfo(columnName, otherInfo, 
index));
+        }
+    }
+
+    @Getter
+    public static final class ColumnInfo {
+
+        public ColumnInfo(String name, String info, int index) {
+            this.name = name;
+            this.info = info;
+            this.index = index;
+        }
+
+        String name;
+        String info;
+        int index;
+    }
+
+}

Reply via email to