This is an automated email from the ASF dual-hosted git repository.
garyli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c4a2ad2 [HUDI-1954] only reset bucket when flush bucket success (#3029)
c4a2ad2 is described below
commit c4a2ad2702c28fc03392a917a878570af40062db
Author: yuzhaojing <[email protected]>
AuthorDate: Sat Jun 5 11:48:08 2021 +0800
[HUDI-1954] only reset bucket when flush bucket success (#3029)
Co-authored-by: 喻兆靖 <[email protected]>
---
.../org/apache/hudi/sink/StreamWriteFunction.java | 26 +++++++++++++---------
1 file changed, 15 insertions(+), 11 deletions(-)
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 7679f2c..c49f7f1 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -470,30 +470,34 @@ public class StreamWriteFunction<K, I, O>
boolean flushBucket = bucket.detector.detect(item);
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
if (flushBucket) {
- flushBucket(bucket);
- this.tracer.countDown(bucket.detector.totalSize);
- bucket.reset();
+ if (flushBucket(bucket)) {
+ this.tracer.countDown(bucket.detector.totalSize);
+ bucket.reset();
+ }
} else if (flushBuffer) {
// find the max size bucket and flush it out
List<DataBucket> sortedBuckets = this.buckets.values().stream()
.sorted((b1, b2) -> Long.compare(b2.detector.totalSize,
b1.detector.totalSize))
.collect(Collectors.toList());
final DataBucket bucketToFlush = sortedBuckets.get(0);
- flushBucket(bucketToFlush);
- this.tracer.countDown(bucketToFlush.detector.totalSize);
- bucketToFlush.reset();
+ if (flushBucket(bucketToFlush)) {
+ this.tracer.countDown(bucketToFlush.detector.totalSize);
+ bucketToFlush.reset();
+ } else {
+ LOG.warn("The buffer size hits the threshold {}, but still flush the
max size data bucket failed!", this.tracer.maxBufferSize);
+ }
}
bucket.records.add(item);
}
@SuppressWarnings("unchecked, rawtypes")
- private void flushBucket(DataBucket bucket) {
+ private boolean flushBucket(DataBucket bucket) {
String instant = this.writeClient.getLastPendingInstant(this.actionType);
if (instant == null) {
// in case there are empty checkpoints that has no input data
- LOG.info("No inflight instant when flushing data, cancel.");
- return;
+ LOG.info("No inflight instant when flushing data, skip.");
+ return false;
}
// if exactly-once semantics turns on,
@@ -536,6 +540,7 @@ public class StreamWriteFunction<K, I, O>
.isEndInput(false)
.build();
this.eventGateway.sendEventToCoordinator(event);
+ return true;
}
@SuppressWarnings("unchecked, rawtypes")
@@ -543,8 +548,7 @@ public class StreamWriteFunction<K, I, O>
this.currentInstant =
this.writeClient.getLastPendingInstant(this.actionType);
if (this.currentInstant == null) {
// in case there are empty checkpoints that has no input data
- LOG.info("No inflight instant when flushing data, cancel.");
- return;
+ throw new HoodieException("No inflight instant when flushing data!");
}
final List<WriteStatus> writeStatus;
if (buckets.size() > 0) {