This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new da8d9cbd35 [improve] add StarRocks options (#8639)
da8d9cbd35 is described below

commit da8d9cbd3559817e42f758a4f30f61bf7677c4a4
Author: fcb-xiaobo <[email protected]>
AuthorDate: Tue Feb 11 22:22:45 2025 +0800

    [improve] add StarRocks options (#8639)
---
 .../seatunnel/api/ConnectorOptionCheckTest.java    |  1 -
 .../starrocks/catalog/StarRocksCatalogFactory.java | 17 ++--
 .../seatunnel/starrocks/config/SinkConfig.java     |  6 +-
 .../seatunnel/starrocks/config/SourceConfig.java   | 93 +++++-----------------
 ...CommonConfig.java => StarRocksBaseOptions.java} | 27 +------
 .../starrocks/config/StarRocksConfig.java          | 47 +++++++++++
 .../starrocks/config/StarRocksOptions.java         | 43 ----------
 .../starrocks/config/StarRocksSinkOptions.java     | 65 ++++++---------
 ...urceConfig.java => StarRocksSourceOptions.java} | 55 ++-----------
 .../starrocks/sink/StarRocksSinkFactory.java       | 11 ++-
 .../starrocks/source/StarRocksSource.java          |  4 +-
 .../starrocks/source/StarRocksSourceFactory.java   | 31 ++++----
 12 files changed, 133 insertions(+), 267 deletions(-)

diff --git 
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
 
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index 9cf980f4bd..6d2bbf17e9 100644
--- 
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++ 
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -191,7 +191,6 @@ public class ConnectorOptionCheckTest {
         whiteList.add("IoTDBSinkOptions");
         whiteList.add("EasysearchSourceOptions");
         whiteList.add("RabbitmqSinkOptions");
-        whiteList.add("StarRocksSourceOptions");
         whiteList.add("IcebergSourceOptions");
         whiteList.add("HbaseSourceOptions");
         whiteList.add("PaimonSourceOptions");
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogFactory.java
index 124e025719..3ad81b68d1 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogFactory.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogFactory.java
@@ -22,23 +22,22 @@ import 
org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.factory.CatalogFactory;
 import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.connectors.seatunnel.starrocks.config.CommonConfig;
-import 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSourceOptions;
 
 import com.google.auto.service.AutoService;
 
 @AutoService(Factory.class)
 public class StarRocksCatalogFactory implements CatalogFactory {
-    public static final String IDENTIFIER = CommonConfig.CONNECTOR_IDENTITY;
+    public static final String IDENTIFIER = 
StarRocksSinkOptions.CONNECTOR_IDENTITY;
 
     @Override
     public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
         return new StarRocksCatalog(
                 catalogName,
-                options.get(StarRocksOptions.USERNAME),
-                options.get(StarRocksOptions.PASSWORD),
-                options.get(StarRocksOptions.BASE_URL),
+                options.get(StarRocksSourceOptions.USERNAME),
+                options.get(StarRocksSourceOptions.PASSWORD),
+                options.get(StarRocksSinkOptions.BASE_URL),
                 options.get(StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE));
     }
 
@@ -50,9 +49,9 @@ public class StarRocksCatalogFactory implements 
CatalogFactory {
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(StarRocksOptions.BASE_URL)
-                .required(StarRocksOptions.USERNAME)
-                .required(StarRocksOptions.PASSWORD)
+                .required(StarRocksSinkOptions.BASE_URL)
+                .required(StarRocksSourceOptions.USERNAME)
+                .required(StarRocksSourceOptions.PASSWORD)
                 .build();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
index 88b608dab3..4b0ee5e6eb 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
@@ -71,9 +71,9 @@ public class SinkConfig implements Serializable {
         SinkConfig sinkConfig = new SinkConfig();
         sinkConfig.setNodeUrls(config.get(StarRocksSinkOptions.NODE_URLS));
         sinkConfig.setDatabase(config.get(StarRocksSinkOptions.DATABASE));
-        sinkConfig.setJdbcUrl(config.get(StarRocksOptions.BASE_URL));
-        
config.getOptional(StarRocksOptions.USERNAME).ifPresent(sinkConfig::setUsername);
-        
config.getOptional(StarRocksOptions.PASSWORD).ifPresent(sinkConfig::setPassword);
+        sinkConfig.setJdbcUrl(config.get(StarRocksSinkOptions.BASE_URL));
+        
config.getOptional(StarRocksSinkOptions.USERNAME).ifPresent(sinkConfig::setUsername);
+        
config.getOptional(StarRocksSinkOptions.PASSWORD).ifPresent(sinkConfig::setPassword);
         
config.getOptional(StarRocksSinkOptions.TABLE).ifPresent(sinkConfig::setTable);
         
config.getOptional(StarRocksSinkOptions.LABEL_PREFIX).ifPresent(sinkConfig::setLabelPrefix);
         
sinkConfig.setBatchMaxSize(config.get(StarRocksSinkOptions.BATCH_MAX_SIZE));
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SourceConfig.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SourceConfig.java
index d069863843..e7a54ed403 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SourceConfig.java
@@ -17,8 +17,6 @@
 
 package org.apache.seatunnel.connectors.seatunnel.starrocks.config;
 
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 
 import lombok.Getter;
@@ -29,22 +27,20 @@ import java.util.Map;
 
 @Setter
 @Getter
-public class SourceConfig extends CommonConfig {
-
-    private static final long DEFAULT_SCAN_MEM_LIMIT = 1024 * 1024 * 1024L;
+public class SourceConfig extends StarRocksConfig {
 
     public SourceConfig(ReadonlyConfig config) {
         super(config);
-        this.maxRetries = config.get(MAX_RETRIES);
-        this.requestTabletSize = config.get(QUERY_TABLET_SIZE);
-        this.scanFilter = config.get(SCAN_FILTER);
-        this.connectTimeoutMs = config.get(SCAN_CONNECT_TIMEOUT);
-        this.batchRows = config.get(SCAN_BATCH_ROWS);
-        this.keepAliveMin = config.get(SCAN_KEEP_ALIVE_MIN);
-        this.queryTimeoutSec = config.get(SCAN_QUERY_TIMEOUT_SEC);
-        this.memLimit = config.get(SCAN_MEM_LIMIT);
-
-        String prefix = STARROCKS_SCAN_CONFIG_PREFIX.key();
+        this.maxRetries = config.get(StarRocksSourceOptions.MAX_RETRIES);
+        this.requestTabletSize = 
config.get(StarRocksSourceOptions.QUERY_TABLET_SIZE);
+        this.scanFilter = config.get(StarRocksSourceOptions.SCAN_FILTER);
+        this.connectTimeoutMs = 
config.get(StarRocksSourceOptions.SCAN_CONNECT_TIMEOUT);
+        this.batchRows = config.get(StarRocksSourceOptions.SCAN_BATCH_ROWS);
+        this.keepAliveMin = 
config.get(StarRocksSourceOptions.SCAN_KEEP_ALIVE_MIN);
+        this.queryTimeoutSec = 
config.get(StarRocksSourceOptions.SCAN_QUERY_TIMEOUT_SEC);
+        this.memLimit = config.get(StarRocksSourceOptions.SCAN_MEM_LIMIT);
+
+        String prefix = 
StarRocksSourceOptions.STARROCKS_SCAN_CONFIG_PREFIX.key();
         config.toMap()
                 .forEach(
                         (key, value) -> {
@@ -55,64 +51,13 @@ public class SourceConfig extends CommonConfig {
                         });
     }
 
-    public static final Option<Integer> MAX_RETRIES =
-            Options.key("max_retries")
-                    .intType()
-                    .defaultValue(3)
-                    .withDescription("number of retry requests sent to 
StarRocks");
-
-    public static final Option<Integer> QUERY_TABLET_SIZE =
-            Options.key("request_tablet_size")
-                    .intType()
-                    .defaultValue(Integer.MAX_VALUE)
-                    .withDescription("The number of Tablets corresponding to 
an Partition");
-
-    public static final Option<String> SCAN_FILTER =
-            
Options.key("scan_filter").stringType().defaultValue("").withDescription("SQL 
filter");
-
-    public static final Option<Integer> SCAN_CONNECT_TIMEOUT =
-            Options.key("scan_connect_timeout_ms")
-                    .intType()
-                    .defaultValue(1000)
-                    .withDescription("scan connect timeout");
-
-    public static final Option<Integer> SCAN_BATCH_ROWS =
-            Options.key("scan_batch_rows")
-                    .intType()
-                    .defaultValue(1024)
-                    .withDescription("scan batch rows");
-
-    public static final Option<Integer> SCAN_KEEP_ALIVE_MIN =
-            Options.key("scan_keep_alive_min")
-                    .intType()
-                    .defaultValue(10)
-                    .withDescription("Max keep alive time min");
-
-    public static final Option<Integer> SCAN_QUERY_TIMEOUT_SEC =
-            Options.key("scan_query_timeout_sec")
-                    .intType()
-                    .defaultValue(3600)
-                    .withDescription("Query timeout for a single query");
-
-    public static final Option<Long> SCAN_MEM_LIMIT =
-            Options.key("scan_mem_limit")
-                    .longType()
-                    .defaultValue(DEFAULT_SCAN_MEM_LIMIT)
-                    .withDescription("Memory byte limit for a single query");
-
-    public static final Option<String> STARROCKS_SCAN_CONFIG_PREFIX =
-            Options.key("scan.params.")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("The parameter of the scan data from be");
-
-    private int maxRetries = MAX_RETRIES.defaultValue();
-    private int requestTabletSize = QUERY_TABLET_SIZE.defaultValue();
-    private String scanFilter = SCAN_FILTER.defaultValue();
-    private long memLimit = SCAN_MEM_LIMIT.defaultValue();
-    private int queryTimeoutSec = SCAN_QUERY_TIMEOUT_SEC.defaultValue();
-    private int keepAliveMin = SCAN_KEEP_ALIVE_MIN.defaultValue();
-    private int batchRows = SCAN_BATCH_ROWS.defaultValue();
-    private int connectTimeoutMs = SCAN_CONNECT_TIMEOUT.defaultValue();
+    private int maxRetries = StarRocksSourceOptions.MAX_RETRIES.defaultValue();
+    private int requestTabletSize = 
StarRocksSourceOptions.QUERY_TABLET_SIZE.defaultValue();
+    private String scanFilter = 
StarRocksSourceOptions.SCAN_FILTER.defaultValue();
+    private long memLimit = 
StarRocksSourceOptions.SCAN_MEM_LIMIT.defaultValue();
+    private int queryTimeoutSec = 
StarRocksSourceOptions.SCAN_QUERY_TIMEOUT_SEC.defaultValue();
+    private int keepAliveMin = 
StarRocksSourceOptions.SCAN_KEEP_ALIVE_MIN.defaultValue();
+    private int batchRows = 
StarRocksSourceOptions.SCAN_BATCH_ROWS.defaultValue();
+    private int connectTimeoutMs = 
StarRocksSourceOptions.SCAN_CONNECT_TIMEOUT.defaultValue();
     private Map<String, String> sourceOptionProps = new HashMap<>();
 }
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/CommonConfig.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksBaseOptions.java
similarity index 77%
rename from 
seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/CommonConfig.java
rename to 
seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksBaseOptions.java
index c8a4775fcf..1aa9b99a93 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/CommonConfig.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksBaseOptions.java
@@ -14,27 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.seatunnel.connectors.seatunnel.starrocks.config;
 
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-import lombok.ToString;
 
 import java.io.Serializable;
 import java.util.List;
 
-@Getter
-@ToString
-@AllArgsConstructor
-public class CommonConfig implements Serializable {
-
+public class StarRocksBaseOptions implements Serializable {
     public static final String CONNECTOR_IDENTITY = "StarRocks";
-
     public static final Option<List<String>> NODE_URLS =
             Options.key("nodeUrls")
                     .listType()
@@ -65,18 +54,4 @@ public class CommonConfig implements Serializable {
                     .stringType()
                     .noDefaultValue()
                     .withDescription("StarRocks user password");
-
-    private List<String> nodeUrls;
-    private String username;
-    private String password;
-    private String database;
-    private String table;
-
-    public CommonConfig(ReadonlyConfig config) {
-        this.nodeUrls = config.get(NODE_URLS);
-        this.username = config.get(USERNAME);
-        this.password = config.get(PASSWORD);
-        this.database = config.get(DATABASE);
-        this.table = config.get(TABLE);
-    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksConfig.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksConfig.java
new file mode 100644
index 0000000000..d57df3c15a
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksConfig.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.starrocks.config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.ToString;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Getter
+@ToString
+@AllArgsConstructor
+public class StarRocksConfig implements Serializable {
+
+    private List<String> nodeUrls;
+    private String username;
+    private String password;
+    private String database;
+    private String table;
+
+    public StarRocksConfig(ReadonlyConfig config) {
+        this.nodeUrls = config.get(StarRocksBaseOptions.NODE_URLS);
+        this.username = config.get(StarRocksBaseOptions.USERNAME);
+        this.password = config.get(StarRocksBaseOptions.PASSWORD);
+        this.database = config.get(StarRocksBaseOptions.DATABASE);
+        this.table = config.get(StarRocksBaseOptions.TABLE);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksOptions.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksOptions.java
deleted file mode 100644
index f5ade9ae3e..0000000000
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksOptions.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.starrocks.config;
-
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
-
-public interface StarRocksOptions {
-    Option<String> BASE_URL =
-            Options.key("base-url")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "The JDBC URL like 
\"jdbc:mysql://localhost:9030/\" or"
-                                    + "\"jdbc:mysql://localhost:9030/\" or 
\"jdbc:mysql://localhost:9030/db\"");
-
-    Option<String> USERNAME =
-            Options.key("username")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("StarRocks user username");
-
-    Option<String> PASSWORD =
-            Options.key("password")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("StarRocks user password");
-}
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
index a0467d9917..d30aa6f9ef 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
@@ -24,37 +24,25 @@ import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
 import org.apache.seatunnel.api.sink.SchemaSaveMode;
 import 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig.StreamLoadFormat;
 
-import java.util.List;
 import java.util.Map;
 
 @SuppressWarnings("MagicNumber")
-public interface StarRocksSinkOptions {
-    Option<List<String>> NODE_URLS =
-            Options.key("nodeUrls")
-                    .listType()
+public class StarRocksSinkOptions extends StarRocksBaseOptions {
+
+    public static final Option<String> BASE_URL =
+            Options.key("base-url")
+                    .stringType()
                     .noDefaultValue()
                     .withDescription(
-                            "StarRocks cluster http address, the format is 
[\"fe_ip:fe_http_port\", ...]");
-
-    Option<String> LABEL_PREFIX =
+                            "The JDBC URL like 
\"jdbc:mysql://localhost:9030/\" or"
+                                    + "\"jdbc:mysql://localhost:9030/\" or 
\"jdbc:mysql://localhost:9030/db\"");
+    public static final Option<String> LABEL_PREFIX =
             Options.key("labelPrefix")
                     .stringType()
                     .noDefaultValue()
                     .withDescription("The prefix of StarRocks stream load 
label");
 
-    Option<String> DATABASE =
-            Options.key("database")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("The name of StarRocks database");
-
-    Option<String> TABLE =
-            Options.key("table")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("The name of StarRocks table");
-
-    Option<String> SAVE_MODE_CREATE_TEMPLATE =
+    public static final Option<String> SAVE_MODE_CREATE_TEMPLATE =
             Options.key("save_mode_create_template")
                     .stringType()
                     .defaultValue(
@@ -83,48 +71,47 @@ public interface StarRocksSinkOptions {
                     .withDescription(
                             "Create table statement template, used to create 
StarRocks table");
 
-    Option<Integer> BATCH_MAX_SIZE =
+    public static final Option<Integer> MAX_RETRIES =
+            Options.key("max_retries")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription("The number of retries to flush failed");
+    public static final Option<Integer> BATCH_MAX_SIZE =
             Options.key("batch_max_rows")
                     .intType()
                     .defaultValue(1024)
                     .withDescription(
                             "For batch writing, when the number of buffers 
reaches the number of batch_max_rows or the byte size of batch_max_bytes or the 
time reaches checkpoint.interval, the data will be flushed into the StarRocks");
 
-    Option<Long> BATCH_MAX_BYTES =
+    public static final Option<Long> BATCH_MAX_BYTES =
             Options.key("batch_max_bytes")
                     .longType()
                     .defaultValue((long) (5 * 1024 * 1024))
                     .withDescription(
                             "For batch writing, when the number of buffers 
reaches the number of batch_max_rows or the byte size of batch_max_bytes or the 
time reaches checkpoint.interval, the data will be flushed into the StarRocks");
 
-    Option<Integer> MAX_RETRIES =
-            Options.key("max_retries")
-                    .intType()
-                    .noDefaultValue()
-                    .withDescription("The number of retries to flush failed");
-
-    Option<Integer> RETRY_BACKOFF_MULTIPLIER_MS =
+    public static final Option<Integer> RETRY_BACKOFF_MULTIPLIER_MS =
             Options.key("retry_backoff_multiplier_ms")
                     .intType()
                     .noDefaultValue()
                     .withDescription(
                             "Using as a multiplier for generating the next 
delay for backoff");
 
-    Option<Integer> MAX_RETRY_BACKOFF_MS =
+    public static final Option<Integer> MAX_RETRY_BACKOFF_MS =
             Options.key("max_retry_backoff_ms")
                     .intType()
                     .noDefaultValue()
                     .withDescription(
                             "The amount of time to wait before attempting to 
retry a request to StarRocks");
 
-    Option<Boolean> ENABLE_UPSERT_DELETE =
+    public static final Option<Boolean> ENABLE_UPSERT_DELETE =
             Options.key("enable_upsert_delete")
                     .booleanType()
                     .defaultValue(false)
                     .withDescription(
                             "Whether to enable upsert/delete, only supports 
PrimaryKey model.");
 
-    Option<Map<String, String>> STARROCKS_CONFIG =
+    public static final Option<Map<String, String>> STARROCKS_CONFIG =
             Options.key("starrocks.config")
                     .mapType()
                     .noDefaultValue()
@@ -132,38 +119,38 @@ public interface StarRocksSinkOptions {
                             "The parameter of the stream load data_desc. "
                                     + "The way to specify the parameter is to 
add the original stream load parameter into map");
 
-    Option<String> COLUMN_SEPARATOR =
+    public static final Option<String> COLUMN_SEPARATOR =
             Options.key("starrocks.config.column_separator")
                     .stringType()
                     .noDefaultValue()
                     .withDescription("");
 
-    Option<StreamLoadFormat> LOAD_FORMAT =
+    public static final Option<StreamLoadFormat> LOAD_FORMAT =
             Options.key("starrocks.config.format")
                     .enumType(StreamLoadFormat.class)
                     .defaultValue(StreamLoadFormat.JSON)
                     .withDescription("");
-    Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
+    public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
             Options.key("schema_save_mode")
                     .enumType(SchemaSaveMode.class)
                     .defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
                     .withDescription(
                             "different treatment schemes are selected for the 
existing surface structure of the target side");
 
-    Option<DataSaveMode> DATA_SAVE_MODE =
+    public static final Option<DataSaveMode> DATA_SAVE_MODE =
             Options.key("data_save_mode")
                     .enumType(DataSaveMode.class)
                     .defaultValue(DataSaveMode.APPEND_DATA)
                     .withDescription(
                             "different processing schemes are selected for 
data existing data on the target side");
 
-    Option<Integer> HTTP_SOCKET_TIMEOUT_MS =
+    public static final Option<Integer> HTTP_SOCKET_TIMEOUT_MS =
             Options.key("http_socket_timeout_ms")
                     .intType()
                     .defaultValue(3 * 60 * 1000)
                     .withDescription("Set http socket timeout, default is 3 
minutes.");
 
-    Option<String> CUSTOM_SQL =
+    public static final Option<String> CUSTOM_SQL =
             Options.key("custom_sql")
                     .stringType()
                     .noDefaultValue()
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SourceConfig.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSourceOptions.java
similarity index 64%
copy from 
seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SourceConfig.java
copy to 
seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSourceOptions.java
index d069863843..c37dc47f87 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSourceOptions.java
@@ -19,48 +19,10 @@ package 
org.apache.seatunnel.connectors.seatunnel.starrocks.config;
 
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-
-import lombok.Getter;
-import lombok.Setter;
-
-import java.util.HashMap;
-import java.util.Map;
-
-@Setter
-@Getter
-public class SourceConfig extends CommonConfig {
 
+public class StarRocksSourceOptions extends StarRocksBaseOptions {
     private static final long DEFAULT_SCAN_MEM_LIMIT = 1024 * 1024 * 1024L;
 
-    public SourceConfig(ReadonlyConfig config) {
-        super(config);
-        this.maxRetries = config.get(MAX_RETRIES);
-        this.requestTabletSize = config.get(QUERY_TABLET_SIZE);
-        this.scanFilter = config.get(SCAN_FILTER);
-        this.connectTimeoutMs = config.get(SCAN_CONNECT_TIMEOUT);
-        this.batchRows = config.get(SCAN_BATCH_ROWS);
-        this.keepAliveMin = config.get(SCAN_KEEP_ALIVE_MIN);
-        this.queryTimeoutSec = config.get(SCAN_QUERY_TIMEOUT_SEC);
-        this.memLimit = config.get(SCAN_MEM_LIMIT);
-
-        String prefix = STARROCKS_SCAN_CONFIG_PREFIX.key();
-        config.toMap()
-                .forEach(
-                        (key, value) -> {
-                            if (key.startsWith(prefix)) {
-                                this.sourceOptionProps.put(
-                                        
key.substring(prefix.length()).toLowerCase(), value);
-                            }
-                        });
-    }
-
-    public static final Option<Integer> MAX_RETRIES =
-            Options.key("max_retries")
-                    .intType()
-                    .defaultValue(3)
-                    .withDescription("number of retry requests sent to 
StarRocks");
-
     public static final Option<Integer> QUERY_TABLET_SIZE =
             Options.key("request_tablet_size")
                     .intType()
@@ -70,6 +32,11 @@ public class SourceConfig extends CommonConfig {
     public static final Option<String> SCAN_FILTER =
             
Options.key("scan_filter").stringType().defaultValue("").withDescription("SQL 
filter");
 
+    public static final Option<Integer> MAX_RETRIES =
+            Options.key("max_retries")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription("number of retry requests sent to 
StarRocks");
     public static final Option<Integer> SCAN_CONNECT_TIMEOUT =
             Options.key("scan_connect_timeout_ms")
                     .intType()
@@ -105,14 +72,4 @@ public class SourceConfig extends CommonConfig {
                     .stringType()
                     .noDefaultValue()
                     .withDescription("The parameter of the scan data from be");
-
-    private int maxRetries = MAX_RETRIES.defaultValue();
-    private int requestTabletSize = QUERY_TABLET_SIZE.defaultValue();
-    private String scanFilter = SCAN_FILTER.defaultValue();
-    private long memLimit = SCAN_MEM_LIMIT.defaultValue();
-    private int queryTimeoutSec = SCAN_QUERY_TIMEOUT_SEC.defaultValue();
-    private int keepAliveMin = SCAN_KEEP_ALIVE_MIN.defaultValue();
-    private int batchRows = SCAN_BATCH_ROWS.defaultValue();
-    private int connectTimeoutMs = SCAN_CONNECT_TIMEOUT.defaultValue();
-    private Map<String, String> sourceOptionProps = new HashMap<>();
 }
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
index b2f482c201..2c9247bba6 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
@@ -25,9 +25,8 @@ import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
-import org.apache.seatunnel.connectors.seatunnel.starrocks.config.CommonConfig;
 import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
-import 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksBaseOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions;
 
 import org.apache.commons.lang3.StringUtils;
@@ -44,14 +43,14 @@ import static 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRoc
 public class StarRocksSinkFactory implements TableSinkFactory {
     @Override
     public String factoryIdentifier() {
-        return CommonConfig.CONNECTOR_IDENTITY;
+        return StarRocksBaseOptions.CONNECTOR_IDENTITY;
     }
 
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(StarRocksOptions.USERNAME, StarRocksOptions.PASSWORD)
-                .required(StarRocksSinkOptions.DATABASE, 
StarRocksOptions.BASE_URL)
+                .required(StarRocksSinkOptions.USERNAME, 
StarRocksSinkOptions.PASSWORD)
+                .required(StarRocksSinkOptions.DATABASE, 
StarRocksSinkOptions.BASE_URL)
                 .required(StarRocksSinkOptions.NODE_URLS)
                 .optional(
                         StarRocksSinkOptions.TABLE,
@@ -64,7 +63,7 @@ public class StarRocksSinkFactory implements TableSinkFactory 
{
                         StarRocksSinkOptions.STARROCKS_CONFIG,
                         StarRocksSinkOptions.ENABLE_UPSERT_DELETE,
                         StarRocksSinkOptions.SCHEMA_SAVE_MODE,
-                        StarRocksSinkOptions.DATA_SAVE_MODE,
+                        DATA_SAVE_MODE,
                         MULTI_TABLE_SINK_REPLICA,
                         StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE,
                         StarRocksSinkOptions.HTTP_SOCKET_TIMEOUT_MS)
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSource.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSource.java
index 211a8b96fc..5b949ca434 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSource.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSource.java
@@ -23,8 +23,8 @@ import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.connectors.seatunnel.starrocks.config.CommonConfig;
 import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SourceConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksBaseOptions;
 
 import java.util.Collections;
 import java.util.List;
@@ -37,7 +37,7 @@ public class StarRocksSource
 
     @Override
     public String getPluginName() {
-        return CommonConfig.CONNECTOR_IDENTITY;
+        return StarRocksBaseOptions.CONNECTOR_IDENTITY;
     }
 
     public StarRocksSource(SourceConfig sourceConfig, CatalogTable 
catalogTable) {
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSourceFactory.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSourceFactory.java
index 1f5e3c1690..5d33ad2558 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSourceFactory.java
@@ -28,8 +28,9 @@ import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
-import org.apache.seatunnel.connectors.seatunnel.starrocks.config.CommonConfig;
 import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SourceConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksBaseOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSourceOptions;
 
 import com.google.auto.service.AutoService;
 
@@ -39,28 +40,28 @@ import java.io.Serializable;
 public class StarRocksSourceFactory implements TableSourceFactory {
     @Override
     public String factoryIdentifier() {
-        return CommonConfig.CONNECTOR_IDENTITY;
+        return StarRocksBaseOptions.CONNECTOR_IDENTITY;
     }
 
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
                 .required(
-                        SourceConfig.NODE_URLS,
-                        SourceConfig.USERNAME,
-                        SourceConfig.PASSWORD,
-                        SourceConfig.DATABASE,
-                        SourceConfig.TABLE,
+                        StarRocksSourceOptions.NODE_URLS,
+                        StarRocksSourceOptions.USERNAME,
+                        StarRocksSourceOptions.PASSWORD,
+                        StarRocksSourceOptions.DATABASE,
+                        StarRocksSourceOptions.TABLE,
                         TableSchemaOptions.SCHEMA)
                 .optional(
-                        SourceConfig.MAX_RETRIES,
-                        SourceConfig.QUERY_TABLET_SIZE,
-                        SourceConfig.SCAN_FILTER,
-                        SourceConfig.SCAN_MEM_LIMIT,
-                        SourceConfig.SCAN_QUERY_TIMEOUT_SEC,
-                        SourceConfig.SCAN_KEEP_ALIVE_MIN,
-                        SourceConfig.SCAN_BATCH_ROWS,
-                        SourceConfig.SCAN_CONNECT_TIMEOUT)
+                        StarRocksSourceOptions.MAX_RETRIES,
+                        StarRocksSourceOptions.QUERY_TABLET_SIZE,
+                        StarRocksSourceOptions.SCAN_FILTER,
+                        StarRocksSourceOptions.SCAN_MEM_LIMIT,
+                        StarRocksSourceOptions.SCAN_QUERY_TIMEOUT_SEC,
+                        StarRocksSourceOptions.SCAN_KEEP_ALIVE_MIN,
+                        StarRocksSourceOptions.SCAN_BATCH_ROWS,
+                        StarRocksSourceOptions.SCAN_CONNECT_TIMEOUT)
                 .build();
     }
 


Reply via email to