This is an automated email from the ASF dual-hosted git repository.
liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new cb7b794914 [Improve] [Connector-V2] Remove scheduler in StarRocks sink (#5269)
cb7b794914 is described below
commit cb7b7949140cd9d6a90de8482e82be9be5513d18
Author: Guangdong Liu <[email protected]>
AuthorDate: Thu Aug 24 19:08:39 2023 +0800
[Improve] [Connector-V2] Remove scheduler in StarRocks sink (#5269)
* [Improve] [Connector-V2] Remove scheduler in StarRocks sink
* [Improve] [Connector-V2] Remove scheduler in InfluxDB sink
* format doc
* format doc
* format doc
---------
Co-authored-by: gdliu3 <[email protected]>
---
docs/en/connector-v2/sink/StarRocks.md | 35 +++++++++++-----------
.../starrocks/client/StarRocksSinkManager.java | 34 ---------------------
.../seatunnel/starrocks/config/SinkConfig.java | 3 --
.../starrocks/config/StarRocksSinkOptions.java | 11 ++-----
.../starrocks/sink/StarRocksSinkFactory.java | 1 -
5 files changed, 19 insertions(+), 65 deletions(-)
diff --git a/docs/en/connector-v2/sink/StarRocks.md b/docs/en/connector-v2/sink/StarRocks.md
index 763743ce96..38893a429e 100644
--- a/docs/en/connector-v2/sink/StarRocks.md
+++ b/docs/en/connector-v2/sink/StarRocks.md
@@ -20,24 +20,23 @@ The internal implementation of StarRocks sink connector is cached and imported b
## Sink Options
-| Name | Type | Required | Default | Description |
-|-----------------------------|---------|----------|-----------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| nodeUrls | list | yes | - | `StarRocks` cluster address, the format is `["fe_ip:fe_http_port", ...]` |
-| base-url | string | yes | - | The JDBC URL like `jdbc:mysql://localhost:9030/` or `jdbc:mysql://localhost:9030` or `jdbc:mysql://localhost:9030/db` |
-| username | string | yes | - | `StarRocks` user username |
-| password | string | yes | - | `StarRocks` user password |
-| database | string | yes | - | The name of StarRocks database |
-| table | string | no | - | The name of StarRocks table, If not set, the table name will be the name of the upstream table |
-| labelPrefix | string | no | - | The prefix of StarRocks stream load label |
-| batch_max_rows | long | no | 1024 | For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `batch_interval_ms`, the data will be flushed into the StarRocks |
-| batch_max_bytes | int | no | 5 * 1024 * 1024 | For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `batch_interval_ms`, the data will be flushed into the StarRocks |
-| batch_interval_ms | int | no | - | For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `batch_interval_ms`, the data will be flushed into the StarRocks |
-| max_retries | int | no | - | The number of retries to flush failed |
-| retry_backoff_multiplier_ms | int | no | - | Using as a multiplier for generating the next delay for backoff |
-| max_retry_backoff_ms | int | no | - | The amount of time to wait before attempting to retry a request to `StarRocks` |
-| enable_upsert_delete | boolean | no | false | Whether to enable upsert/delete, only supports PrimaryKey model. |
-| save_mode_create_template | string | no | see below | see below |
-| starrocks.config | map | no | - | The parameter of the stream load `data_desc` |
+| Name | Type | Required | Default | Description |
+|-----------------------------|---------|----------|-----------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| nodeUrls | list | yes | - | `StarRocks` cluster address, the format is `["fe_ip:fe_http_port", ...]` |
+| base-url | string | yes | - | The JDBC URL like `jdbc:mysql://localhost:9030/` or `jdbc:mysql://localhost:9030` or `jdbc:mysql://localhost:9030/db` |
+| username | string | yes | - | `StarRocks` user username |
+| password | string | yes | - | `StarRocks` user password |
+| database | string | yes | - | The name of StarRocks database |
+| table | string | no | - | The name of StarRocks table, If not set, the table name will be the name of the upstream table |
+| labelPrefix | string | no | - | The prefix of StarRocks stream load label |
+| batch_max_rows | long | no | 1024 | For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `checkpoint.interval`, the data will be flushed into the StarRocks |
+| batch_max_bytes | int | no | 5 * 1024 * 1024 | For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `checkpoint.interval`, the data will be flushed into the StarRocks |
+| max_retries | int | no | - | The number of retries to flush failed |
+| retry_backoff_multiplier_ms | int | no | - | Using as a multiplier for generating the next delay for backoff |
+| max_retry_backoff_ms | int | no | - | The amount of time to wait before attempting to retry a request to `StarRocks` |
+| enable_upsert_delete | boolean | no | false | Whether to enable upsert/delete, only supports PrimaryKey model. |
+| save_mode_create_template | string | no | see below | see below |
+| starrocks.config | map | no | - | The parameter of the stream load `data_desc` |
### save_mode_create_template
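The updated option descriptions leave two size limits (`batch_max_rows`, `batch_max_bytes`) plus checkpoints as the flush triggers. A minimal Java sketch of the size part of that rule follows; `FlushRule` and `shouldFlush` are hypothetical names, not SeaTunnel API, and treating "reaches" as a `>=` comparison is an assumption.

```java
/** Hypothetical helper mirroring the documented size-based flush rule (not SeaTunnel API). */
public final class FlushRule {
    private FlushRule() {}

    /**
     * True when the buffered batch has reached batch_max_rows or batch_max_bytes.
     * Checkpoint-driven flushes are a separate trigger and are not modelled here.
     */
    public static boolean shouldFlush(
            int bufferedRows, long bufferedBytes, int batchMaxRows, long batchMaxBytes) {
        return bufferedRows >= batchMaxRows || bufferedBytes >= batchMaxBytes;
    }

    public static void main(String[] args) {
        int maxRows = 1024;               // documented default of batch_max_rows
        long maxBytes = 5L * 1024 * 1024; // documented default of batch_max_bytes
        System.out.println(shouldFlush(10, 2_000L, maxRows, maxBytes));   // false
        System.out.println(shouldFlush(1024, 2_000L, maxRows, maxBytes)); // true
        System.out.println(shouldFlush(10, maxBytes, maxRows, maxBytes)); // true
    }
}
```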
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java
index f4f37e584e..e7a1c8c2c5 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java
@@ -22,7 +22,6 @@ import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksCo
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
import com.google.common.base.Strings;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
@@ -30,10 +29,6 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
@Slf4j
public class StarRocksSinkManager {
@@ -42,18 +37,14 @@ public class StarRocksSinkManager {
private final List<byte[]> batchList;
private final StarRocksStreamLoadVisitor starrocksStreamLoadVisitor;
- private ScheduledExecutorService scheduler;
- private ScheduledFuture<?> scheduledFuture;
private volatile boolean initialize;
private volatile Exception flushException;
private int batchRowCount = 0;
private long batchBytesSize = 0;
- private final Integer batchIntervalMs;
public StarRocksSinkManager(SinkConfig sinkConfig, List<String> fileNames) {
this.sinkConfig = sinkConfig;
this.batchList = new ArrayList<>();
- this.batchIntervalMs = sinkConfig.getBatchIntervalMs();
starrocksStreamLoadVisitor = new StarRocksStreamLoadVisitor(sinkConfig, fileNames);
}
@@ -62,26 +53,6 @@ public class StarRocksSinkManager {
return;
}
initialize = true;
-
- if (batchIntervalMs != null) {
- scheduler =
- Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder()
- .setNameFormat("StarRocks-sink-output-%s")
- .build());
- scheduledFuture =
- scheduler.scheduleAtFixedRate(
- () -> {
- try {
- flush();
- } catch (IOException e) {
- flushException = e;
- }
- },
- batchIntervalMs,
- batchIntervalMs,
- TimeUnit.MILLISECONDS);
- }
}
public synchronized void write(String record) throws IOException {
@@ -98,11 +69,6 @@ public class StarRocksSinkManager {
}
public synchronized void close() throws IOException {
- if (scheduledFuture != null) {
- scheduledFuture.cancel(false);
- scheduler.shutdown();
- }
-
flush();
}
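With the scheduler gone, nothing flushes a partially filled batch on a timer: buffered records are sent when a size limit is hit, when the connector flushes at a checkpoint (as the updated `checkpoint.interval` wording implies; this is an assumption here), or in `close()`. The following is a simplified, hypothetical stand-in for `StarRocksSinkManager`, not the connector's code: it buffers strings instead of the real `List<byte[]> batchList` with its `batchRowCount`/`batchBytesSize` counters, and records "sent" batches in memory instead of issuing a Stream Load request.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for a scheduler-free sink manager (not SeaTunnel code). */
public class BufferedSinkSketch {
    private final int batchMaxRows;
    private final long batchMaxBytes;
    private final List<String> buffer = new ArrayList<>();
    private long bufferedBytes = 0;
    // Each entry stands in for one Stream Load request.
    private final List<List<String>> sentBatches = new ArrayList<>();

    public BufferedSinkSketch(int batchMaxRows, long batchMaxBytes) {
        this.batchMaxRows = batchMaxRows;
        this.batchMaxBytes = batchMaxBytes;
    }

    /** Buffers a record and flushes synchronously once either size limit is reached. */
    public synchronized void write(String record) {
        buffer.add(record);
        bufferedBytes += record.getBytes(StandardCharsets.UTF_8).length;
        if (buffer.size() >= batchMaxRows || bufferedBytes >= batchMaxBytes) {
            flush();
        }
    }

    /** Sends whatever is buffered; assumed to be called at checkpoint time and from close(). */
    public synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        sentBatches.add(new ArrayList<>(buffer));
        buffer.clear();
        bufferedBytes = 0;
    }

    /** No executor to cancel or shut down: closing just drains the remaining records. */
    public synchronized void close() {
        flush();
    }

    public synchronized List<List<String>> sentBatches() {
        return new ArrayList<>(sentBatches);
    }

    public static void main(String[] args) {
        BufferedSinkSketch sink = new BufferedSinkSketch(2, 1024);
        sink.write("a"); // below both limits: stays buffered
        sink.write("b"); // row limit (2) reached: flushed as one batch
        sink.write("c"); // buffered until the next trigger
        sink.close();    // drains the tail
        System.out.println(sink.sentBatches()); // prints [[a, b], [c]]
    }
}
```

Because every flush now runs on the caller's thread, `close()` no longer has to cancel a `ScheduledFuture` or shut down an executor before draining the buffer, which matches the simplified `close()` in the hunk above.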
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
index f5a2d0dc88..c1709b6939 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
@@ -51,7 +51,6 @@ public class SinkConfig implements Serializable {
private int batchMaxSize;
private long batchMaxBytes;
- private Integer batchIntervalMs;
private int maxRetries;
private int retryBackoffMultiplierMs;
private int maxRetryBackoffMs;
@@ -74,8 +73,6 @@ public class SinkConfig implements Serializable {
config.getOptional(StarRocksSinkOptions.LABEL_PREFIX).ifPresent(sinkConfig::setLabelPrefix);
sinkConfig.setBatchMaxSize(config.get(StarRocksSinkOptions.BATCH_MAX_SIZE));
sinkConfig.setBatchMaxBytes(config.get(StarRocksSinkOptions.BATCH_MAX_BYTES));
- config.getOptional(StarRocksSinkOptions.BATCH_INTERVAL_MS)
- .ifPresent(sinkConfig::setBatchIntervalMs);
config.getOptional(StarRocksSinkOptions.MAX_RETRIES).ifPresent(sinkConfig::setMaxRetries);
config.getOptional(StarRocksSinkOptions.RETRY_BACKOFF_MULTIPLIER_MS)
.ifPresent(sinkConfig::setRetryBackoffMultiplierMs);
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
index 02918f0f96..eed2afc360 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
@@ -75,21 +75,14 @@ public interface StarRocksSinkOptions {
.intType()
.defaultValue(1024)
.withDescription(
- "For batch writing, when the number of buffers reaches the number of batch_max_rows or the byte size of batch_max_bytes or the time reaches batch_interval_ms, the data will be flushed into the StarRocks");
+ "For batch writing, when the number of buffers reaches the number of batch_max_rows or the byte size of batch_max_bytes or the time reaches checkpoint.interval, the data will be flushed into the StarRocks");
Option<Long> BATCH_MAX_BYTES =
Options.key("batch_max_bytes")
.longType()
.defaultValue((long) (5 * 1024 * 1024))
.withDescription(
- "For batch writing, when the number of buffers reaches the number of batch_max_rows or the byte size of batch_max_bytes or the time reaches batch_interval_ms, the data will be flushed into the StarRocks");
-
- Option<Integer> BATCH_INTERVAL_MS =
- Options.key("batch_interval_ms")
- .intType()
- .noDefaultValue()
- .withDescription(
- "For batch writing, when the number of buffers reaches the number of batch_max_rows or the byte size of batch_max_bytes or the time reaches batch_interval_ms, the data will be flushed into the StarRocks");
+ "For batch writing, when the number of buffers reaches the number of batch_max_rows or the byte size of batch_max_bytes or the time reaches checkpoint.interval, the data will be flushed into the StarRocks");
Option<Integer> MAX_RETRIES =
Options.key("max_retries")
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
index 471be7001b..c0159c5fd4 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
@@ -49,7 +49,6 @@ public class StarRocksSinkFactory implements TableSinkFactory {
StarRocksSinkOptions.LABEL_PREFIX,
StarRocksSinkOptions.BATCH_MAX_SIZE,
StarRocksSinkOptions.BATCH_MAX_BYTES,
- StarRocksSinkOptions.BATCH_INTERVAL_MS,
StarRocksSinkOptions.MAX_RETRIES,
StarRocksSinkOptions.MAX_RETRY_BACKOFF_MS,
StarRocksSinkOptions.RETRY_BACKOFF_MULTIPLIER_MS,