This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new efc116727 [INLONG-7292][Sort] S3DirtySink flushes too quickly (#7290)
efc116727 is described below
commit efc116727ce3058ae3eb22a5d2651425c4125e87
Author: Yizhou Yang <[email protected]>
AuthorDate: Thu Feb 2 10:43:28 2023 +0800
[INLONG-7292][Sort] S3DirtySink flushes too quickly (#7290)
---
.../inlong/sort/base/dirty/sink/s3/S3DirtySink.java | 16 +++++++++++++++-
1 file changed, 15 insertions(+), 1 deletion(-)
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java
index b8f1f5f10..8e5cf23df 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java
@@ -71,6 +71,7 @@ public class S3DirtySink<T> implements DirtySink<T> {
private final DataType physicalRowDataType;
private RowData.FieldGetter[] fieldGetters;
private RowDataToJsonConverter converter;
+ private long lastExecutetime;
private long batchBytes = 0L;
private int size;
private transient volatile boolean closed = false;
@@ -124,7 +125,7 @@ public class S3DirtySink<T> implements DirtySink<T> {
+ "and the dirty data will be throw away in the future"
+ " because the option 'dirty.side-output.ignore-errors'
is 'true'", dirtyData.getIdentifier());
}
- if (valid() && !flushing) {
+ if (buffered() && valid() && !flushing) {
flush();
}
}
@@ -134,6 +135,18 @@ public class S3DirtySink<T> implements DirtySink<T> {
|| batchBytes >= s3Options.getMaxBatchBytes();
}
+ private boolean buffered() {
+ // true only once s3Options.getBatchIntervalMs() has elapsed since the last flush() (which resets lastExecutetime) or since the first call; otherwise keep stashing dirty data
+ if (lastExecutetime == 0) { // timer not started yet (no earlier call and no flush): start it now and do not flush
+ lastExecutetime = System.currentTimeMillis();
+ return false;
+ }
+ if (System.currentTimeMillis() - lastExecutetime <
s3Options.getBatchIntervalMs()) { // interval not yet elapsed; presumably milliseconds, matching currentTimeMillis() — confirm
+ return false;
+ }
+ return true;
+ }
+
private void addBatch(DirtyData<T> dirtyData) throws IOException {
readInNum.incrementAndGet();
String value;
@@ -218,6 +231,7 @@ public class S3DirtySink<T> implements DirtySink<T> {
*/
public synchronized void flush() {
flushing = true;
+ lastExecutetime = System.currentTimeMillis();
if (!hasRecords()) {
flushing = false;
return;