This is an automated email from the ASF dual-hosted git repository. healchow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 86ed2fd233bbb51ea6e21f2617017c3c1fd828e5 Author: Yizhou Yang <[email protected]> AuthorDate: Thu Feb 2 10:43:28 2023 +0800 [INLONG-7293][Sort] S3DirtySink flushes too quickly (#7290) --- .../inlong/sort/base/dirty/sink/s3/S3DirtySink.java | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java index b8f1f5f10..8e5cf23df 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java @@ -71,6 +71,7 @@ public class S3DirtySink<T> implements DirtySink<T> { private final DataType physicalRowDataType; private RowData.FieldGetter[] fieldGetters; private RowDataToJsonConverter converter; + private long lastExecutetime; private long batchBytes = 0L; private int size; private transient volatile boolean closed = false; @@ -124,7 +125,7 @@ public class S3DirtySink<T> implements DirtySink<T> { + "and the dirty data will be throw away in the future" + " because the option 'dirty.side-output.ignore-errors' is 'true'", dirtyData.getIdentifier()); } - if (valid() && !flushing) { + if (buffered() && valid() && !flushing) { flush(); } } @@ -134,6 +135,18 @@ public class S3DirtySink<T> implements DirtySink<T> { || batchBytes >= s3Options.getMaxBatchBytes(); } + private boolean buffered() { + // stash dirty data for at least a minute to avoid flushing too fast + if (lastExecutetime == 0) { + lastExecutetime = System.currentTimeMillis(); + return false; + } + if (System.currentTimeMillis() - lastExecutetime < s3Options.getBatchIntervalMs()) { + return false; + } + return true; + } + private void addBatch(DirtyData<T> dirtyData) throws IOException { readInNum.incrementAndGet(); String value; @@ -218,6 +231,7 @@ public class S3DirtySink<T> implements DirtySink<T> { */ public synchronized void flush() { flushing = true; + lastExecutetime = System.currentTimeMillis(); if (!hasRecords()) { flushing = false; return;
