This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f74828f  [HUDI-1705] Flush as per data bucket for mini-batch write (#2695)
f74828f is described below

commit f74828fca10ca5539c1aaad2c5e99bb1ddb373c5
Author: Danny Chan <[email protected]>
AuthorDate: Fri Mar 19 16:30:54 2021 +0800

    [HUDI-1705] Flush as per data bucket for mini-batch write (#2695)
    
    Detects the buffer size of each data bucket before flushing, so that we
    avoid flushing data buckets that hold only a few records.
---
 .../org/apache/hudi/sink/StreamWriteFunction.java  | 112 +++++++++++++++------
 .../org/apache/hudi/sink/TestWriteCopyOnWrite.java |   3 +-
 .../sink/utils/StreamWriteFunctionWrapper.java     |   2 +-
 .../hudi/utils/source/ContinuousFileSource.java    |  18 ++--
 4 files changed, 88 insertions(+), 47 deletions(-)

diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java 
b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index a67cdae..53104fb 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -26,6 +26,7 @@ import 
org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.ObjectSizeCalculator;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
@@ -47,6 +48,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -100,9 +102,9 @@ public class StreamWriteFunction<K, I, O>
   private static final Logger LOG = 
LoggerFactory.getLogger(StreamWriteFunction.class);
 
   /**
-   * Write buffer for a checkpoint.
+   * Write buffer as buckets for a checkpoint. The key is bucket ID.
    */
-  private transient Map<String, List<HoodieRecord>> buffer;
+  private transient Map<String, DataBucket> buckets;
 
   /**
    * The buffer lock to control data buffering/flushing.
@@ -147,11 +149,6 @@ public class StreamWriteFunction<K, I, O>
   private transient OperatorEventGateway eventGateway;
 
   /**
-   * The detector that tells if to flush the data as mini-batch.
-   */
-  private transient BufferSizeDetector detector;
-
-  /**
    * Constructs a StreamingSinkFunction.
    *
    * @param config The config options
@@ -163,7 +160,6 @@ public class StreamWriteFunction<K, I, O>
   @Override
   public void open(Configuration parameters) throws IOException {
     this.taskID = getRuntimeContext().getIndexOfThisSubtask();
-    this.detector = new 
BufferSizeDetector(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE));
     initBuffer();
     initWriteClient();
     initWriteFunction();
@@ -182,7 +178,7 @@ public class StreamWriteFunction<K, I, O>
       // it would check the validity.
       this.onCheckpointing = true;
       // wait for the buffer data flush out and request a new instant
-      flushBuffer(true, false);
+      flushRemaining(false);
       // signal the task thread to start buffering
       addToBufferCondition.signal();
     } finally {
@@ -198,8 +194,7 @@ public class StreamWriteFunction<K, I, O>
       if (onCheckpointing) {
         addToBufferCondition.await();
       }
-      flushBufferOnCondition(value);
-      putDataIntoBuffer(value);
+      bufferRecord(value);
     } finally {
       bufferLock.unlock();
     }
@@ -221,7 +216,7 @@ public class StreamWriteFunction<K, I, O>
    * End input action for batch source.
    */
   public void endInput() {
-    flushBuffer(true, true);
+    flushRemaining(true);
     this.writeClient.cleanHandles();
   }
 
@@ -231,8 +226,12 @@ public class StreamWriteFunction<K, I, O>
 
   @VisibleForTesting
   @SuppressWarnings("rawtypes")
-  public Map<String, List<HoodieRecord>> getBuffer() {
-    return buffer;
+  public Map<String, List<HoodieRecord>> getDataBuffer() {
+    Map<String, List<HoodieRecord>> ret = new HashMap<>();
+    for (Map.Entry<String, DataBucket> entry : buckets.entrySet()) {
+      ret.put(entry.getKey(), entry.getValue().records);
+    }
+    return ret;
   }
 
   @VisibleForTesting
@@ -250,7 +249,7 @@ public class StreamWriteFunction<K, I, O>
   // -------------------------------------------------------------------------
 
   private void initBuffer() {
-    this.buffer = new LinkedHashMap<>();
+    this.buckets = new LinkedHashMap<>();
     this.bufferLock = new ReentrantLock();
     this.addToBufferCondition = this.bufferLock.newCondition();
   }
@@ -279,6 +278,24 @@ public class StreamWriteFunction<K, I, O>
   }
 
   /**
+   * Data bucket.
+   */
+  private static class DataBucket {
+    private final List<HoodieRecord> records;
+    private final BufferSizeDetector detector;
+
+    private DataBucket(Double batchSize) {
+      this.records = new ArrayList<>();
+      this.detector = new BufferSizeDetector(batchSize);
+    }
+
+    public void reset() {
+      this.records.clear();
+      this.detector.reset();
+    }
+  }
+
+  /**
    * Tool to detect if to flush out the existing buffer.
    * Sampling the record to compute the size with 0.01 percentage.
    */
@@ -314,32 +331,62 @@ public class StreamWriteFunction<K, I, O>
     }
   }
 
-  private void putDataIntoBuffer(I value) {
+  /**
+   * Returns the bucket ID with the given value {@code value}.
+   */
+  private String getBucketID(I value) {
     HoodieRecord<?> record = (HoodieRecord<?>) value;
     final String fileId = record.getCurrentLocation().getFileId();
-    final String key = 
StreamerUtil.generateBucketKey(record.getPartitionPath(), fileId);
-    if (!this.buffer.containsKey(key)) {
-      this.buffer.put(key, new ArrayList<>());
-    }
-    this.buffer.get(key).add(record);
+    return StreamerUtil.generateBucketKey(record.getPartitionPath(), fileId);
   }
 
   /**
-   * Flush the data buffer if the buffer size is greater than
+   * Buffers the given record.
+   *
+   * <p>Flush the data bucket first if the bucket records size is greater than
    * the configured value {@link FlinkOptions#WRITE_BATCH_SIZE}.
    *
    * @param value HoodieRecord
    */
-  private void flushBufferOnCondition(I value) {
-    boolean needFlush = this.detector.detect(value);
+  private void bufferRecord(I value) {
+    final String bucketID = getBucketID(value);
+
+    DataBucket bucket = this.buckets.computeIfAbsent(bucketID,
+        k -> new 
DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE)));
+    boolean needFlush = bucket.detector.detect(value);
     if (needFlush) {
-      flushBuffer(false, false);
-      this.detector.reset();
+      flushBucket(bucket);
+      bucket.reset();
     }
+    bucket.records.add((HoodieRecord<?>) value);
+  }
+
+  @SuppressWarnings("unchecked, rawtypes")
+  private void flushBucket(DataBucket bucket) {
+    this.currentInstant = 
this.writeClient.getInflightAndRequestedInstant(this.config.get(FlinkOptions.TABLE_TYPE));
+    if (this.currentInstant == null) {
+      // in case there are empty checkpoints that has no input data
+      LOG.info("No inflight instant when flushing data, cancel.");
+      return;
+    }
+    List<HoodieRecord> records = bucket.records;
+    ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has 
no buffering records");
+    if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
+      records = FlinkWriteHelper.newInstance().deduplicateRecords(records, 
(HoodieIndex) null, -1);
+    }
+    final List<WriteStatus> writeStatus = new 
ArrayList<>(writeFunction.apply(records, currentInstant));
+    final BatchWriteSuccessEvent event = BatchWriteSuccessEvent.builder()
+        .taskID(taskID)
+        .instantTime(currentInstant)
+        .writeStatus(writeStatus)
+        .isLastBatch(false)
+        .isEndInput(false)
+        .build();
+    this.eventGateway.sendEventToCoordinator(event);
   }
 
   @SuppressWarnings("unchecked, rawtypes")
-  private void flushBuffer(boolean isLastBatch, boolean isEndInput) {
+  private void flushRemaining(boolean isEndInput) {
     this.currentInstant = 
this.writeClient.getInflightAndRequestedInstant(this.config.get(FlinkOptions.TABLE_TYPE));
     if (this.currentInstant == null) {
       // in case there are empty checkpoints that has no input data
@@ -347,12 +394,13 @@ public class StreamWriteFunction<K, I, O>
       return;
     }
     final List<WriteStatus> writeStatus;
-    if (buffer.size() > 0) {
+    if (buckets.size() > 0) {
       writeStatus = new ArrayList<>();
-      this.buffer.values()
+      this.buckets.values()
           // The records are partitioned by the bucket ID and each batch sent 
to
           // the writer belongs to one bucket.
-          .forEach(records -> {
+          .forEach(bucket -> {
+            List<HoodieRecord> records = bucket.records;
             if (records.size() > 0) {
               if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
                 records = 
FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, 
-1);
@@ -368,11 +416,11 @@ public class StreamWriteFunction<K, I, O>
         .taskID(taskID)
         .instantTime(currentInstant)
         .writeStatus(writeStatus)
-        .isLastBatch(isLastBatch)
+        .isLastBatch(true)
         .isEndInput(isEndInput)
         .build();
     this.eventGateway.sendEventToCoordinator(event);
-    this.buffer.clear();
+    this.buckets.clear();
     this.currentInstant = "";
   }
 }
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java 
b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index 1167779..a98bd3d 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -399,6 +399,7 @@ public class TestWriteCopyOnWrite {
 
     // this triggers the data write and event send
     funcWrapper.checkpointFunction(1);
+    dataBuffer = funcWrapper.getDataBuffer();
     assertThat("All data should be flushed out", dataBuffer.size(), is(0));
 
     final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the 
first event first
@@ -430,10 +431,8 @@ public class TestWriteCopyOnWrite {
 
     final OperatorEvent event3 = funcWrapper.getNextEvent(); // remove the 
first event first
     final OperatorEvent event4 = funcWrapper.getNextEvent();
-    final OperatorEvent event5 = funcWrapper.getNextEvent();
     funcWrapper.getCoordinator().handleEventFromOperator(0, event3);
     funcWrapper.getCoordinator().handleEventFromOperator(0, event4);
-    funcWrapper.getCoordinator().handleEventFromOperator(0, event5);
     funcWrapper.checkpointComplete(2);
 
     // Same the original base file content.
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
 
b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index b80ac72..783f785 100644
--- 
a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ 
b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -137,7 +137,7 @@ public class StreamWriteFunctionWrapper<I> {
   }
 
   public Map<String, List<HoodieRecord>> getDataBuffer() {
-    return this.writeFunction.getBuffer();
+    return this.writeFunction.getDataBuffer();
   }
 
   @SuppressWarnings("rawtypes")
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java
 
b/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java
index 2f759b8..9924956 100644
--- 
a/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java
+++ 
b/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java
@@ -33,11 +33,11 @@ import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 
-import java.io.BufferedReader;
-import java.io.FileReader;
+import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -151,15 +151,9 @@ public class ContinuousFileSource implements 
StreamTableSource<RowData> {
     }
 
     private void loadDataBuffer() {
-      this.dataBuffer = new ArrayList<>();
-      try (BufferedReader reader =
-               new BufferedReader(new FileReader(this.path.toString()))) {
-        String line = reader.readLine();
-        while (line != null) {
-          this.dataBuffer.add(line);
-          // read next line
-          line = reader.readLine();
-        }
+      try {
+        new File(this.path.toString()).exists();
+        this.dataBuffer = Files.readAllLines(Paths.get(this.path.toUri()));
       } catch (IOException e) {
         throw new RuntimeException("Read file " + this.path + " error", e);
       }

Reply via email to