This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8d6b07e466 [Improve] [Connector-V2] Remove scheduler in Tablestore sink (#5272)
8d6b07e466 is described below

commit 8d6b07e466ea46451cdb7a06e7999e4dd714f62f
Author: Guangdong Liu <[email protected]>
AuthorDate: Mon Aug 14 17:44:10 2023 +0800

    [Improve] [Connector-V2] Remove scheduler in Tablestore sink (#5272)
    
    
    ---------
    
    Co-authored-by: gdliu3 <[email protected]>
---
 docs/en/connector-v2/sink/Tablestore.md            |  1 -
 .../tablestore/config/TablestoreConfig.java        |  5 ----
 .../tablestore/config/TablestoreOptions.java       |  5 ----
 .../tablestore/sink/TablestoreSinkClient.java      | 31 +---------------------
 .../tablestore/sink/TablestoreSinkFactory.java     |  3 +--
 .../tablestore/sink/TablestoreWriter.java          |  7 +++++
 6 files changed, 9 insertions(+), 43 deletions(-)

diff --git a/docs/en/connector-v2/sink/Tablestore.md b/docs/en/connector-v2/sink/Tablestore.md
index ed59895c65..8f161ad25f 100644
--- a/docs/en/connector-v2/sink/Tablestore.md
+++ b/docs/en/connector-v2/sink/Tablestore.md
@@ -21,7 +21,6 @@ Write data to `Tablestore`
 | table             | string | yes      | -             |
 | primary_keys      | array  | yes      | -             |
 | batch_size        | string | no       | 25            |
-| batch_interval_ms | string | no       | 1000          |
 | common-options    | config | no       | -             |
 
 ### end_point [string]
diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java
index f64eb8473b..3e1714c551 100644
--- a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java
+++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java
@@ -50,11 +50,6 @@ public class TablestoreConfig implements Serializable {
                     .stringType()
                     .defaultValue("25")
                     .withDescription(" Tablestore batch_size");
-    public static final Option<String> BATCH_INTERVAL_MS =
-            Options.key("batch_interval_ms")
-                    .stringType()
-                    .defaultValue("1000")
-                    .withDescription(" Tablestore batch_interval_ms");
     public static final Option<String> PRIMARY_KEYS =
             Options.key("primary_keys")
                     .stringType()
diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java
index ba6c008939..7b2aa6bae6 100644
--- a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java
+++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java
@@ -25,7 +25,6 @@ import lombok.Data;
 import java.io.Serializable;
 import java.util.List;
 
-import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.BATCH_INTERVAL_MS;
 import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.BATCH_SIZE;
 
 @Data
@@ -45,7 +44,6 @@ public class TablestoreOptions implements Serializable {
     private List<String> primaryKeys;
 
     public int batchSize = Integer.parseInt(BATCH_SIZE.defaultValue());
-    public int batchIntervalMs = Integer.parseInt(BATCH_INTERVAL_MS.defaultValue());
 
     public TablestoreOptions(Config config) {
         this.endpoint = config.getString(TablestoreConfig.END_POINT.key());
@@ -58,8 +56,5 @@ public class TablestoreOptions implements Serializable {
         if (config.hasPath(BATCH_SIZE.key())) {
             this.batchSize = config.getInt(BATCH_SIZE.key());
         }
-        if (config.hasPath(TablestoreConfig.BATCH_INTERVAL_MS.key())) {
-            this.batchIntervalMs = config.getInt(TablestoreConfig.BATCH_INTERVAL_MS.key());
-        }
     }
 }
diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkClient.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkClient.java
index e3b6f2fbdf..0637b9b038 100644
--- a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkClient.java
+++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkClient.java
@@ -27,22 +27,15 @@ import com.alicloud.openservices.tablestore.SyncClient;
 import com.alicloud.openservices.tablestore.model.BatchWriteRowRequest;
 import com.alicloud.openservices.tablestore.model.BatchWriteRowResponse;
 import com.alicloud.openservices.tablestore.model.RowPutChange;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
 
 @Slf4j
 public class TablestoreSinkClient {
     private final TablestoreOptions tablestoreOptions;
-    private ScheduledExecutorService scheduler;
-    private ScheduledFuture<?> scheduledFuture;
     private volatile boolean initialize;
     private volatile Exception flushException;
     private SyncClient syncClient;
@@ -64,24 +57,6 @@ public class TablestoreSinkClient {
                         tablestoreOptions.getAccessKeySecret(),
                         tablestoreOptions.getInstanceName());
 
-        scheduler =
-                Executors.newSingleThreadScheduledExecutor(
-                        new ThreadFactoryBuilder()
-                                .setNameFormat("Tablestore-sink-output-%s")
-                                .build());
-        scheduledFuture =
-                scheduler.scheduleAtFixedRate(
-                        () -> {
-                            try {
-                                flush();
-                            } catch (IOException e) {
-                                flushException = e;
-                            }
-                        },
-                        tablestoreOptions.getBatchIntervalMs(),
-                        tablestoreOptions.getBatchIntervalMs(),
-                        TimeUnit.MILLISECONDS);
-
         initialize = true;
     }
 
@@ -96,17 +71,13 @@ public class TablestoreSinkClient {
     }
 
     public void close() throws IOException {
-        if (scheduledFuture != null) {
-            scheduledFuture.cancel(false);
-            scheduler.shutdown();
-        }
         if (syncClient != null) {
             flush();
             syncClient.shutdown();
         }
     }
 
-    synchronized void flush() throws IOException {
+    synchronized void flush() {
         checkFlushException();
         if (batchList.isEmpty()) {
             return;
diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java
index efe39a08c4..674f641ad6 100644
--- a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java
@@ -26,7 +26,6 @@ import com.google.auto.service.AutoService;
 
 import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.ACCESS_KEY_ID;
 import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.ACCESS_KEY_SECRET;
-import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.BATCH_INTERVAL_MS;
 import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.BATCH_SIZE;
 import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.END_POINT;
 import static org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.INSTANCE_NAME;
@@ -51,7 +50,7 @@ public class TablestoreSinkFactory implements TableSinkFactory {
                         ACCESS_KEY_SECRET,
                         PRIMARY_KEYS,
                         CatalogTableUtil.SCHEMA)
-                .optional(BATCH_INTERVAL_MS, BATCH_SIZE)
+                .optional(BATCH_SIZE)
                 .build();
     }
 }
diff --git a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreWriter.java b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreWriter.java
index 929a421f7f..22bfe1be27 100644
--- a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreWriter.java
+++ b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreWriter.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.connectors.seatunnel.tablestore.serialize.DefaultSea
 import org.apache.seatunnel.connectors.seatunnel.tablestore.serialize.SeaTunnelRowSerializer;
 
 import java.io.IOException;
+import java.util.Optional;
 
 public class TablestoreWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
 
@@ -46,4 +47,10 @@ public class TablestoreWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
     public void close() throws IOException {
         tablestoreSinkClient.close();
     }
+
+    @Override
+    public Optional<Void> prepareCommit() {
+        tablestoreSinkClient.flush();
+        return super.prepareCommit();
+    }
 }

Reply via email to