This is an automated email from the ASF dual-hosted git repository.

liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new cb7b794914 [Improve] [Connector-V2] Remove scheduler in StarRocks sink 
(#5269)
cb7b794914 is described below

commit cb7b7949140cd9d6a90de8482e82be9be5513d18
Author: Guangdong Liu <[email protected]>
AuthorDate: Thu Aug 24 19:08:39 2023 +0800

    [Improve] [Connector-V2] Remove scheduler in StarRocks sink (#5269)
    
    * [Improve] [Connector-V2] Remove scheduler in StarRocks sink
    
    * [Improve] [Connector-V2] Remove scheduler in InfluxDB sink
    
    * format doc
    
    * format doc
    
    * format doc
    
    ---------
    
    Co-authored-by: gdliu3 <[email protected]>
---
 docs/en/connector-v2/sink/StarRocks.md             | 35 +++++++++++-----------
 .../starrocks/client/StarRocksSinkManager.java     | 34 ---------------------
 .../seatunnel/starrocks/config/SinkConfig.java     |  3 --
 .../starrocks/config/StarRocksSinkOptions.java     | 11 ++-----
 .../starrocks/sink/StarRocksSinkFactory.java       |  1 -
 5 files changed, 19 insertions(+), 65 deletions(-)

diff --git a/docs/en/connector-v2/sink/StarRocks.md b/docs/en/connector-v2/sink/StarRocks.md
index 763743ce96..38893a429e 100644
--- a/docs/en/connector-v2/sink/StarRocks.md
+++ b/docs/en/connector-v2/sink/StarRocks.md
@@ -20,24 +20,23 @@ The internal implementation of StarRocks sink connector is cached and imported b
 
 ## Sink Options
 
-|            Name             |  Type   | Required |     Default     |         
                                                                                
          Description                                                           
                                        |
-|-----------------------------|---------|----------|-----------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| nodeUrls                    | list    | yes      | -               | 
`StarRocks` cluster address, the format is `["fe_ip:fe_http_port", ...]`        
                                                                                
                                                |
-| base-url                    | string  | yes      | -               | The 
JDBC URL like `jdbc:mysql://localhost:9030/` or `jdbc:mysql://localhost:9030` 
or `jdbc:mysql://localhost:9030/db`                                             
                                              |
-| username                    | string  | yes      | -               | 
`StarRocks` user username                                                       
                                                                                
                                                |
-| password                    | string  | yes      | -               | 
`StarRocks` user password                                                       
                                                                                
                                                |
-| database                    | string  | yes      | -               | The 
name of StarRocks database                                                      
                                                                                
                                            |
-| table                       | string  | no       | -               | The 
name of StarRocks table, If not set, the table name will be the name of the 
upstream table                                                                  
                                                |
-| labelPrefix                 | string  | no       | -               | The 
prefix of StarRocks stream load label                                           
                                                                                
                                            |
-| batch_max_rows              | long    | no       | 1024            | For 
batch writing, when the number of buffers reaches the number of 
`batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches 
`batch_interval_ms`, the data will be flushed into the StarRocks |
-| batch_max_bytes             | int     | no       | 5 * 1024 * 1024 | For 
batch writing, when the number of buffers reaches the number of 
`batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches 
`batch_interval_ms`, the data will be flushed into the StarRocks |
-| batch_interval_ms           | int     | no       | -               | For 
batch writing, when the number of buffers reaches the number of 
`batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches 
`batch_interval_ms`, the data will be flushed into the StarRocks |
-| max_retries                 | int     | no       | -               | The 
number of retries to flush failed                                               
                                                                                
                                            |
-| retry_backoff_multiplier_ms | int     | no       | -               | Using 
as a multiplier for generating the next delay for backoff                       
                                                                                
                                          |
-| max_retry_backoff_ms        | int     | no       | -               | The 
amount of time to wait before attempting to retry a request to `StarRocks`      
                                                                                
                                            |
-| enable_upsert_delete        | boolean | no       | false           | Whether 
to enable upsert/delete, only supports PrimaryKey model.                        
                                                                                
                                        |
-| save_mode_create_template   | string  | no       | see below       | see 
below                                                                           
                                                                                
                                            |
-| starrocks.config            | map     | no       | -               | The 
parameter of the stream load `data_desc`                                        
                                                                                
                                            |
+|            Name             |  Type   | Required |     Default     |         
                                                                                
           Description                                                          
                                          |
+|-----------------------------|---------|----------|-----------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| nodeUrls                    | list    | yes      | -               | 
`StarRocks` cluster address, the format is `["fe_ip:fe_http_port", ...]`        
                                                                                
                                                  |
+| base-url                    | string  | yes      | -               | The 
JDBC URL like `jdbc:mysql://localhost:9030/` or `jdbc:mysql://localhost:9030` 
or `jdbc:mysql://localhost:9030/db`                                             
                                                |
+| username                    | string  | yes      | -               | 
`StarRocks` user username                                                       
                                                                                
                                                  |
+| password                    | string  | yes      | -               | 
`StarRocks` user password                                                       
                                                                                
                                                  |
+| database                    | string  | yes      | -               | The 
name of StarRocks database                                                      
                                                                                
                                              |
+| table                       | string  | no       | -               | The 
name of StarRocks table, If not set, the table name will be the name of the 
upstream table                                                                  
                                                  |
+| labelPrefix                 | string  | no       | -               | The 
prefix of StarRocks stream load label                                           
                                                                                
                                              |
+| batch_max_rows              | long    | no       | 1024            | For 
batch writing, when the number of buffers reaches the number of 
`batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches 
`checkpoint.interval`, the data will be flushed into the StarRocks |
+| batch_max_bytes             | int     | no       | 5 * 1024 * 1024 | For 
batch writing, when the number of buffers reaches the number of 
`batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches 
`checkpoint.interval`, the data will be flushed into the StarRocks |
+| max_retries                 | int     | no       | -               | The 
number of retries to flush failed                                               
                                                                                
                                              |
+| retry_backoff_multiplier_ms | int     | no       | -               | Using 
as a multiplier for generating the next delay for backoff                       
                                                                                
                                            |
+| max_retry_backoff_ms        | int     | no       | -               | The 
amount of time to wait before attempting to retry a request to `StarRocks`      
                                                                                
                                              |
+| enable_upsert_delete        | boolean | no       | false           | Whether 
to enable upsert/delete, only supports PrimaryKey model.                        
                                                                                
                                          |
+| save_mode_create_template   | string  | no       | see below       | see 
below                                                                           
                                                                                
                                              |
+| starrocks.config            | map     | no       | -               | The 
parameter of the stream load `data_desc`                                        
                                                                                
                                              |
 
 ### save_mode_create_template
 
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java
index f4f37e584e..e7a1c8c2c5 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java
@@ -22,7 +22,6 @@ import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksCo
 import 
org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
 
 import com.google.common.base.Strings;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
@@ -30,10 +29,6 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
 
 @Slf4j
 public class StarRocksSinkManager {
@@ -42,18 +37,14 @@ public class StarRocksSinkManager {
     private final List<byte[]> batchList;
 
     private final StarRocksStreamLoadVisitor starrocksStreamLoadVisitor;
-    private ScheduledExecutorService scheduler;
-    private ScheduledFuture<?> scheduledFuture;
     private volatile boolean initialize;
     private volatile Exception flushException;
     private int batchRowCount = 0;
     private long batchBytesSize = 0;
-    private final Integer batchIntervalMs;
 
     public StarRocksSinkManager(SinkConfig sinkConfig, List<String> fileNames) 
{
         this.sinkConfig = sinkConfig;
         this.batchList = new ArrayList<>();
-        this.batchIntervalMs = sinkConfig.getBatchIntervalMs();
         starrocksStreamLoadVisitor = new 
StarRocksStreamLoadVisitor(sinkConfig, fileNames);
     }
 
@@ -62,26 +53,6 @@ public class StarRocksSinkManager {
             return;
         }
         initialize = true;
-
-        if (batchIntervalMs != null) {
-            scheduler =
-                    Executors.newSingleThreadScheduledExecutor(
-                            new ThreadFactoryBuilder()
-                                    .setNameFormat("StarRocks-sink-output-%s")
-                                    .build());
-            scheduledFuture =
-                    scheduler.scheduleAtFixedRate(
-                            () -> {
-                                try {
-                                    flush();
-                                } catch (IOException e) {
-                                    flushException = e;
-                                }
-                            },
-                            batchIntervalMs,
-                            batchIntervalMs,
-                            TimeUnit.MILLISECONDS);
-        }
     }
 
     public synchronized void write(String record) throws IOException {
@@ -98,11 +69,6 @@ public class StarRocksSinkManager {
     }
 
     public synchronized void close() throws IOException {
-        if (scheduledFuture != null) {
-            scheduledFuture.cancel(false);
-            scheduler.shutdown();
-        }
-
         flush();
     }
 
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
index f5a2d0dc88..c1709b6939 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
@@ -51,7 +51,6 @@ public class SinkConfig implements Serializable {
     private int batchMaxSize;
     private long batchMaxBytes;
 
-    private Integer batchIntervalMs;
     private int maxRetries;
     private int retryBackoffMultiplierMs;
     private int maxRetryBackoffMs;
@@ -74,8 +73,6 @@ public class SinkConfig implements Serializable {
         
config.getOptional(StarRocksSinkOptions.LABEL_PREFIX).ifPresent(sinkConfig::setLabelPrefix);
         
sinkConfig.setBatchMaxSize(config.get(StarRocksSinkOptions.BATCH_MAX_SIZE));
         
sinkConfig.setBatchMaxBytes(config.get(StarRocksSinkOptions.BATCH_MAX_BYTES));
-        config.getOptional(StarRocksSinkOptions.BATCH_INTERVAL_MS)
-                .ifPresent(sinkConfig::setBatchIntervalMs);
         
config.getOptional(StarRocksSinkOptions.MAX_RETRIES).ifPresent(sinkConfig::setMaxRetries);
         config.getOptional(StarRocksSinkOptions.RETRY_BACKOFF_MULTIPLIER_MS)
                 .ifPresent(sinkConfig::setRetryBackoffMultiplierMs);
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
index 02918f0f96..eed2afc360 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
@@ -75,21 +75,14 @@ public interface StarRocksSinkOptions {
                     .intType()
                     .defaultValue(1024)
                     .withDescription(
-                            "For batch writing, when the number of buffers 
reaches the number of batch_max_rows or the byte size of batch_max_bytes or the 
time reaches batch_interval_ms, the data will be flushed into the StarRocks");
+                            "For batch writing, when the number of buffers 
reaches the number of batch_max_rows or the byte size of batch_max_bytes or the 
time reaches checkpoint.interval, the data will be flushed into the StarRocks");
 
     Option<Long> BATCH_MAX_BYTES =
             Options.key("batch_max_bytes")
                     .longType()
                     .defaultValue((long) (5 * 1024 * 1024))
                     .withDescription(
-                            "For batch writing, when the number of buffers 
reaches the number of batch_max_rows or the byte size of batch_max_bytes or the 
time reaches batch_interval_ms, the data will be flushed into the StarRocks");
-
-    Option<Integer> BATCH_INTERVAL_MS =
-            Options.key("batch_interval_ms")
-                    .intType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "For batch writing, when the number of buffers 
reaches the number of batch_max_rows or the byte size of batch_max_bytes or the 
time reaches batch_interval_ms, the data will be flushed into the StarRocks");
+                            "For batch writing, when the number of buffers 
reaches the number of batch_max_rows or the byte size of batch_max_bytes or the 
time reaches checkpoint.interval, the data will be flushed into the StarRocks");
 
     Option<Integer> MAX_RETRIES =
             Options.key("max_retries")
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
index 471be7001b..c0159c5fd4 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
@@ -49,7 +49,6 @@ public class StarRocksSinkFactory implements TableSinkFactory {
                         StarRocksSinkOptions.LABEL_PREFIX,
                         StarRocksSinkOptions.BATCH_MAX_SIZE,
                         StarRocksSinkOptions.BATCH_MAX_BYTES,
-                        StarRocksSinkOptions.BATCH_INTERVAL_MS,
                         StarRocksSinkOptions.MAX_RETRIES,
                         StarRocksSinkOptions.MAX_RETRY_BACKOFF_MS,
                         StarRocksSinkOptions.RETRY_BACKOFF_MULTIPLIER_MS,

Reply via email to