This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 8d6b07e466 [Improve] [Connector-V2] Remove scheduler in Tablestore
sink (#5272)
8d6b07e466 is described below
commit 8d6b07e466ea46451cdb7a06e7999e4dd714f62f
Author: Guangdong Liu <[email protected]>
AuthorDate: Mon Aug 14 17:44:10 2023 +0800
[Improve] [Connector-V2] Remove scheduler in Tablestore sink (#5272)
---------
Co-authored-by: gdliu3 <[email protected]>
---
docs/en/connector-v2/sink/Tablestore.md | 1 -
.../tablestore/config/TablestoreConfig.java | 5 ----
.../tablestore/config/TablestoreOptions.java | 5 ----
.../tablestore/sink/TablestoreSinkClient.java | 31 +---------------------
.../tablestore/sink/TablestoreSinkFactory.java | 3 +--
.../tablestore/sink/TablestoreWriter.java | 7 +++++
6 files changed, 9 insertions(+), 43 deletions(-)
diff --git a/docs/en/connector-v2/sink/Tablestore.md
b/docs/en/connector-v2/sink/Tablestore.md
index ed59895c65..8f161ad25f 100644
--- a/docs/en/connector-v2/sink/Tablestore.md
+++ b/docs/en/connector-v2/sink/Tablestore.md
@@ -21,7 +21,6 @@ Write data to `Tablestore`
| table | string | yes | - |
| primary_keys | array | yes | - |
| batch_size | string | no | 25 |
-| batch_interval_ms | string | no | 1000 |
| common-options | config | no | - |
### end_point [string]
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java
index f64eb8473b..3e1714c551 100644
---
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java
+++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java
@@ -50,11 +50,6 @@ public class TablestoreConfig implements Serializable {
.stringType()
.defaultValue("25")
.withDescription(" Tablestore batch_size");
- public static final Option<String> BATCH_INTERVAL_MS =
- Options.key("batch_interval_ms")
- .stringType()
- .defaultValue("1000")
- .withDescription(" Tablestore batch_interval_ms");
public static final Option<String> PRIMARY_KEYS =
Options.key("primary_keys")
.stringType()
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java
index ba6c008939..7b2aa6bae6 100644
---
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java
+++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java
@@ -25,7 +25,6 @@ import lombok.Data;
import java.io.Serializable;
import java.util.List;
-import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.BATCH_INTERVAL_MS;
import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.BATCH_SIZE;
@Data
@@ -45,7 +44,6 @@ public class TablestoreOptions implements Serializable {
private List<String> primaryKeys;
public int batchSize = Integer.parseInt(BATCH_SIZE.defaultValue());
- public int batchIntervalMs =
Integer.parseInt(BATCH_INTERVAL_MS.defaultValue());
public TablestoreOptions(Config config) {
this.endpoint = config.getString(TablestoreConfig.END_POINT.key());
@@ -58,8 +56,5 @@ public class TablestoreOptions implements Serializable {
if (config.hasPath(BATCH_SIZE.key())) {
this.batchSize = config.getInt(BATCH_SIZE.key());
}
- if (config.hasPath(TablestoreConfig.BATCH_INTERVAL_MS.key())) {
- this.batchIntervalMs =
config.getInt(TablestoreConfig.BATCH_INTERVAL_MS.key());
- }
}
}
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkClient.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkClient.java
index e3b6f2fbdf..0637b9b038 100644
---
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkClient.java
+++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkClient.java
@@ -27,22 +27,15 @@ import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.BatchWriteRowRequest;
import com.alicloud.openservices.tablestore.model.BatchWriteRowResponse;
import com.alicloud.openservices.tablestore.model.RowPutChange;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
@Slf4j
public class TablestoreSinkClient {
private final TablestoreOptions tablestoreOptions;
- private ScheduledExecutorService scheduler;
- private ScheduledFuture<?> scheduledFuture;
private volatile boolean initialize;
private volatile Exception flushException;
private SyncClient syncClient;
@@ -64,24 +57,6 @@ public class TablestoreSinkClient {
tablestoreOptions.getAccessKeySecret(),
tablestoreOptions.getInstanceName());
- scheduler =
- Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder()
- .setNameFormat("Tablestore-sink-output-%s")
- .build());
- scheduledFuture =
- scheduler.scheduleAtFixedRate(
- () -> {
- try {
- flush();
- } catch (IOException e) {
- flushException = e;
- }
- },
- tablestoreOptions.getBatchIntervalMs(),
- tablestoreOptions.getBatchIntervalMs(),
- TimeUnit.MILLISECONDS);
-
initialize = true;
}
@@ -96,17 +71,13 @@ public class TablestoreSinkClient {
}
public void close() throws IOException {
- if (scheduledFuture != null) {
- scheduledFuture.cancel(false);
- scheduler.shutdown();
- }
if (syncClient != null) {
flush();
syncClient.shutdown();
}
}
- synchronized void flush() throws IOException {
+ synchronized void flush() {
checkFlushException();
if (batchList.isEmpty()) {
return;
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java
index efe39a08c4..674f641ad6 100644
---
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java
@@ -26,7 +26,6 @@ import com.google.auto.service.AutoService;
import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.ACCESS_KEY_ID;
import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.ACCESS_KEY_SECRET;
-import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.BATCH_INTERVAL_MS;
import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.BATCH_SIZE;
import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.END_POINT;
import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.INSTANCE_NAME;
@@ -51,7 +50,7 @@ public class TablestoreSinkFactory implements
TableSinkFactory {
ACCESS_KEY_SECRET,
PRIMARY_KEYS,
CatalogTableUtil.SCHEMA)
- .optional(BATCH_INTERVAL_MS, BATCH_SIZE)
+ .optional(BATCH_SIZE)
.build();
}
}
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreWriter.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreWriter.java
index 929a421f7f..22bfe1be27 100644
---
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreWriter.java
+++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreWriter.java
@@ -25,6 +25,7 @@ import
org.apache.seatunnel.connectors.seatunnel.tablestore.serialize.DefaultSea
import
org.apache.seatunnel.connectors.seatunnel.tablestore.serialize.SeaTunnelRowSerializer;
import java.io.IOException;
+import java.util.Optional;
public class TablestoreWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
@@ -46,4 +47,10 @@ public class TablestoreWriter extends
AbstractSinkWriter<SeaTunnelRow, Void> {
public void close() throws IOException {
tablestoreSinkClient.close();
}
+
+ @Override
+ public Optional<Void> prepareCommit() {
+ tablestoreSinkClient.flush();
+ return super.prepareCommit();
+ }
}