Hisoka-X commented on code in PR #4349:
URL: 
https://github.com/apache/incubator-seatunnel/pull/4349#discussion_r1136460656


##########
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java:
##########
@@ -89,11 +104,50 @@ public void write(SeaTunnelRow element) {
 
         String indexRequestRow = seaTunnelRowSerializer.serializeRow(element);
         requestEsList.add(indexRequestRow);
+        // Initialize the interval flush
+        tryOpen();
         if (requestEsList.size() >= maxBatchSize) {
+            log.info("Batch write completion row :" + requestEsList.size());
             bulkEsWithRetry(this.esRestClient, this.requestEsList);
         }
     }
 
+    private void tryOpen() {
+        if (!isOpen) {
+            isOpen = true;
+            open();
+        }
+    }
+
+    public void open() {
+        this.scheduler =
+                Executors.newScheduledThreadPool(
+                        1,
+                        runnable -> {
+                            AtomicInteger cnt = new AtomicInteger(0);
+                            Thread thread = new Thread(runnable);
+                            thread.setDaemon(true);
+                            thread.setName(
+                                    "sink-elasticsearch-interval" + "-" + 
cnt.incrementAndGet());
+                            return thread;
+                        });
+        this.scheduledFuture =
+                this.scheduler.scheduleWithFixedDelay(
+                        () -> {
+                            synchronized (ElasticsearchSinkWriter.this) {
+                                if (requestEsList.size() > 0 && !isClose) {
+                                    log.info(

Review Comment:
   Move log into `bulkEsWithRetry`



##########
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java:
##########
@@ -54,21 +59,31 @@
 
     private final int maxBatchSize;
 
+    private final int batchIntervalMs;
+
     private final SeaTunnelRowSerializer seaTunnelRowSerializer;
     private final List<String> requestEsList;
     private EsRestClient esRestClient;
     private RetryMaterial retryMaterial;
     private static final long DEFAULT_SLEEP_TIME_MS = 200L;
 
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    // Whether pre-initialization is required
+    private transient boolean isOpen;

Review Comment:
   This field is not necessary



##########
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java:
##########
@@ -89,11 +104,50 @@ public void write(SeaTunnelRow element) {
 
         String indexRequestRow = seaTunnelRowSerializer.serializeRow(element);
         requestEsList.add(indexRequestRow);
+        // Initialize the interval flush
+        tryOpen();
         if (requestEsList.size() >= maxBatchSize) {
+            log.info("Batch write completion row :" + requestEsList.size());

Review Comment:
   Move log into `bulkEsWithRetry`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to