This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 86ed2fd233bbb51ea6e21f2617017c3c1fd828e5
Author: Yizhou Yang <[email protected]>
AuthorDate: Thu Feb 2 10:43:28 2023 +0800

    [INLONG-7293][Sort] S3DirtySink flushes too quickly (#7290)
---
 .../inlong/sort/base/dirty/sink/s3/S3DirtySink.java      | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java
index b8f1f5f10..8e5cf23df 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java
@@ -71,6 +71,7 @@ public class S3DirtySink<T> implements DirtySink<T> {
     private final DataType physicalRowDataType;
     private RowData.FieldGetter[] fieldGetters;
     private RowDataToJsonConverter converter;
+    private long lastExecutetime;
     private long batchBytes = 0L;
     private int size;
     private transient volatile boolean closed = false;
@@ -124,7 +125,7 @@ public class S3DirtySink<T> implements DirtySink<T> {
                     + "and the dirty data will be throw away in the future"
                     + " because the option 'dirty.side-output.ignore-errors' 
is 'true'", dirtyData.getIdentifier());
         }
-        if (valid() && !flushing) {
+        if (buffered() && valid() && !flushing) {
             flush();
         }
     }
@@ -134,6 +135,18 @@ public class S3DirtySink<T> implements DirtySink<T> {
                 || batchBytes >= s3Options.getMaxBatchBytes();
     }
 
+    private boolean buffered() {
+        // keep dirty data buffered until the configured batch interval has elapsed, to avoid flushing too fast
+        final long now = System.currentTimeMillis();
+        if (lastExecutetime == 0) {
+            // no timestamp recorded yet: start the interval clock now and keep buffering
+            lastExecutetime = now;
+            return false;
+        }
+        // a flush is allowed only once at least s3Options.getBatchIntervalMs() ms have passed
+        return now - lastExecutetime >= s3Options.getBatchIntervalMs();
+    }
+
     private void addBatch(DirtyData<T> dirtyData) throws IOException {
         readInNum.incrementAndGet();
         String value;
@@ -218,6 +231,7 @@ public class S3DirtySink<T> implements DirtySink<T> {
      */
     public synchronized void flush() {
         flushing = true;
+        lastExecutetime = System.currentTimeMillis();
         if (!hasRecords()) {
             flushing = false;
             return;

Reply via email to