This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/cdc-multiple-table by this
push:
new c485d887e [feature][starrocks] add StarRocks factories (#4191)
c485d887e is described below
commit c485d887ecae6c1a356e86e837313d262d2d96b5
Author: Zongwen Li <[email protected]>
AuthorDate: Thu Feb 23 10:50:05 2023 +0800
[feature][starrocks] add StarRocks factories (#4191)
---
.../StarRocksCatalogFactory.java} | 33 ++--
.../seatunnel/starrocks/config/SinkConfig.java | 195 +++------------------
.../starrocks/config/StarRocksOptions.java | 40 +++++
.../starrocks/config/StarRocksSinkOptions.java | 116 ++++++++++++
.../seatunnel/starrocks/sink/StarRocksSink.java | 36 ++--
.../starrocks/sink/StarRocksSinkFactory.java | 31 +++-
6 files changed, 245 insertions(+), 206 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogFactory.java
similarity index 54%
copy from
seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
copy to
seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogFactory.java
index 8863c3115..e17b8591a 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalogFactory.java
@@ -15,30 +15,35 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.starrocks.sink;
+package org.apache.seatunnel.connectors.seatunnel.starrocks.catalog;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
-import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableSinkFactory;
-import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksOptions;
-import com.google.auto.service.AutoService;
+public class StarRocksCatalogFactory implements CatalogFactory {
+ public static final String IDENTIFIER = "StarRocks";
+ @Override
+ public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+ return new StarRocksCatalog(catalogName,
+ options.get(StarRocksOptions.USERNAME),
+ options.get(StarRocksOptions.PASSWORD),
+ options.get(StarRocksOptions.BASE_URL));
+ }
-@AutoService(Factory.class)
-public class StarRocksSinkFactory implements TableSinkFactory {
@Override
public String factoryIdentifier() {
- return "StarRocks";
+ return IDENTIFIER;
}
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(SinkConfig.NODE_URLS, SinkConfig.USERNAME,
SinkConfig.PASSWORD, SinkConfig.DATABASE, SinkConfig.QUERY_PORT)
- .optional(SinkConfig.TABLE, SinkConfig.LABEL_PREFIX,
SinkConfig.BATCH_MAX_SIZE, SinkConfig.BATCH_MAX_BYTES,
- SinkConfig.BATCH_INTERVAL_MS, SinkConfig.MAX_RETRIES,
SinkConfig.MAX_RETRY_BACKOFF_MS,
- SinkConfig.RETRY_BACKOFF_MULTIPLIER_MS,
SinkConfig.STARROCKS_CONFIG, SinkConfig.ENABLE_UPSERT_DELETE,
- SinkConfig.SAVE_MODE_CREATE_TEMPLATE)
- .build();
+ .required(StarRocksOptions.BASE_URL)
+ .required(StarRocksOptions.USERNAME)
+ .required(StarRocksOptions.PASSWORD)
+ .build();
}
}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
index 3a6212266..2095b9988 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
@@ -17,11 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.starrocks.config;
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import lombok.Getter;
import lombok.Setter;
@@ -37,108 +33,8 @@ import java.util.Map;
@ToString
public class SinkConfig implements Serializable {
- private static final int DEFAULT_BATCH_MAX_SIZE = 1024;
- private static final long DEFAULT_BATCH_BYTES = 5 * 1024 * 1024;
-
- private static final String LOAD_FORMAT = "format";
- private static final StreamLoadFormat DEFAULT_LOAD_FORMAT =
StreamLoadFormat.CSV;
- private static final String COLUMN_SEPARATOR = "column_separator";
-
- public static final Option<List<String>> NODE_URLS =
Options.key("nodeUrls")
- .listType()
- .noDefaultValue()
- .withDescription("StarRocks cluster address, the format is
[\"fe_ip:fe_http_port\", ...]");
-
- public static final Option<String> USERNAME = Options.key("username")
- .stringType()
- .noDefaultValue()
- .withDescription("StarRocks user username");
-
- public static final Option<String> PASSWORD = Options.key("password")
- .stringType()
- .noDefaultValue()
- .withDescription("StarRocks user password");
-
- public static final Option<String> LABEL_PREFIX =
Options.key("labelPrefix")
- .stringType()
- .noDefaultValue()
- .withDescription("The prefix of StarRocks stream load label");
-
- public static final Option<String> DATABASE = Options.key("database")
- .stringType()
- .noDefaultValue()
- .withDescription("The name of StarRocks database");
-
- public static final Option<String> TABLE = Options.key("table")
- .stringType()
- .noDefaultValue()
- .withDescription("The name of StarRocks table");
-
- public static final Option<Map<String, String>> STARROCKS_CONFIG =
Options.key("starrocks.config")
- .mapType()
- .noDefaultValue()
- .withDescription("The parameter of the stream load data_desc. " +
- "The way to specify the parameter is to add the original stream
load parameter into map");
-
- public static final Option<String> SAVE_MODE_CREATE_TEMPLATE =
Options.key("save_mode_create_template")
- .stringType()
- .defaultValue("CREATE TABLE IF NOT EXISTS
`${database}`.`${table_name}` (\n" +
- "${rowtype_fields}\n" +
- ") ENGINE=OLAP\n" +
- " PRIMARY KEY (${rowtype_primary_key})\n" +
- "DISTRIBUTED BY HASH (${rowtype_primary_key})" +
- "PROPERTIES (\n" +
- " \"replication_num\" = \"1\" \n" +
- ")").withDescription("Create table statement template, used to
create StarRocks table");
-
- public static final Option<String> QUERY_PORT = Options.key("query_port")
- .stringType()
- .defaultValue("9030")
- .withDescription("FE MySQL server port");
-
- public static final Option<Integer> BATCH_MAX_SIZE =
Options.key("batch_max_rows")
- .intType()
- .defaultValue(DEFAULT_BATCH_MAX_SIZE)
- .withDescription("For batch writing, when the number of buffers
reaches the number of batch_max_rows or the byte size of batch_max_bytes or the
time reaches batch_interval_ms, the data will be flushed into the StarRocks");
-
- public static final Option<Long> BATCH_MAX_BYTES =
Options.key("batch_max_bytes")
- .longType()
- .defaultValue(DEFAULT_BATCH_BYTES)
- .withDescription("For batch writing, when the number of buffers
reaches the number of batch_max_rows or the byte size of batch_max_bytes or the
time reaches batch_interval_ms, the data will be flushed into the StarRocks");
-
- public static final Option<Integer> BATCH_INTERVAL_MS =
Options.key("batch_interval_ms")
- .intType()
- .noDefaultValue()
- .withDescription("For batch writing, when the number of buffers
reaches the number of batch_max_rows or the byte size of batch_max_bytes or the
time reaches batch_interval_ms, the data will be flushed into the StarRocks");
-
- public static final Option<Integer> MAX_RETRIES =
Options.key("max_retries")
- .intType()
- .noDefaultValue()
- .withDescription("The number of retries to flush failed");
-
- public static final Option<Integer> RETRY_BACKOFF_MULTIPLIER_MS =
Options.key("retry_backoff_multiplier_ms")
- .intType()
- .noDefaultValue()
- .withDescription("Using as a multiplier for generating the next
delay for backoff");
-
- public static final Option<Integer> MAX_RETRY_BACKOFF_MS =
Options.key("max_retry_backoff_ms")
- .intType()
- .noDefaultValue()
- .withDescription("The amount of time to wait before attempting to
retry a request to StarRocks");
-
- public static final Option<Boolean> ENABLE_UPSERT_DELETE =
Options.key("enable_upsert_delete")
- .booleanType()
- .defaultValue(false)
- .withDescription("Whether to enable upsert/delete, only supports
PrimaryKey model.");
-
public enum StreamLoadFormat {
CSV, JSON;
- public static StreamLoadFormat parse(String format) {
- if (StreamLoadFormat.JSON.name().equals(format)) {
- return JSON;
- }
- return CSV;
- }
}
private List<String> nodeUrls;
@@ -149,10 +45,9 @@ public class SinkConfig implements Serializable {
private String table;
private String labelPrefix;
private String columnSeparator;
- private StreamLoadFormat loadFormat = DEFAULT_LOAD_FORMAT;
-
- private int batchMaxSize = DEFAULT_BATCH_MAX_SIZE;
- private long batchMaxBytes = DEFAULT_BATCH_BYTES;
+ private StreamLoadFormat loadFormat;
+ private int batchMaxSize;
+ private long batchMaxBytes;
private Integer batchIntervalMs;
private int maxRetries;
@@ -160,68 +55,34 @@ public class SinkConfig implements Serializable {
private int maxRetryBackoffMs;
private boolean enableUpsertDelete;
- private String saveModeCreateTemplate =
SAVE_MODE_CREATE_TEMPLATE.defaultValue();
+ private String saveModeCreateTemplate;
+ @Getter
private final Map<String, Object> streamLoadProps = new HashMap<>();
- public static SinkConfig loadConfig(Config pluginConfig) {
+ public static SinkConfig of(ReadonlyConfig config) {
SinkConfig sinkConfig = new SinkConfig();
- sinkConfig.setNodeUrls(pluginConfig.getStringList(NODE_URLS.key()));
- sinkConfig.setDatabase(pluginConfig.getString(DATABASE.key()));
- sinkConfig.setJdbcUrl("jdbc:mysql://" +
sinkConfig.getNodeUrls().get(0).split(":")[0] +
- ":" + pluginConfig.getString(QUERY_PORT.key()) + "/");
- if (pluginConfig.hasPath(USERNAME.key())) {
- sinkConfig.setUsername(pluginConfig.getString(USERNAME.key()));
- }
- if (pluginConfig.hasPath(TABLE.key())) {
- sinkConfig.setTable(pluginConfig.getString(TABLE.key()));
- }
- if (pluginConfig.hasPath(PASSWORD.key())) {
- sinkConfig.setPassword(pluginConfig.getString(PASSWORD.key()));
- }
- if (pluginConfig.hasPath(LABEL_PREFIX.key())) {
-
sinkConfig.setLabelPrefix(pluginConfig.getString(LABEL_PREFIX.key()));
- }
- if (pluginConfig.hasPath(BATCH_MAX_SIZE.key())) {
-
sinkConfig.setBatchMaxSize(pluginConfig.getInt(BATCH_MAX_SIZE.key()));
- }
- if (pluginConfig.hasPath(BATCH_MAX_BYTES.key())) {
-
sinkConfig.setBatchMaxBytes(pluginConfig.getLong(BATCH_MAX_BYTES.key()));
- }
- if (pluginConfig.hasPath(BATCH_INTERVAL_MS.key())) {
-
sinkConfig.setBatchIntervalMs(pluginConfig.getInt(BATCH_INTERVAL_MS.key()));
- }
- if (pluginConfig.hasPath(MAX_RETRIES.key())) {
- sinkConfig.setMaxRetries(pluginConfig.getInt(MAX_RETRIES.key()));
- }
- if (pluginConfig.hasPath(RETRY_BACKOFF_MULTIPLIER_MS.key())) {
-
sinkConfig.setRetryBackoffMultiplierMs(pluginConfig.getInt(RETRY_BACKOFF_MULTIPLIER_MS.key()));
- }
- if (pluginConfig.hasPath(MAX_RETRY_BACKOFF_MS.key())) {
-
sinkConfig.setMaxRetryBackoffMs(pluginConfig.getInt(MAX_RETRY_BACKOFF_MS.key()));
- }
- if (pluginConfig.hasPath(ENABLE_UPSERT_DELETE.key())) {
-
sinkConfig.setEnableUpsertDelete(pluginConfig.getBoolean(ENABLE_UPSERT_DELETE.key()));
- }
- if (pluginConfig.hasPath(SAVE_MODE_CREATE_TEMPLATE.key())) {
-
sinkConfig.setSaveModeCreateTemplate(pluginConfig.getString(SAVE_MODE_CREATE_TEMPLATE.key()));
- }
- parseSinkStreamLoadProperties(pluginConfig, sinkConfig);
- if (sinkConfig.streamLoadProps.containsKey(COLUMN_SEPARATOR)) {
- sinkConfig.setColumnSeparator((String)
sinkConfig.streamLoadProps.get(COLUMN_SEPARATOR));
- }
- if (sinkConfig.streamLoadProps.containsKey(LOAD_FORMAT)) {
- sinkConfig.setLoadFormat(StreamLoadFormat.parse((String)
sinkConfig.streamLoadProps.get(LOAD_FORMAT)));
- }
+ sinkConfig.setNodeUrls(config.get(StarRocksSinkOptions.NODE_URLS));
+ sinkConfig.setDatabase(config.get(StarRocksSinkOptions.DATABASE));
+ sinkConfig.setJdbcUrl(String.format("jdbc:mysql://%s:%d",
+ sinkConfig.getNodeUrls().get(0).split(":")[0],
+ config.get(StarRocksSinkOptions.QUERY_PORT)));
+
config.getOptional(StarRocksOptions.USERNAME).ifPresent(sinkConfig::setUsername);
+
config.getOptional(StarRocksOptions.PASSWORD).ifPresent(sinkConfig::setPassword);
+
config.getOptional(StarRocksSinkOptions.TABLE).ifPresent(sinkConfig::setTable);
+
config.getOptional(StarRocksSinkOptions.LABEL_PREFIX).ifPresent(sinkConfig::setLabelPrefix);
+
sinkConfig.setBatchMaxSize(config.get(StarRocksSinkOptions.BATCH_MAX_SIZE));
+
sinkConfig.setBatchMaxBytes(config.get(StarRocksSinkOptions.BATCH_MAX_BYTES));
+
config.getOptional(StarRocksSinkOptions.BATCH_INTERVAL_MS).ifPresent(sinkConfig::setBatchIntervalMs);
+
config.getOptional(StarRocksSinkOptions.MAX_RETRIES).ifPresent(sinkConfig::setMaxRetries);
+
config.getOptional(StarRocksSinkOptions.RETRY_BACKOFF_MULTIPLIER_MS).ifPresent(sinkConfig::setRetryBackoffMultiplierMs);
+
config.getOptional(StarRocksSinkOptions.MAX_RETRY_BACKOFF_MS).ifPresent(sinkConfig::setMaxRetryBackoffMs);
+
config.getOptional(StarRocksSinkOptions.ENABLE_UPSERT_DELETE).ifPresent(sinkConfig::setEnableUpsertDelete);
+
sinkConfig.setSaveModeCreateTemplate(config.get(StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE));
+
config.getOptional(StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE).ifPresent(sinkConfig::setSaveModeCreateTemplate);
+
config.getOptional(StarRocksSinkOptions.STARROCKS_CONFIG).ifPresent(options ->
sinkConfig.getStreamLoadProps().putAll(options));
+
config.getOptional(StarRocksSinkOptions.COLUMN_SEPARATOR).ifPresent(sinkConfig::setColumnSeparator);
+ sinkConfig.setLoadFormat(config.get(StarRocksSinkOptions.LOAD_FORMAT));
return sinkConfig;
}
-
- private static void parseSinkStreamLoadProperties(Config pluginConfig,
SinkConfig sinkConfig) {
- if (CheckConfigUtil.isValidParam(pluginConfig,
STARROCKS_CONFIG.key())) {
- pluginConfig.getObject(STARROCKS_CONFIG.key()).forEach((key,
value) -> {
- final String configKey = key.toLowerCase();
- sinkConfig.streamLoadProps.put(configKey, value.unwrapped());
- });
- }
- }
}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksOptions.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksOptions.java
new file mode 100644
index 000000000..766a05759
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksOptions.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.starrocks.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public interface StarRocksOptions {
+ Option<String> BASE_URL = Options.key("base-url")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("URL has to be without database, like
\"jdbc:mysql://localhost:5432/\" or" +
+ "\"jdbc:mysql://localhost:5432\" rather than
\"jdbc:mysql://localhost:5432/db\"");
+
+ Option<String> USERNAME = Options.key("username")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("StarRocks user username");
+
+ Option<String> PASSWORD = Options.key("password")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("StarRocks user password");
+
+}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
new file mode 100644
index 000000000..5a00b0e76
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.starrocks.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import
org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig.StreamLoadFormat;
+
+import java.util.List;
+import java.util.Map;
+
+@SuppressWarnings("MagicNumber")
+public interface StarRocksSinkOptions {
+ Option<List<String>> NODE_URLS = Options.key("nodeUrls")
+ .listType()
+ .noDefaultValue()
+ .withDescription("StarRocks cluster address, the format is
[\"fe_ip:fe_http_port\", ...]");
+
+ Option<String> LABEL_PREFIX = Options.key("labelPrefix")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The prefix of StarRocks stream load label");
+
+ Option<String> DATABASE = Options.key("database")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The name of StarRocks database");
+
+ Option<String> TABLE = Options.key("table")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The name of StarRocks table");
+
+
+ Option<String> SAVE_MODE_CREATE_TEMPLATE =
Options.key("save_mode_create_template")
+ .stringType()
+ .defaultValue("CREATE TABLE IF NOT EXISTS
`${database}`.`${table_name}` (\n" +
+ "${rowtype_fields}\n" +
+ ") ENGINE=OLAP\n" +
+ " PRIMARY KEY (${rowtype_primary_key})\n" +
+ "DISTRIBUTED BY HASH (${rowtype_primary_key})" +
+ "PROPERTIES (\n" +
+ " \"replication_num\" = \"1\" \n" +
+ ")").withDescription("Create table statement template, used to
create StarRocks table");
+
+ Option<Integer> QUERY_PORT = Options.key("query_port")
+ .intType()
+ .defaultValue(9030)
+ .withDescription("FE MySQL server port");
+
+ Option<Integer> BATCH_MAX_SIZE = Options.key("batch_max_rows")
+ .intType()
+ .defaultValue(1024)
+ .withDescription("For batch writing, when the number of buffers
reaches the number of batch_max_rows or the byte size of batch_max_bytes or the
time reaches batch_interval_ms, the data will be flushed into the StarRocks");
+
+ Option<Long> BATCH_MAX_BYTES = Options.key("batch_max_bytes")
+ .longType()
+ .defaultValue((long) (5 * 1024 * 1024))
+ .withDescription("For batch writing, when the number of buffers
reaches the number of batch_max_rows or the byte size of batch_max_bytes or the
time reaches batch_interval_ms, the data will be flushed into the StarRocks");
+
+ Option<Integer> BATCH_INTERVAL_MS = Options.key("batch_interval_ms")
+ .intType()
+ .noDefaultValue()
+ .withDescription("For batch writing, when the number of buffers
reaches the number of batch_max_rows or the byte size of batch_max_bytes or the
time reaches batch_interval_ms, the data will be flushed into the StarRocks");
+
+ Option<Integer> MAX_RETRIES = Options.key("max_retries")
+ .intType()
+ .noDefaultValue()
+ .withDescription("The number of retries to flush failed");
+
+ Option<Integer> RETRY_BACKOFF_MULTIPLIER_MS =
Options.key("retry_backoff_multiplier_ms")
+ .intType()
+ .noDefaultValue()
+ .withDescription("Using as a multiplier for generating the next delay
for backoff");
+
+ Option<Integer> MAX_RETRY_BACKOFF_MS = Options.key("max_retry_backoff_ms")
+ .intType()
+ .noDefaultValue()
+ .withDescription("The amount of time to wait before attempting to
retry a request to StarRocks");
+
+ Option<Boolean> ENABLE_UPSERT_DELETE = Options.key("enable_upsert_delete")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether to enable upsert/delete, only supports
PrimaryKey model.");
+
+ Option<Map<String, String>> STARROCKS_CONFIG =
Options.key("starrocks.config")
+ .mapType()
+ .noDefaultValue()
+ .withDescription("The parameter of the stream load data_desc. " +
+ "The way to specify the parameter is to add the original stream
load parameter into map");
+
+ Option<String> COLUMN_SEPARATOR =
Options.key("starrocks.config.column_separator")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("");
+
+ Option<StreamLoadFormat> LOAD_FORMAT =
Options.key("starrocks.config.format")
+ .enumType(StreamLoadFormat.class)
+ .defaultValue(StreamLoadFormat.CSV)
+ .withDescription("");
+}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
index c62300c46..19c0ce853 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
@@ -17,14 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.starrocks.sink;
-import static
org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig.DATABASE;
-import static
org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig.NODE_URLS;
-import static
org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig.PASSWORD;
-import static
org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig.TABLE;
-import static
org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig.USERNAME;
-
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
@@ -34,14 +29,11 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import
org.apache.seatunnel.connectors.seatunnel.starrocks.catalog.StarRocksCatalog;
+import
org.apache.seatunnel.connectors.seatunnel.starrocks.catalog.StarRocksCatalogFactory;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
-import
org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -57,26 +49,30 @@ public class StarRocksSink extends
AbstractSimpleSink<SeaTunnelRow, Void> implem
private SeaTunnelRowType seaTunnelRowType;
private SinkConfig sinkConfig;
private DataSaveMode dataSaveMode;
+
+ public StarRocksSink(DataSaveMode dataSaveMode,
+ SinkConfig sinkConfig,
+ SeaTunnelRowType seaTunnelRowType) {
+ this.dataSaveMode = dataSaveMode;
+ this.sinkConfig = sinkConfig;
+ this.seaTunnelRowType = seaTunnelRowType;
+ }
+
@Override
public String getPluginName() {
- return "StarRocks";
+ return StarRocksCatalogFactory.IDENTIFIER;
}
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
- CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
NODE_URLS.key(), DATABASE.key(), TABLE.key(), USERNAME.key(), PASSWORD.key());
- if (!result.isSuccess()) {
- throw new
StarRocksConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format("PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SINK, result.getMsg()));
- }
+ ConfigValidator.of(ReadonlyConfig.fromConfig(pluginConfig))
+ .validate(new StarRocksCatalogFactory().optionRule());
// TODO get catalog Table
CatalogTable catalogTable = null;
- sinkConfig = SinkConfig.loadConfig(pluginConfig);
+ sinkConfig = SinkConfig.of(ReadonlyConfig.fromConfig(pluginConfig));
if (StringUtils.isEmpty(sinkConfig.getTable())) {
sinkConfig.setTable(catalogTable.getTableId().getTableName());
}
- sinkConfig.setTable(catalogTable.getTableId().getTableName());
dataSaveMode = DataSaveMode.KEEP_SCHEMA_AND_DATA;
}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
index 8863c3115..5bbd57142 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
@@ -18,11 +18,18 @@
package org.apache.seatunnel.connectors.seatunnel.starrocks.sink;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableFactoryContext;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksOptions;
+import
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions;
import com.google.auto.service.AutoService;
+import org.apache.commons.lang3.StringUtils;
@AutoService(Factory.class)
public class StarRocksSinkFactory implements TableSinkFactory {
@@ -34,11 +41,25 @@ public class StarRocksSinkFactory implements
TableSinkFactory {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(SinkConfig.NODE_URLS, SinkConfig.USERNAME,
SinkConfig.PASSWORD, SinkConfig.DATABASE, SinkConfig.QUERY_PORT)
- .optional(SinkConfig.TABLE, SinkConfig.LABEL_PREFIX,
SinkConfig.BATCH_MAX_SIZE, SinkConfig.BATCH_MAX_BYTES,
- SinkConfig.BATCH_INTERVAL_MS, SinkConfig.MAX_RETRIES,
SinkConfig.MAX_RETRY_BACKOFF_MS,
- SinkConfig.RETRY_BACKOFF_MULTIPLIER_MS,
SinkConfig.STARROCKS_CONFIG, SinkConfig.ENABLE_UPSERT_DELETE,
- SinkConfig.SAVE_MODE_CREATE_TEMPLATE)
+ .required(StarRocksOptions.USERNAME, StarRocksOptions.PASSWORD)
+ .required(StarRocksSinkOptions.DATABASE,
StarRocksSinkOptions.QUERY_PORT)
+ .required(StarRocksSinkOptions.NODE_URLS)
+ .optional(StarRocksSinkOptions.TABLE,
StarRocksSinkOptions.LABEL_PREFIX, StarRocksSinkOptions.BATCH_MAX_SIZE,
StarRocksSinkOptions.BATCH_MAX_BYTES,
+ StarRocksSinkOptions.BATCH_INTERVAL_MS,
StarRocksSinkOptions.MAX_RETRIES, StarRocksSinkOptions.MAX_RETRY_BACKOFF_MS,
+ StarRocksSinkOptions.RETRY_BACKOFF_MULTIPLIER_MS,
StarRocksSinkOptions.STARROCKS_CONFIG,
StarRocksSinkOptions.ENABLE_UPSERT_DELETE,
+ StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE)
.build();
}
+
+ @Override
+ public TableSink createSink(TableFactoryContext context) {
+ SinkConfig sinkConfig = SinkConfig.of(context.getOptions());
+ CatalogTable catalogTable = context.getCatalogTable();
+ if (StringUtils.isBlank(sinkConfig.getTable())) {
+ sinkConfig.setTable(catalogTable.getTableId().getTableName());
+ }
+ return () -> new StarRocksSink(DataSaveMode.KEEP_SCHEMA_AND_DATA,
+ sinkConfig,
+ catalogTable.getTableSchema().toPhysicalRowDataType());
+ }
}