This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/cdc-multiple-table by this push:
     new c485d887e [feature][starrocks] add StarRocks factories (#4191)
c485d887e is described below

commit c485d887ecae6c1a356e86e837313d262d2d96b5
Author: Zongwen Li <[email protected]>
AuthorDate: Thu Feb 23 10:50:05 2023 +0800

    [feature][starrocks] add StarRocks factories (#4191)
---
 .../StarRocksCatalogFactory.java}                  |  33 ++--
 .../seatunnel/starrocks/config/SinkConfig.java     | 195 +++------------------
 .../starrocks/config/StarRocksOptions.java         |  40 +++++
 .../starrocks/config/StarRocksSinkOptions.java     | 116 ++++++++++++
 .../seatunnel/starrocks/sink/StarRocksSink.java    |  36 ++--
 .../starrocks/sink/StarRocksSinkFactory.java       |  31 +++-
 6 files changed, 245 insertions(+), 206 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogFactory.java
similarity index 54%
copy from 
seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
copy to 
seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogFactory.java
index 8863c3115..e17b8591a 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogFactory.java
@@ -15,30 +15,35 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.starrocks.sink;
+package org.apache.seatunnel.connectors.seatunnel.starrocks.catalog;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
-import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableSinkFactory;
-import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksOptions;
 
-import com.google.auto.service.AutoService;
+public class StarRocksCatalogFactory implements CatalogFactory {
+    public static final String IDENTIFIER = "StarRocks";
+    @Override
+    public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+        return new StarRocksCatalog(catalogName,
+            options.get(StarRocksOptions.USERNAME),
+            options.get(StarRocksOptions.PASSWORD),
+            options.get(StarRocksOptions.BASE_URL));
+    }
 
-@AutoService(Factory.class)
-public class StarRocksSinkFactory implements TableSinkFactory {
     @Override
     public String factoryIdentifier() {
-        return "StarRocks";
+        return IDENTIFIER;
     }
 
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-            .required(SinkConfig.NODE_URLS, SinkConfig.USERNAME, 
SinkConfig.PASSWORD, SinkConfig.DATABASE, SinkConfig.QUERY_PORT)
-            .optional(SinkConfig.TABLE, SinkConfig.LABEL_PREFIX, 
SinkConfig.BATCH_MAX_SIZE, SinkConfig.BATCH_MAX_BYTES,
-                SinkConfig.BATCH_INTERVAL_MS, SinkConfig.MAX_RETRIES, 
SinkConfig.MAX_RETRY_BACKOFF_MS,
-                SinkConfig.RETRY_BACKOFF_MULTIPLIER_MS, 
SinkConfig.STARROCKS_CONFIG, SinkConfig.ENABLE_UPSERT_DELETE,
-                SinkConfig.SAVE_MODE_CREATE_TEMPLATE)
-                .build();
+            .required(StarRocksOptions.BASE_URL)
+            .required(StarRocksOptions.USERNAME)
+            .required(StarRocksOptions.PASSWORD)
+            .build();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
index 3a6212266..2095b9988 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
@@ -17,11 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.starrocks.config;
 
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 
 import lombok.Getter;
 import lombok.Setter;
@@ -37,108 +33,8 @@ import java.util.Map;
 @ToString
 public class SinkConfig implements Serializable {
 
-    private static final int DEFAULT_BATCH_MAX_SIZE = 1024;
-    private static final long DEFAULT_BATCH_BYTES = 5 * 1024 * 1024;
-
-    private static final String LOAD_FORMAT = "format";
-    private static final StreamLoadFormat DEFAULT_LOAD_FORMAT = 
StreamLoadFormat.CSV;
-    private static final String COLUMN_SEPARATOR = "column_separator";
-
-    public static final Option<List<String>> NODE_URLS = 
Options.key("nodeUrls")
-            .listType()
-            .noDefaultValue()
-            .withDescription("StarRocks cluster address, the format is 
[\"fe_ip:fe_http_port\", ...]");
-
-    public static final Option<String> USERNAME = Options.key("username")
-            .stringType()
-            .noDefaultValue()
-            .withDescription("StarRocks user username");
-
-    public static final Option<String> PASSWORD = Options.key("password")
-            .stringType()
-            .noDefaultValue()
-            .withDescription("StarRocks user password");
-
-    public static final Option<String> LABEL_PREFIX = 
Options.key("labelPrefix")
-            .stringType()
-            .noDefaultValue()
-            .withDescription("The prefix of StarRocks stream load label");
-
-    public static final Option<String> DATABASE = Options.key("database")
-        .stringType()
-        .noDefaultValue()
-        .withDescription("The name of StarRocks database");
-
-    public static final Option<String> TABLE = Options.key("table")
-        .stringType()
-        .noDefaultValue()
-        .withDescription("The name of StarRocks table");
-
-    public static final Option<Map<String, String>> STARROCKS_CONFIG = 
Options.key("starrocks.config")
-        .mapType()
-        .noDefaultValue()
-        .withDescription("The parameter of the stream load data_desc. " +
-            "The way to specify the parameter is to add the original stream 
load parameter into map");
-
-    public static final Option<String> SAVE_MODE_CREATE_TEMPLATE = 
Options.key("save_mode_create_template")
-        .stringType()
-        .defaultValue("CREATE TABLE IF NOT EXISTS 
`${database}`.`${table_name}` (\n" +
-            "${rowtype_fields}\n" +
-            ") ENGINE=OLAP\n" +
-            " PRIMARY KEY (${rowtype_primary_key})\n" +
-            "DISTRIBUTED BY HASH (${rowtype_primary_key})" +
-            "PROPERTIES (\n" +
-            "    \"replication_num\" = \"1\" \n" +
-            ")").withDescription("Create table statement template, used to 
create StarRocks table");
-
-    public static final Option<String> QUERY_PORT = Options.key("query_port")
-        .stringType()
-        .defaultValue("9030")
-        .withDescription("FE MySQL server port");
-
-    public static final Option<Integer> BATCH_MAX_SIZE = 
Options.key("batch_max_rows")
-        .intType()
-        .defaultValue(DEFAULT_BATCH_MAX_SIZE)
-        .withDescription("For batch writing, when the number of buffers 
reaches the number of batch_max_rows or the byte size of batch_max_bytes or the 
time reaches batch_interval_ms, the data will be flushed into the StarRocks");
-
-    public static final Option<Long> BATCH_MAX_BYTES = 
Options.key("batch_max_bytes")
-        .longType()
-        .defaultValue(DEFAULT_BATCH_BYTES)
-        .withDescription("For batch writing, when the number of buffers 
reaches the number of batch_max_rows or the byte size of batch_max_bytes or the 
time reaches batch_interval_ms, the data will be flushed into the StarRocks");
-
-    public static final Option<Integer> BATCH_INTERVAL_MS = 
Options.key("batch_interval_ms")
-            .intType()
-            .noDefaultValue()
-            .withDescription("For batch writing, when the number of buffers 
reaches the number of batch_max_rows or the byte size of batch_max_bytes or the 
time reaches batch_interval_ms, the data will be flushed into the StarRocks");
-
-    public static final Option<Integer> MAX_RETRIES = 
Options.key("max_retries")
-            .intType()
-            .noDefaultValue()
-            .withDescription("The number of retries to flush failed");
-
-    public static final Option<Integer> RETRY_BACKOFF_MULTIPLIER_MS = 
Options.key("retry_backoff_multiplier_ms")
-            .intType()
-            .noDefaultValue()
-            .withDescription("Using as a multiplier for generating the next 
delay for backoff");
-
-    public static final Option<Integer> MAX_RETRY_BACKOFF_MS = 
Options.key("max_retry_backoff_ms")
-            .intType()
-            .noDefaultValue()
-            .withDescription("The amount of time to wait before attempting to 
retry a request to StarRocks");
-
-    public static final Option<Boolean> ENABLE_UPSERT_DELETE = 
Options.key("enable_upsert_delete")
-        .booleanType()
-        .defaultValue(false)
-        .withDescription("Whether to enable upsert/delete, only supports 
PrimaryKey model.");
-
     public enum StreamLoadFormat {
         CSV, JSON;
-        public static StreamLoadFormat parse(String format) {
-            if (StreamLoadFormat.JSON.name().equals(format)) {
-                return JSON;
-            }
-            return CSV;
-        }
     }
 
     private List<String> nodeUrls;
@@ -149,10 +45,9 @@ public class SinkConfig implements Serializable {
     private String table;
     private String labelPrefix;
     private String columnSeparator;
-    private StreamLoadFormat loadFormat = DEFAULT_LOAD_FORMAT;
-
-    private int batchMaxSize = DEFAULT_BATCH_MAX_SIZE;
-    private long batchMaxBytes = DEFAULT_BATCH_BYTES;
+    private StreamLoadFormat loadFormat;
+    private int batchMaxSize;
+    private long batchMaxBytes;
 
     private Integer batchIntervalMs;
     private int maxRetries;
@@ -160,68 +55,34 @@ public class SinkConfig implements Serializable {
     private int maxRetryBackoffMs;
     private boolean enableUpsertDelete;
 
-    private String saveModeCreateTemplate = 
SAVE_MODE_CREATE_TEMPLATE.defaultValue();
+    private String saveModeCreateTemplate;
 
+    @Getter
     private final Map<String, Object> streamLoadProps = new HashMap<>();
 
-    public static SinkConfig loadConfig(Config pluginConfig) {
+    public static SinkConfig of(ReadonlyConfig config) {
         SinkConfig sinkConfig = new SinkConfig();
-        sinkConfig.setNodeUrls(pluginConfig.getStringList(NODE_URLS.key()));
-        sinkConfig.setDatabase(pluginConfig.getString(DATABASE.key()));
-        sinkConfig.setJdbcUrl("jdbc:mysql://" + 
sinkConfig.getNodeUrls().get(0).split(":")[0] +
-            ":" + pluginConfig.getString(QUERY_PORT.key()) + "/");
-        if (pluginConfig.hasPath(USERNAME.key())) {
-            sinkConfig.setUsername(pluginConfig.getString(USERNAME.key()));
-        }
-        if (pluginConfig.hasPath(TABLE.key())) {
-            sinkConfig.setTable(pluginConfig.getString(TABLE.key()));
-        }
-        if (pluginConfig.hasPath(PASSWORD.key())) {
-            sinkConfig.setPassword(pluginConfig.getString(PASSWORD.key()));
-        }
-        if (pluginConfig.hasPath(LABEL_PREFIX.key())) {
-            
sinkConfig.setLabelPrefix(pluginConfig.getString(LABEL_PREFIX.key()));
-        }
-        if (pluginConfig.hasPath(BATCH_MAX_SIZE.key())) {
-            
sinkConfig.setBatchMaxSize(pluginConfig.getInt(BATCH_MAX_SIZE.key()));
-        }
-        if (pluginConfig.hasPath(BATCH_MAX_BYTES.key())) {
-            
sinkConfig.setBatchMaxBytes(pluginConfig.getLong(BATCH_MAX_BYTES.key()));
-        }
-        if (pluginConfig.hasPath(BATCH_INTERVAL_MS.key())) {
-            
sinkConfig.setBatchIntervalMs(pluginConfig.getInt(BATCH_INTERVAL_MS.key()));
-        }
-        if (pluginConfig.hasPath(MAX_RETRIES.key())) {
-            sinkConfig.setMaxRetries(pluginConfig.getInt(MAX_RETRIES.key()));
-        }
-        if (pluginConfig.hasPath(RETRY_BACKOFF_MULTIPLIER_MS.key())) {
-            
sinkConfig.setRetryBackoffMultiplierMs(pluginConfig.getInt(RETRY_BACKOFF_MULTIPLIER_MS.key()));
-        }
-        if (pluginConfig.hasPath(MAX_RETRY_BACKOFF_MS.key())) {
-            
sinkConfig.setMaxRetryBackoffMs(pluginConfig.getInt(MAX_RETRY_BACKOFF_MS.key()));
-        }
-        if (pluginConfig.hasPath(ENABLE_UPSERT_DELETE.key())) {
-            
sinkConfig.setEnableUpsertDelete(pluginConfig.getBoolean(ENABLE_UPSERT_DELETE.key()));
-        }
-        if (pluginConfig.hasPath(SAVE_MODE_CREATE_TEMPLATE.key())) {
-            
sinkConfig.setSaveModeCreateTemplate(pluginConfig.getString(SAVE_MODE_CREATE_TEMPLATE.key()));
-        }
-        parseSinkStreamLoadProperties(pluginConfig, sinkConfig);
-        if (sinkConfig.streamLoadProps.containsKey(COLUMN_SEPARATOR)) {
-            sinkConfig.setColumnSeparator((String) 
sinkConfig.streamLoadProps.get(COLUMN_SEPARATOR));
-        }
-        if (sinkConfig.streamLoadProps.containsKey(LOAD_FORMAT)) {
-            sinkConfig.setLoadFormat(StreamLoadFormat.parse((String) 
sinkConfig.streamLoadProps.get(LOAD_FORMAT)));
-        }
+        sinkConfig.setNodeUrls(config.get(StarRocksSinkOptions.NODE_URLS));
+        sinkConfig.setDatabase(config.get(StarRocksSinkOptions.DATABASE));
+        sinkConfig.setJdbcUrl(String.format("jdbc:mysql://%s:%d",
+            sinkConfig.getNodeUrls().get(0).split(":")[0],
+            config.get(StarRocksSinkOptions.QUERY_PORT)));
+        
config.getOptional(StarRocksOptions.USERNAME).ifPresent(sinkConfig::setUsername);
+        
config.getOptional(StarRocksOptions.PASSWORD).ifPresent(sinkConfig::setPassword);
+        
config.getOptional(StarRocksSinkOptions.TABLE).ifPresent(sinkConfig::setTable);
+        
config.getOptional(StarRocksSinkOptions.LABEL_PREFIX).ifPresent(sinkConfig::setLabelPrefix);
+        
sinkConfig.setBatchMaxSize(config.get(StarRocksSinkOptions.BATCH_MAX_SIZE));
+        
sinkConfig.setBatchMaxBytes(config.get(StarRocksSinkOptions.BATCH_MAX_BYTES));
+        
config.getOptional(StarRocksSinkOptions.BATCH_INTERVAL_MS).ifPresent(sinkConfig::setBatchIntervalMs);
+        
config.getOptional(StarRocksSinkOptions.MAX_RETRIES).ifPresent(sinkConfig::setMaxRetries);
+        
config.getOptional(StarRocksSinkOptions.RETRY_BACKOFF_MULTIPLIER_MS).ifPresent(sinkConfig::setRetryBackoffMultiplierMs);
+        
config.getOptional(StarRocksSinkOptions.MAX_RETRY_BACKOFF_MS).ifPresent(sinkConfig::setMaxRetryBackoffMs);
+        
config.getOptional(StarRocksSinkOptions.ENABLE_UPSERT_DELETE).ifPresent(sinkConfig::setEnableUpsertDelete);
+        
sinkConfig.setSaveModeCreateTemplate(config.get(StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE));
+        
config.getOptional(StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE).ifPresent(sinkConfig::setSaveModeCreateTemplate);
+        
config.getOptional(StarRocksSinkOptions.STARROCKS_CONFIG).ifPresent(options -> 
sinkConfig.getStreamLoadProps().putAll(options));
+        
config.getOptional(StarRocksSinkOptions.COLUMN_SEPARATOR).ifPresent(sinkConfig::setColumnSeparator);
+        sinkConfig.setLoadFormat(config.get(StarRocksSinkOptions.LOAD_FORMAT));
         return sinkConfig;
     }
-
-    private static void parseSinkStreamLoadProperties(Config pluginConfig, 
SinkConfig sinkConfig) {
-        if (CheckConfigUtil.isValidParam(pluginConfig, 
STARROCKS_CONFIG.key())) {
-            pluginConfig.getObject(STARROCKS_CONFIG.key()).forEach((key, 
value) -> {
-                final String configKey = key.toLowerCase();
-                sinkConfig.streamLoadProps.put(configKey, value.unwrapped());
-            });
-        }
-    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksOptions.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksOptions.java
new file mode 100644
index 000000000..766a05759
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksOptions.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.starrocks.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public interface StarRocksOptions {
+    Option<String> BASE_URL = Options.key("base-url")
+        .stringType()
+        .noDefaultValue()
+        .withDescription("URL has to be without database, like 
\"jdbc:mysql://localhost:5432/\" or" +
+            "\"jdbc:mysql://localhost:5432\" rather than 
\"jdbc:mysql://localhost:5432/db\"");
+
+    Option<String> USERNAME = Options.key("username")
+        .stringType()
+        .noDefaultValue()
+        .withDescription("StarRocks user username");
+
+    Option<String> PASSWORD = Options.key("password")
+        .stringType()
+        .noDefaultValue()
+        .withDescription("StarRocks user password");
+
+}
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
new file mode 100644
index 000000000..5a00b0e76
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.starrocks.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig.StreamLoadFormat;
+
+import java.util.List;
+import java.util.Map;
+
+@SuppressWarnings("MagicNumber")
+public interface StarRocksSinkOptions {
+    Option<List<String>> NODE_URLS = Options.key("nodeUrls")
+        .listType()
+        .noDefaultValue()
+        .withDescription("StarRocks cluster address, the format is 
[\"fe_ip:fe_http_port\", ...]");
+
+    Option<String> LABEL_PREFIX = Options.key("labelPrefix")
+        .stringType()
+        .noDefaultValue()
+        .withDescription("The prefix of StarRocks stream load label");
+
+    Option<String> DATABASE = Options.key("database")
+        .stringType()
+        .noDefaultValue()
+        .withDescription("The name of StarRocks database");
+
+    Option<String> TABLE = Options.key("table")
+        .stringType()
+        .noDefaultValue()
+        .withDescription("The name of StarRocks table");
+
+
+    Option<String> SAVE_MODE_CREATE_TEMPLATE = 
Options.key("save_mode_create_template")
+        .stringType()
+        .defaultValue("CREATE TABLE IF NOT EXISTS 
`${database}`.`${table_name}` (\n" +
+            "${rowtype_fields}\n" +
+            ") ENGINE=OLAP\n" +
+            " PRIMARY KEY (${rowtype_primary_key})\n" +
+            "DISTRIBUTED BY HASH (${rowtype_primary_key})" +
+            "PROPERTIES (\n" +
+            "    \"replication_num\" = \"1\" \n" +
+            ")").withDescription("Create table statement template, used to 
create StarRocks table");
+
+    Option<Integer> QUERY_PORT = Options.key("query_port")
+        .intType()
+        .defaultValue(9030)
+        .withDescription("FE MySQL server port");
+
+    Option<Integer> BATCH_MAX_SIZE = Options.key("batch_max_rows")
+        .intType()
+        .defaultValue(1024)
+        .withDescription("For batch writing, when the number of buffers 
reaches the number of batch_max_rows or the byte size of batch_max_bytes or the 
time reaches batch_interval_ms, the data will be flushed into the StarRocks");
+
+    Option<Long> BATCH_MAX_BYTES = Options.key("batch_max_bytes")
+        .longType()
+        .defaultValue((long) (5 * 1024 * 1024))
+        .withDescription("For batch writing, when the number of buffers 
reaches the number of batch_max_rows or the byte size of batch_max_bytes or the 
time reaches batch_interval_ms, the data will be flushed into the StarRocks");
+
+    Option<Integer> BATCH_INTERVAL_MS = Options.key("batch_interval_ms")
+        .intType()
+        .noDefaultValue()
+        .withDescription("For batch writing, when the number of buffers 
reaches the number of batch_max_rows or the byte size of batch_max_bytes or the 
time reaches batch_interval_ms, the data will be flushed into the StarRocks");
+
+    Option<Integer> MAX_RETRIES = Options.key("max_retries")
+        .intType()
+        .noDefaultValue()
+        .withDescription("The number of retries to flush failed");
+
+    Option<Integer> RETRY_BACKOFF_MULTIPLIER_MS = 
Options.key("retry_backoff_multiplier_ms")
+        .intType()
+        .noDefaultValue()
+        .withDescription("Using as a multiplier for generating the next delay 
for backoff");
+
+    Option<Integer> MAX_RETRY_BACKOFF_MS = Options.key("max_retry_backoff_ms")
+        .intType()
+        .noDefaultValue()
+        .withDescription("The amount of time to wait before attempting to 
retry a request to StarRocks");
+
+    Option<Boolean> ENABLE_UPSERT_DELETE = Options.key("enable_upsert_delete")
+        .booleanType()
+        .defaultValue(false)
+        .withDescription("Whether to enable upsert/delete, only supports 
PrimaryKey model.");
+
+    Option<Map<String, String>> STARROCKS_CONFIG = 
Options.key("starrocks.config")
+        .mapType()
+        .noDefaultValue()
+        .withDescription("The parameter of the stream load data_desc. " +
+            "The way to specify the parameter is to add the original stream 
load parameter into map");
+
+    Option<String> COLUMN_SEPARATOR = 
Options.key("starrocks.config.column_separator")
+        .stringType()
+        .noDefaultValue()
+        .withDescription("");
+
+    Option<StreamLoadFormat> LOAD_FORMAT = 
Options.key("starrocks.config.format")
+        .enumType(StreamLoadFormat.class)
+        .defaultValue(StreamLoadFormat.CSV)
+        .withDescription("");
+}
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
index c62300c46..19c0ce853 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
@@ -17,14 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.starrocks.sink;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig.DATABASE;
-import static 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig.NODE_URLS;
-import static 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig.PASSWORD;
-import static 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig.TABLE;
-import static 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig.USERNAME;
-
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
 import org.apache.seatunnel.api.sink.DataSaveMode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
@@ -34,14 +29,11 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 import 
org.apache.seatunnel.connectors.seatunnel.starrocks.catalog.StarRocksCatalog;
+import 
org.apache.seatunnel.connectors.seatunnel.starrocks.catalog.StarRocksCatalogFactory;
 import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
-import 
org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -57,26 +49,30 @@ public class StarRocksSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> implem
     private SeaTunnelRowType seaTunnelRowType;
     private SinkConfig sinkConfig;
     private DataSaveMode dataSaveMode;
+
+    public StarRocksSink(DataSaveMode dataSaveMode,
+                         SinkConfig sinkConfig,
+                         SeaTunnelRowType seaTunnelRowType) {
+        this.dataSaveMode = dataSaveMode;
+        this.sinkConfig = sinkConfig;
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
     @Override
     public String getPluginName() {
-        return "StarRocks";
+        return StarRocksCatalogFactory.IDENTIFIER;
     }
 
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
-        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
NODE_URLS.key(), DATABASE.key(), TABLE.key(), USERNAME.key(), PASSWORD.key());
-        if (!result.isSuccess()) {
-            throw new 
StarRocksConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                String.format("PluginName: %s, PluginType: %s, Message: %s",
-                    getPluginName(), PluginType.SINK, result.getMsg()));
-        }
+        ConfigValidator.of(ReadonlyConfig.fromConfig(pluginConfig))
+            .validate(new StarRocksCatalogFactory().optionRule());
         // TODO get catalog Table
         CatalogTable catalogTable = null;
-        sinkConfig = SinkConfig.loadConfig(pluginConfig);
+        sinkConfig = SinkConfig.of(ReadonlyConfig.fromConfig(pluginConfig));
         if (StringUtils.isEmpty(sinkConfig.getTable())) {
             sinkConfig.setTable(catalogTable.getTableId().getTableName());
         }
-        sinkConfig.setTable(catalogTable.getTableId().getTableName());
         dataSaveMode = DataSaveMode.KEEP_SCHEMA_AND_DATA;
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
index 8863c3115..5bbd57142 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
@@ -18,11 +18,18 @@
 package org.apache.seatunnel.connectors.seatunnel.starrocks.sink;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableFactoryContext;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
 import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions;
 
 import com.google.auto.service.AutoService;
+import org.apache.commons.lang3.StringUtils;
 
 @AutoService(Factory.class)
 public class StarRocksSinkFactory implements TableSinkFactory {
@@ -34,11 +41,25 @@ public class StarRocksSinkFactory implements 
TableSinkFactory {
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-            .required(SinkConfig.NODE_URLS, SinkConfig.USERNAME, 
SinkConfig.PASSWORD, SinkConfig.DATABASE, SinkConfig.QUERY_PORT)
-            .optional(SinkConfig.TABLE, SinkConfig.LABEL_PREFIX, 
SinkConfig.BATCH_MAX_SIZE, SinkConfig.BATCH_MAX_BYTES,
-                SinkConfig.BATCH_INTERVAL_MS, SinkConfig.MAX_RETRIES, 
SinkConfig.MAX_RETRY_BACKOFF_MS,
-                SinkConfig.RETRY_BACKOFF_MULTIPLIER_MS, 
SinkConfig.STARROCKS_CONFIG, SinkConfig.ENABLE_UPSERT_DELETE,
-                SinkConfig.SAVE_MODE_CREATE_TEMPLATE)
+            .required(StarRocksOptions.USERNAME, StarRocksOptions.PASSWORD)
+            .required(StarRocksSinkOptions.DATABASE, 
StarRocksSinkOptions.QUERY_PORT)
+            .required(StarRocksSinkOptions.NODE_URLS)
+            .optional(StarRocksSinkOptions.TABLE, 
StarRocksSinkOptions.LABEL_PREFIX, StarRocksSinkOptions.BATCH_MAX_SIZE, 
StarRocksSinkOptions.BATCH_MAX_BYTES,
+                StarRocksSinkOptions.BATCH_INTERVAL_MS, 
StarRocksSinkOptions.MAX_RETRIES, StarRocksSinkOptions.MAX_RETRY_BACKOFF_MS,
+                StarRocksSinkOptions.RETRY_BACKOFF_MULTIPLIER_MS, 
StarRocksSinkOptions.STARROCKS_CONFIG, 
StarRocksSinkOptions.ENABLE_UPSERT_DELETE,
+                StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE)
                 .build();
     }
+
+    @Override
+    public TableSink createSink(TableFactoryContext context) {
+        SinkConfig sinkConfig = SinkConfig.of(context.getOptions());
+        CatalogTable catalogTable = context.getCatalogTable();
+        if (StringUtils.isBlank(sinkConfig.getTable())) {
+            sinkConfig.setTable(catalogTable.getTableId().getTableName());
+        }
+        return () -> new StarRocksSink(DataSaveMode.KEEP_SCHEMA_AND_DATA,
+            sinkConfig,
+            catalogTable.getTableSchema().toPhysicalRowDataType());
+    }
 }

Reply via email to