This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new da8d9cbd35 [improve] add StarRocks options (#8639)
da8d9cbd35 is described below
commit da8d9cbd3559817e42f758a4f30f61bf7677c4a4
Author: fcb-xiaobo <[email protected]>
AuthorDate: Tue Feb 11 22:22:45 2025 +0800
[improve] add StarRocks options (#8639)
---
.../seatunnel/api/ConnectorOptionCheckTest.java | 1 -
.../starrocks/catalog/StarRocksCatalogFactory.java | 17 ++--
.../seatunnel/starrocks/config/SinkConfig.java | 6 +-
.../seatunnel/starrocks/config/SourceConfig.java | 93 +++++-----------------
...CommonConfig.java => StarRocksBaseOptions.java} | 27 +------
.../starrocks/config/StarRocksConfig.java | 47 +++++++++++
.../starrocks/config/StarRocksOptions.java | 43 ----------
.../starrocks/config/StarRocksSinkOptions.java | 65 ++++++---------
...urceConfig.java => StarRocksSourceOptions.java} | 55 ++-----------
.../starrocks/sink/StarRocksSinkFactory.java | 11 ++-
.../starrocks/source/StarRocksSource.java | 4 +-
.../starrocks/source/StarRocksSourceFactory.java | 31 ++++----
12 files changed, 133 insertions(+), 267 deletions(-)
diff --git
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index 9cf980f4bd..6d2bbf17e9 100644
---
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -191,7 +191,6 @@ public class ConnectorOptionCheckTest {
whiteList.add("IoTDBSinkOptions");
whiteList.add("EasysearchSourceOptions");
whiteList.add("RabbitmqSinkOptions");
- whiteList.add("StarRocksSourceOptions");
whiteList.add("IcebergSourceOptions");
whiteList.add("HbaseSourceOptions");
whiteList.add("PaimonSourceOptions");
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogFactory.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogFactory.java
index 124e025719..3ad81b68d1 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogFactory.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogFactory.java
@@ -22,23 +22,22 @@ import
org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.connectors.seatunnel.starrocks.config.CommonConfig;
-import
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksOptions;
import
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions;
+import
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSourceOptions;
import com.google.auto.service.AutoService;
@AutoService(Factory.class)
public class StarRocksCatalogFactory implements CatalogFactory {
- public static final String IDENTIFIER = CommonConfig.CONNECTOR_IDENTITY;
+ public static final String IDENTIFIER =
StarRocksSinkOptions.CONNECTOR_IDENTITY;
@Override
public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
return new StarRocksCatalog(
catalogName,
- options.get(StarRocksOptions.USERNAME),
- options.get(StarRocksOptions.PASSWORD),
- options.get(StarRocksOptions.BASE_URL),
+ options.get(StarRocksSourceOptions.USERNAME),
+ options.get(StarRocksSourceOptions.PASSWORD),
+ options.get(StarRocksSinkOptions.BASE_URL),
options.get(StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE));
}
@@ -50,9 +49,9 @@ public class StarRocksCatalogFactory implements
CatalogFactory {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(StarRocksOptions.BASE_URL)
- .required(StarRocksOptions.USERNAME)
- .required(StarRocksOptions.PASSWORD)
+ .required(StarRocksSinkOptions.BASE_URL)
+ .required(StarRocksSourceOptions.USERNAME)
+ .required(StarRocksSourceOptions.PASSWORD)
.build();
}
}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
index 88b608dab3..4b0ee5e6eb 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
@@ -71,9 +71,9 @@ public class SinkConfig implements Serializable {
SinkConfig sinkConfig = new SinkConfig();
sinkConfig.setNodeUrls(config.get(StarRocksSinkOptions.NODE_URLS));
sinkConfig.setDatabase(config.get(StarRocksSinkOptions.DATABASE));
- sinkConfig.setJdbcUrl(config.get(StarRocksOptions.BASE_URL));
-
config.getOptional(StarRocksOptions.USERNAME).ifPresent(sinkConfig::setUsername);
-
config.getOptional(StarRocksOptions.PASSWORD).ifPresent(sinkConfig::setPassword);
+ sinkConfig.setJdbcUrl(config.get(StarRocksSinkOptions.BASE_URL));
+
config.getOptional(StarRocksSinkOptions.USERNAME).ifPresent(sinkConfig::setUsername);
+
config.getOptional(StarRocksSinkOptions.PASSWORD).ifPresent(sinkConfig::setPassword);
config.getOptional(StarRocksSinkOptions.TABLE).ifPresent(sinkConfig::setTable);
config.getOptional(StarRocksSinkOptions.LABEL_PREFIX).ifPresent(sinkConfig::setLabelPrefix);
sinkConfig.setBatchMaxSize(config.get(StarRocksSinkOptions.BATCH_MAX_SIZE));
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SourceConfig.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SourceConfig.java
index d069863843..e7a54ed403 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SourceConfig.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SourceConfig.java
@@ -17,8 +17,6 @@
package org.apache.seatunnel.connectors.seatunnel.starrocks.config;
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import lombok.Getter;
@@ -29,22 +27,20 @@ import java.util.Map;
@Setter
@Getter
-public class SourceConfig extends CommonConfig {
-
- private static final long DEFAULT_SCAN_MEM_LIMIT = 1024 * 1024 * 1024L;
+public class SourceConfig extends StarRocksConfig {
public SourceConfig(ReadonlyConfig config) {
super(config);
- this.maxRetries = config.get(MAX_RETRIES);
- this.requestTabletSize = config.get(QUERY_TABLET_SIZE);
- this.scanFilter = config.get(SCAN_FILTER);
- this.connectTimeoutMs = config.get(SCAN_CONNECT_TIMEOUT);
- this.batchRows = config.get(SCAN_BATCH_ROWS);
- this.keepAliveMin = config.get(SCAN_KEEP_ALIVE_MIN);
- this.queryTimeoutSec = config.get(SCAN_QUERY_TIMEOUT_SEC);
- this.memLimit = config.get(SCAN_MEM_LIMIT);
-
- String prefix = STARROCKS_SCAN_CONFIG_PREFIX.key();
+ this.maxRetries = config.get(StarRocksSourceOptions.MAX_RETRIES);
+ this.requestTabletSize =
config.get(StarRocksSourceOptions.QUERY_TABLET_SIZE);
+ this.scanFilter = config.get(StarRocksSourceOptions.SCAN_FILTER);
+ this.connectTimeoutMs =
config.get(StarRocksSourceOptions.SCAN_CONNECT_TIMEOUT);
+ this.batchRows = config.get(StarRocksSourceOptions.SCAN_BATCH_ROWS);
+ this.keepAliveMin =
config.get(StarRocksSourceOptions.SCAN_KEEP_ALIVE_MIN);
+ this.queryTimeoutSec =
config.get(StarRocksSourceOptions.SCAN_QUERY_TIMEOUT_SEC);
+ this.memLimit = config.get(StarRocksSourceOptions.SCAN_MEM_LIMIT);
+
+ String prefix =
StarRocksSourceOptions.STARROCKS_SCAN_CONFIG_PREFIX.key();
config.toMap()
.forEach(
(key, value) -> {
@@ -55,64 +51,13 @@ public class SourceConfig extends CommonConfig {
});
}
- public static final Option<Integer> MAX_RETRIES =
- Options.key("max_retries")
- .intType()
- .defaultValue(3)
- .withDescription("number of retry requests sent to
StarRocks");
-
- public static final Option<Integer> QUERY_TABLET_SIZE =
- Options.key("request_tablet_size")
- .intType()
- .defaultValue(Integer.MAX_VALUE)
- .withDescription("The number of Tablets corresponding to
an Partition");
-
- public static final Option<String> SCAN_FILTER =
-
Options.key("scan_filter").stringType().defaultValue("").withDescription("SQL
filter");
-
- public static final Option<Integer> SCAN_CONNECT_TIMEOUT =
- Options.key("scan_connect_timeout_ms")
- .intType()
- .defaultValue(1000)
- .withDescription("scan connect timeout");
-
- public static final Option<Integer> SCAN_BATCH_ROWS =
- Options.key("scan_batch_rows")
- .intType()
- .defaultValue(1024)
- .withDescription("scan batch rows");
-
- public static final Option<Integer> SCAN_KEEP_ALIVE_MIN =
- Options.key("scan_keep_alive_min")
- .intType()
- .defaultValue(10)
- .withDescription("Max keep alive time min");
-
- public static final Option<Integer> SCAN_QUERY_TIMEOUT_SEC =
- Options.key("scan_query_timeout_sec")
- .intType()
- .defaultValue(3600)
- .withDescription("Query timeout for a single query");
-
- public static final Option<Long> SCAN_MEM_LIMIT =
- Options.key("scan_mem_limit")
- .longType()
- .defaultValue(DEFAULT_SCAN_MEM_LIMIT)
- .withDescription("Memory byte limit for a single query");
-
- public static final Option<String> STARROCKS_SCAN_CONFIG_PREFIX =
- Options.key("scan.params.")
- .stringType()
- .noDefaultValue()
- .withDescription("The parameter of the scan data from be");
-
- private int maxRetries = MAX_RETRIES.defaultValue();
- private int requestTabletSize = QUERY_TABLET_SIZE.defaultValue();
- private String scanFilter = SCAN_FILTER.defaultValue();
- private long memLimit = SCAN_MEM_LIMIT.defaultValue();
- private int queryTimeoutSec = SCAN_QUERY_TIMEOUT_SEC.defaultValue();
- private int keepAliveMin = SCAN_KEEP_ALIVE_MIN.defaultValue();
- private int batchRows = SCAN_BATCH_ROWS.defaultValue();
- private int connectTimeoutMs = SCAN_CONNECT_TIMEOUT.defaultValue();
+ private int maxRetries = StarRocksSourceOptions.MAX_RETRIES.defaultValue();
+ private int requestTabletSize =
StarRocksSourceOptions.QUERY_TABLET_SIZE.defaultValue();
+ private String scanFilter =
StarRocksSourceOptions.SCAN_FILTER.defaultValue();
+ private long memLimit =
StarRocksSourceOptions.SCAN_MEM_LIMIT.defaultValue();
+ private int queryTimeoutSec =
StarRocksSourceOptions.SCAN_QUERY_TIMEOUT_SEC.defaultValue();
+ private int keepAliveMin =
StarRocksSourceOptions.SCAN_KEEP_ALIVE_MIN.defaultValue();
+ private int batchRows =
StarRocksSourceOptions.SCAN_BATCH_ROWS.defaultValue();
+ private int connectTimeoutMs =
StarRocksSourceOptions.SCAN_CONNECT_TIMEOUT.defaultValue();
private Map<String, String> sourceOptionProps = new HashMap<>();
}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/CommonConfig.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksBaseOptions.java
similarity index 77%
rename from
seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/CommonConfig.java
rename to
seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksBaseOptions.java
index c8a4775fcf..1aa9b99a93 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/CommonConfig.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksBaseOptions.java
@@ -14,27 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.seatunnel.connectors.seatunnel.starrocks.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-import lombok.ToString;
import java.io.Serializable;
import java.util.List;
-@Getter
-@ToString
-@AllArgsConstructor
-public class CommonConfig implements Serializable {
-
+public class StarRocksBaseOptions implements Serializable {
public static final String CONNECTOR_IDENTITY = "StarRocks";
-
public static final Option<List<String>> NODE_URLS =
Options.key("nodeUrls")
.listType()
@@ -65,18 +54,4 @@ public class CommonConfig implements Serializable {
.stringType()
.noDefaultValue()
.withDescription("StarRocks user password");
-
- private List<String> nodeUrls;
- private String username;
- private String password;
- private String database;
- private String table;
-
- public CommonConfig(ReadonlyConfig config) {
- this.nodeUrls = config.get(NODE_URLS);
- this.username = config.get(USERNAME);
- this.password = config.get(PASSWORD);
- this.database = config.get(DATABASE);
- this.table = config.get(TABLE);
- }
}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksConfig.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksConfig.java
new file mode 100644
index 0000000000..d57df3c15a
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksConfig.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.starrocks.config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.ToString;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Getter
+@ToString
+@AllArgsConstructor
+public class StarRocksConfig implements Serializable {
+
+ private List<String> nodeUrls;
+ private String username;
+ private String password;
+ private String database;
+ private String table;
+
+ public StarRocksConfig(ReadonlyConfig config) {
+ this.nodeUrls = config.get(StarRocksBaseOptions.NODE_URLS);
+ this.username = config.get(StarRocksBaseOptions.USERNAME);
+ this.password = config.get(StarRocksBaseOptions.PASSWORD);
+ this.database = config.get(StarRocksBaseOptions.DATABASE);
+ this.table = config.get(StarRocksBaseOptions.TABLE);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksOptions.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksOptions.java
deleted file mode 100644
index f5ade9ae3e..0000000000
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksOptions.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.starrocks.config;
-
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
-
-public interface StarRocksOptions {
- Option<String> BASE_URL =
- Options.key("base-url")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "The JDBC URL like
\"jdbc:mysql://localhost:9030/\" or"
- + "\"jdbc:mysql://localhost:9030/\" or
\"jdbc:mysql://localhost:9030/db\"");
-
- Option<String> USERNAME =
- Options.key("username")
- .stringType()
- .noDefaultValue()
- .withDescription("StarRocks user username");
-
- Option<String> PASSWORD =
- Options.key("password")
- .stringType()
- .noDefaultValue()
- .withDescription("StarRocks user password");
-}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
index a0467d9917..d30aa6f9ef 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
@@ -24,37 +24,25 @@ import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
import org.apache.seatunnel.api.sink.SchemaSaveMode;
import
org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig.StreamLoadFormat;
-import java.util.List;
import java.util.Map;
@SuppressWarnings("MagicNumber")
-public interface StarRocksSinkOptions {
- Option<List<String>> NODE_URLS =
- Options.key("nodeUrls")
- .listType()
+public class StarRocksSinkOptions extends StarRocksBaseOptions {
+
+ public static final Option<String> BASE_URL =
+ Options.key("base-url")
+ .stringType()
.noDefaultValue()
.withDescription(
- "StarRocks cluster http address, the format is
[\"fe_ip:fe_http_port\", ...]");
-
- Option<String> LABEL_PREFIX =
+ "The JDBC URL like
\"jdbc:mysql://localhost:9030/\" or"
+ + "\"jdbc:mysql://localhost:9030/\" or
\"jdbc:mysql://localhost:9030/db\"");
+ public static final Option<String> LABEL_PREFIX =
Options.key("labelPrefix")
.stringType()
.noDefaultValue()
.withDescription("The prefix of StarRocks stream load
label");
- Option<String> DATABASE =
- Options.key("database")
- .stringType()
- .noDefaultValue()
- .withDescription("The name of StarRocks database");
-
- Option<String> TABLE =
- Options.key("table")
- .stringType()
- .noDefaultValue()
- .withDescription("The name of StarRocks table");
-
- Option<String> SAVE_MODE_CREATE_TEMPLATE =
+ public static final Option<String> SAVE_MODE_CREATE_TEMPLATE =
Options.key("save_mode_create_template")
.stringType()
.defaultValue(
@@ -83,48 +71,47 @@ public interface StarRocksSinkOptions {
.withDescription(
"Create table statement template, used to create
StarRocks table");
- Option<Integer> BATCH_MAX_SIZE =
+ public static final Option<Integer> MAX_RETRIES =
+ Options.key("max_retries")
+ .intType()
+ .noDefaultValue()
+ .withDescription("The number of retries to flush failed");
+ public static final Option<Integer> BATCH_MAX_SIZE =
Options.key("batch_max_rows")
.intType()
.defaultValue(1024)
.withDescription(
"For batch writing, when the number of buffers
reaches the number of batch_max_rows or the byte size of batch_max_bytes or the
time reaches checkpoint.interval, the data will be flushed into the StarRocks");
- Option<Long> BATCH_MAX_BYTES =
+ public static final Option<Long> BATCH_MAX_BYTES =
Options.key("batch_max_bytes")
.longType()
.defaultValue((long) (5 * 1024 * 1024))
.withDescription(
"For batch writing, when the number of buffers
reaches the number of batch_max_rows or the byte size of batch_max_bytes or the
time reaches checkpoint.interval, the data will be flushed into the StarRocks");
- Option<Integer> MAX_RETRIES =
- Options.key("max_retries")
- .intType()
- .noDefaultValue()
- .withDescription("The number of retries to flush failed");
-
- Option<Integer> RETRY_BACKOFF_MULTIPLIER_MS =
+ public static final Option<Integer> RETRY_BACKOFF_MULTIPLIER_MS =
Options.key("retry_backoff_multiplier_ms")
.intType()
.noDefaultValue()
.withDescription(
"Using as a multiplier for generating the next
delay for backoff");
- Option<Integer> MAX_RETRY_BACKOFF_MS =
+ public static final Option<Integer> MAX_RETRY_BACKOFF_MS =
Options.key("max_retry_backoff_ms")
.intType()
.noDefaultValue()
.withDescription(
"The amount of time to wait before attempting to
retry a request to StarRocks");
- Option<Boolean> ENABLE_UPSERT_DELETE =
+ public static final Option<Boolean> ENABLE_UPSERT_DELETE =
Options.key("enable_upsert_delete")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to enable upsert/delete, only supports
PrimaryKey model.");
- Option<Map<String, String>> STARROCKS_CONFIG =
+ public static final Option<Map<String, String>> STARROCKS_CONFIG =
Options.key("starrocks.config")
.mapType()
.noDefaultValue()
@@ -132,38 +119,38 @@ public interface StarRocksSinkOptions {
"The parameter of the stream load data_desc. "
+ "The way to specify the parameter is to
add the original stream load parameter into map");
- Option<String> COLUMN_SEPARATOR =
+ public static final Option<String> COLUMN_SEPARATOR =
Options.key("starrocks.config.column_separator")
.stringType()
.noDefaultValue()
.withDescription("");
- Option<StreamLoadFormat> LOAD_FORMAT =
+ public static final Option<StreamLoadFormat> LOAD_FORMAT =
Options.key("starrocks.config.format")
.enumType(StreamLoadFormat.class)
.defaultValue(StreamLoadFormat.JSON)
.withDescription("");
- Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
+ public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
Options.key("schema_save_mode")
.enumType(SchemaSaveMode.class)
.defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
.withDescription(
"different treatment schemes are selected for the
existing surface structure of the target side");
- Option<DataSaveMode> DATA_SAVE_MODE =
+ public static final Option<DataSaveMode> DATA_SAVE_MODE =
Options.key("data_save_mode")
.enumType(DataSaveMode.class)
.defaultValue(DataSaveMode.APPEND_DATA)
.withDescription(
"different processing schemes are selected for
data existing data on the target side");
- Option<Integer> HTTP_SOCKET_TIMEOUT_MS =
+ public static final Option<Integer> HTTP_SOCKET_TIMEOUT_MS =
Options.key("http_socket_timeout_ms")
.intType()
.defaultValue(3 * 60 * 1000)
.withDescription("Set http socket timeout, default is 3
minutes.");
- Option<String> CUSTOM_SQL =
+ public static final Option<String> CUSTOM_SQL =
Options.key("custom_sql")
.stringType()
.noDefaultValue()
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SourceConfig.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSourceOptions.java
similarity index 64%
copy from
seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SourceConfig.java
copy to
seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSourceOptions.java
index d069863843..c37dc47f87 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SourceConfig.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSourceOptions.java
@@ -19,48 +19,10 @@ package
org.apache.seatunnel.connectors.seatunnel.starrocks.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-
-import lombok.Getter;
-import lombok.Setter;
-
-import java.util.HashMap;
-import java.util.Map;
-
-@Setter
-@Getter
-public class SourceConfig extends CommonConfig {
+public class StarRocksSourceOptions extends StarRocksBaseOptions {
private static final long DEFAULT_SCAN_MEM_LIMIT = 1024 * 1024 * 1024L;
- public SourceConfig(ReadonlyConfig config) {
- super(config);
- this.maxRetries = config.get(MAX_RETRIES);
- this.requestTabletSize = config.get(QUERY_TABLET_SIZE);
- this.scanFilter = config.get(SCAN_FILTER);
- this.connectTimeoutMs = config.get(SCAN_CONNECT_TIMEOUT);
- this.batchRows = config.get(SCAN_BATCH_ROWS);
- this.keepAliveMin = config.get(SCAN_KEEP_ALIVE_MIN);
- this.queryTimeoutSec = config.get(SCAN_QUERY_TIMEOUT_SEC);
- this.memLimit = config.get(SCAN_MEM_LIMIT);
-
- String prefix = STARROCKS_SCAN_CONFIG_PREFIX.key();
- config.toMap()
- .forEach(
- (key, value) -> {
- if (key.startsWith(prefix)) {
- this.sourceOptionProps.put(
-
key.substring(prefix.length()).toLowerCase(), value);
- }
- });
- }
-
- public static final Option<Integer> MAX_RETRIES =
- Options.key("max_retries")
- .intType()
- .defaultValue(3)
- .withDescription("number of retry requests sent to
StarRocks");
-
public static final Option<Integer> QUERY_TABLET_SIZE =
Options.key("request_tablet_size")
.intType()
@@ -70,6 +32,11 @@ public class SourceConfig extends CommonConfig {
public static final Option<String> SCAN_FILTER =
Options.key("scan_filter").stringType().defaultValue("").withDescription("SQL
filter");
+ public static final Option<Integer> MAX_RETRIES =
+ Options.key("max_retries")
+ .intType()
+ .defaultValue(3)
+ .withDescription("number of retry requests sent to
StarRocks");
public static final Option<Integer> SCAN_CONNECT_TIMEOUT =
Options.key("scan_connect_timeout_ms")
.intType()
@@ -105,14 +72,4 @@ public class SourceConfig extends CommonConfig {
.stringType()
.noDefaultValue()
.withDescription("The parameter of the scan data from be");
-
- private int maxRetries = MAX_RETRIES.defaultValue();
- private int requestTabletSize = QUERY_TABLET_SIZE.defaultValue();
- private String scanFilter = SCAN_FILTER.defaultValue();
- private long memLimit = SCAN_MEM_LIMIT.defaultValue();
- private int queryTimeoutSec = SCAN_QUERY_TIMEOUT_SEC.defaultValue();
- private int keepAliveMin = SCAN_KEEP_ALIVE_MIN.defaultValue();
- private int batchRows = SCAN_BATCH_ROWS.defaultValue();
- private int connectTimeoutMs = SCAN_CONNECT_TIMEOUT.defaultValue();
- private Map<String, String> sourceOptionProps = new HashMap<>();
}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
index b2f482c201..2c9247bba6 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
@@ -25,9 +25,8 @@ import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
-import org.apache.seatunnel.connectors.seatunnel.starrocks.config.CommonConfig;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
-import
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksOptions;
+import
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksBaseOptions;
import
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions;
import org.apache.commons.lang3.StringUtils;
@@ -44,14 +43,14 @@ import static
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRoc
public class StarRocksSinkFactory implements TableSinkFactory {
@Override
public String factoryIdentifier() {
- return CommonConfig.CONNECTOR_IDENTITY;
+ return StarRocksBaseOptions.CONNECTOR_IDENTITY;
}
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(StarRocksOptions.USERNAME, StarRocksOptions.PASSWORD)
- .required(StarRocksSinkOptions.DATABASE,
StarRocksOptions.BASE_URL)
+ .required(StarRocksSinkOptions.USERNAME,
StarRocksSinkOptions.PASSWORD)
+ .required(StarRocksSinkOptions.DATABASE,
StarRocksSinkOptions.BASE_URL)
.required(StarRocksSinkOptions.NODE_URLS)
.optional(
StarRocksSinkOptions.TABLE,
@@ -64,7 +63,7 @@ public class StarRocksSinkFactory implements TableSinkFactory
{
StarRocksSinkOptions.STARROCKS_CONFIG,
StarRocksSinkOptions.ENABLE_UPSERT_DELETE,
StarRocksSinkOptions.SCHEMA_SAVE_MODE,
- StarRocksSinkOptions.DATA_SAVE_MODE,
+ DATA_SAVE_MODE,
MULTI_TABLE_SINK_REPLICA,
StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE,
StarRocksSinkOptions.HTTP_SOCKET_TIMEOUT_MS)
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSource.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSource.java
index 211a8b96fc..5b949ca434 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSource.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSource.java
@@ -23,8 +23,8 @@ import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.connectors.seatunnel.starrocks.config.CommonConfig;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksBaseOptions;
import java.util.Collections;
import java.util.List;
@@ -37,7 +37,7 @@ public class StarRocksSource
@Override
public String getPluginName() {
- return CommonConfig.CONNECTOR_IDENTITY;
+ return StarRocksBaseOptions.CONNECTOR_IDENTITY;
}
public StarRocksSource(SourceConfig sourceConfig, CatalogTable
catalogTable) {
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSourceFactory.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSourceFactory.java
index 1f5e3c1690..5d33ad2558 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSourceFactory.java
@@ -28,8 +28,9 @@ import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
-import org.apache.seatunnel.connectors.seatunnel.starrocks.config.CommonConfig;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksBaseOptions;
+import
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSourceOptions;
import com.google.auto.service.AutoService;
@@ -39,28 +40,28 @@ import java.io.Serializable;
public class StarRocksSourceFactory implements TableSourceFactory {
@Override
public String factoryIdentifier() {
- return CommonConfig.CONNECTOR_IDENTITY;
+ return StarRocksBaseOptions.CONNECTOR_IDENTITY;
}
@Override
public OptionRule optionRule() {
return OptionRule.builder()
.required(
- SourceConfig.NODE_URLS,
- SourceConfig.USERNAME,
- SourceConfig.PASSWORD,
- SourceConfig.DATABASE,
- SourceConfig.TABLE,
+ StarRocksSourceOptions.NODE_URLS,
+ StarRocksSourceOptions.USERNAME,
+ StarRocksSourceOptions.PASSWORD,
+ StarRocksSourceOptions.DATABASE,
+ StarRocksSourceOptions.TABLE,
TableSchemaOptions.SCHEMA)
.optional(
- SourceConfig.MAX_RETRIES,
- SourceConfig.QUERY_TABLET_SIZE,
- SourceConfig.SCAN_FILTER,
- SourceConfig.SCAN_MEM_LIMIT,
- SourceConfig.SCAN_QUERY_TIMEOUT_SEC,
- SourceConfig.SCAN_KEEP_ALIVE_MIN,
- SourceConfig.SCAN_BATCH_ROWS,
- SourceConfig.SCAN_CONNECT_TIMEOUT)
+ StarRocksSourceOptions.MAX_RETRIES,
+ StarRocksSourceOptions.QUERY_TABLET_SIZE,
+ StarRocksSourceOptions.SCAN_FILTER,
+ StarRocksSourceOptions.SCAN_MEM_LIMIT,
+ StarRocksSourceOptions.SCAN_QUERY_TIMEOUT_SEC,
+ StarRocksSourceOptions.SCAN_KEEP_ALIVE_MIN,
+ StarRocksSourceOptions.SCAN_BATCH_ROWS,
+ StarRocksSourceOptions.SCAN_CONNECT_TIMEOUT)
.build();
}