This is an automated email from the ASF dual-hosted git repository.

garyli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c4a2ad2  [HUDI-1954] only reset bucket when flush bucket success (#3029)
c4a2ad2 is described below

commit c4a2ad2702c28fc03392a917a878570af40062db
Author: yuzhaojing <[email protected]>
AuthorDate: Sat Jun 5 11:48:08 2021 +0800

    [HUDI-1954] only reset bucket when flush bucket success (#3029)
    
    Co-authored-by: 喻兆靖 <[email protected]>
---
 .../org/apache/hudi/sink/StreamWriteFunction.java  | 26 +++++++++++++---------
 1 file changed, 15 insertions(+), 11 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 7679f2c..c49f7f1 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -470,30 +470,34 @@ public class StreamWriteFunction<K, I, O>
     boolean flushBucket = bucket.detector.detect(item);
     boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
     if (flushBucket) {
-      flushBucket(bucket);
-      this.tracer.countDown(bucket.detector.totalSize);
-      bucket.reset();
+      if (flushBucket(bucket)) {
+        this.tracer.countDown(bucket.detector.totalSize);
+        bucket.reset();
+      }
     } else if (flushBuffer) {
       // find the max size bucket and flush it out
       List<DataBucket> sortedBuckets = this.buckets.values().stream()
           .sorted((b1, b2) -> Long.compare(b2.detector.totalSize, 
b1.detector.totalSize))
           .collect(Collectors.toList());
       final DataBucket bucketToFlush = sortedBuckets.get(0);
-      flushBucket(bucketToFlush);
-      this.tracer.countDown(bucketToFlush.detector.totalSize);
-      bucketToFlush.reset();
+      if (flushBucket(bucketToFlush)) {
+        this.tracer.countDown(bucketToFlush.detector.totalSize);
+        bucketToFlush.reset();
+      } else {
+        LOG.warn("The buffer size hits the threshold {}, but still flush the 
max size data bucket failed!", this.tracer.maxBufferSize);
+      }
     }
     bucket.records.add(item);
   }
 
   @SuppressWarnings("unchecked, rawtypes")
-  private void flushBucket(DataBucket bucket) {
+  private boolean flushBucket(DataBucket bucket) {
     String instant = this.writeClient.getLastPendingInstant(this.actionType);
 
     if (instant == null) {
       // in case there are empty checkpoints that has no input data
-      LOG.info("No inflight instant when flushing data, cancel.");
-      return;
+      LOG.info("No inflight instant when flushing data, skip.");
+      return false;
     }
 
     // if exactly-once semantics turns on,
@@ -536,6 +540,7 @@ public class StreamWriteFunction<K, I, O>
         .isEndInput(false)
         .build();
     this.eventGateway.sendEventToCoordinator(event);
+    return true;
   }
 
   @SuppressWarnings("unchecked, rawtypes")
@@ -543,8 +548,7 @@ public class StreamWriteFunction<K, I, O>
     this.currentInstant = 
this.writeClient.getLastPendingInstant(this.actionType);
     if (this.currentInstant == null) {
       // in case there are empty checkpoints that has no input data
-      LOG.info("No inflight instant when flushing data, cancel.");
-      return;
+      throw new HoodieException("No inflight instant when flushing data!");
     }
     final List<WriteStatus> writeStatus;
     if (buckets.size() > 0) {

Reply via email to