This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f74828f [HUDI-1705] Flush as per data bucket for mini-batch write (#2695)
f74828f is described below
commit f74828fca10ca5539c1aaad2c5e99bb1ddb373c5
Author: Danny Chan <[email protected]>
AuthorDate: Fri Mar 19 16:30:54 2021 +0800
[HUDI-1705] Flush as per data bucket for mini-batch write (#2695)
Detects the buffer size of each data bucket before flushing, so that we
avoid flushing data buckets that hold only a few records.
---
.../org/apache/hudi/sink/StreamWriteFunction.java | 112 +++++++++++++++------
.../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 3 +-
.../sink/utils/StreamWriteFunctionWrapper.java | 2 +-
.../hudi/utils/source/ContinuousFileSource.java | 18 ++--
4 files changed, 88 insertions(+), 47 deletions(-)
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index a67cdae..53104fb 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -26,6 +26,7 @@ import
org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.ObjectSizeCalculator;
+import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
@@ -47,6 +48,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -100,9 +102,9 @@ public class StreamWriteFunction<K, I, O>
private static final Logger LOG =
LoggerFactory.getLogger(StreamWriteFunction.class);
/**
- * Write buffer for a checkpoint.
+ * Write buffer as buckets for a checkpoint. The key is bucket ID.
*/
- private transient Map<String, List<HoodieRecord>> buffer;
+ private transient Map<String, DataBucket> buckets;
/**
* The buffer lock to control data buffering/flushing.
@@ -147,11 +149,6 @@ public class StreamWriteFunction<K, I, O>
private transient OperatorEventGateway eventGateway;
/**
- * The detector that tells if to flush the data as mini-batch.
- */
- private transient BufferSizeDetector detector;
-
- /**
* Constructs a StreamingSinkFunction.
*
* @param config The config options
@@ -163,7 +160,6 @@ public class StreamWriteFunction<K, I, O>
@Override
public void open(Configuration parameters) throws IOException {
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
- this.detector = new
BufferSizeDetector(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE));
initBuffer();
initWriteClient();
initWriteFunction();
@@ -182,7 +178,7 @@ public class StreamWriteFunction<K, I, O>
// it would check the validity.
this.onCheckpointing = true;
// wait for the buffer data flush out and request a new instant
- flushBuffer(true, false);
+ flushRemaining(false);
// signal the task thread to start buffering
addToBufferCondition.signal();
} finally {
@@ -198,8 +194,7 @@ public class StreamWriteFunction<K, I, O>
if (onCheckpointing) {
addToBufferCondition.await();
}
- flushBufferOnCondition(value);
- putDataIntoBuffer(value);
+ bufferRecord(value);
} finally {
bufferLock.unlock();
}
@@ -221,7 +216,7 @@ public class StreamWriteFunction<K, I, O>
* End input action for batch source.
*/
public void endInput() {
- flushBuffer(true, true);
+ flushRemaining(true);
this.writeClient.cleanHandles();
}
@@ -231,8 +226,12 @@ public class StreamWriteFunction<K, I, O>
@VisibleForTesting
@SuppressWarnings("rawtypes")
- public Map<String, List<HoodieRecord>> getBuffer() {
- return buffer;
+ public Map<String, List<HoodieRecord>> getDataBuffer() {
+ Map<String, List<HoodieRecord>> ret = new HashMap<>();
+ for (Map.Entry<String, DataBucket> entry : buckets.entrySet()) {
+ ret.put(entry.getKey(), entry.getValue().records);
+ }
+ return ret;
}
@VisibleForTesting
@@ -250,7 +249,7 @@ public class StreamWriteFunction<K, I, O>
// -------------------------------------------------------------------------
private void initBuffer() {
- this.buffer = new LinkedHashMap<>();
+ this.buckets = new LinkedHashMap<>();
this.bufferLock = new ReentrantLock();
this.addToBufferCondition = this.bufferLock.newCondition();
}
@@ -279,6 +278,24 @@ public class StreamWriteFunction<K, I, O>
}
/**
+ * Data bucket.
+ */
+ private static class DataBucket {
+ private final List<HoodieRecord> records;
+ private final BufferSizeDetector detector;
+
+ private DataBucket(Double batchSize) {
+ this.records = new ArrayList<>();
+ this.detector = new BufferSizeDetector(batchSize);
+ }
+
+ public void reset() {
+ this.records.clear();
+ this.detector.reset();
+ }
+ }
+
+ /**
* Tool to detect if to flush out the existing buffer.
* Sampling the record to compute the size with 0.01 percentage.
*/
@@ -314,32 +331,62 @@ public class StreamWriteFunction<K, I, O>
}
}
- private void putDataIntoBuffer(I value) {
+ /**
+ * Returns the bucket ID with the given value {@code value}.
+ */
+ private String getBucketID(I value) {
HoodieRecord<?> record = (HoodieRecord<?>) value;
final String fileId = record.getCurrentLocation().getFileId();
- final String key =
StreamerUtil.generateBucketKey(record.getPartitionPath(), fileId);
- if (!this.buffer.containsKey(key)) {
- this.buffer.put(key, new ArrayList<>());
- }
- this.buffer.get(key).add(record);
+ return StreamerUtil.generateBucketKey(record.getPartitionPath(), fileId);
}
/**
- * Flush the data buffer if the buffer size is greater than
+ * Buffers the given record.
+ *
+ * <p>Flush the data bucket first if the bucket records size is greater than
* the configured value {@link FlinkOptions#WRITE_BATCH_SIZE}.
*
* @param value HoodieRecord
*/
- private void flushBufferOnCondition(I value) {
- boolean needFlush = this.detector.detect(value);
+ private void bufferRecord(I value) {
+ final String bucketID = getBucketID(value);
+
+ DataBucket bucket = this.buckets.computeIfAbsent(bucketID,
+ k -> new
DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE)));
+ boolean needFlush = bucket.detector.detect(value);
if (needFlush) {
- flushBuffer(false, false);
- this.detector.reset();
+ flushBucket(bucket);
+ bucket.reset();
}
+ bucket.records.add((HoodieRecord<?>) value);
+ }
+
+ @SuppressWarnings("unchecked, rawtypes")
+ private void flushBucket(DataBucket bucket) {
+ this.currentInstant =
this.writeClient.getInflightAndRequestedInstant(this.config.get(FlinkOptions.TABLE_TYPE));
+ if (this.currentInstant == null) {
+ // in case there are empty checkpoints that has no input data
+ LOG.info("No inflight instant when flushing data, cancel.");
+ return;
+ }
+ List<HoodieRecord> records = bucket.records;
+ ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has
no buffering records");
+ if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
+ records = FlinkWriteHelper.newInstance().deduplicateRecords(records,
(HoodieIndex) null, -1);
+ }
+ final List<WriteStatus> writeStatus = new
ArrayList<>(writeFunction.apply(records, currentInstant));
+ final BatchWriteSuccessEvent event = BatchWriteSuccessEvent.builder()
+ .taskID(taskID)
+ .instantTime(currentInstant)
+ .writeStatus(writeStatus)
+ .isLastBatch(false)
+ .isEndInput(false)
+ .build();
+ this.eventGateway.sendEventToCoordinator(event);
}
@SuppressWarnings("unchecked, rawtypes")
- private void flushBuffer(boolean isLastBatch, boolean isEndInput) {
+ private void flushRemaining(boolean isEndInput) {
this.currentInstant =
this.writeClient.getInflightAndRequestedInstant(this.config.get(FlinkOptions.TABLE_TYPE));
if (this.currentInstant == null) {
// in case there are empty checkpoints that has no input data
@@ -347,12 +394,13 @@ public class StreamWriteFunction<K, I, O>
return;
}
final List<WriteStatus> writeStatus;
- if (buffer.size() > 0) {
+ if (buckets.size() > 0) {
writeStatus = new ArrayList<>();
- this.buffer.values()
+ this.buckets.values()
// The records are partitioned by the bucket ID and each batch sent
to
// the writer belongs to one bucket.
- .forEach(records -> {
+ .forEach(bucket -> {
+ List<HoodieRecord> records = bucket.records;
if (records.size() > 0) {
if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
records =
FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null,
-1);
@@ -368,11 +416,11 @@ public class StreamWriteFunction<K, I, O>
.taskID(taskID)
.instantTime(currentInstant)
.writeStatus(writeStatus)
- .isLastBatch(isLastBatch)
+ .isLastBatch(true)
.isEndInput(isEndInput)
.build();
this.eventGateway.sendEventToCoordinator(event);
- this.buffer.clear();
+ this.buckets.clear();
this.currentInstant = "";
}
}
diff --git
a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index 1167779..a98bd3d 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -399,6 +399,7 @@ public class TestWriteCopyOnWrite {
// this triggers the data write and event send
funcWrapper.checkpointFunction(1);
+ dataBuffer = funcWrapper.getDataBuffer();
assertThat("All data should be flushed out", dataBuffer.size(), is(0));
final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the
first event first
@@ -430,10 +431,8 @@ public class TestWriteCopyOnWrite {
final OperatorEvent event3 = funcWrapper.getNextEvent(); // remove the
first event first
final OperatorEvent event4 = funcWrapper.getNextEvent();
- final OperatorEvent event5 = funcWrapper.getNextEvent();
funcWrapper.getCoordinator().handleEventFromOperator(0, event3);
funcWrapper.getCoordinator().handleEventFromOperator(0, event4);
- funcWrapper.getCoordinator().handleEventFromOperator(0, event5);
funcWrapper.checkpointComplete(2);
// Same the original base file content.
diff --git
a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index b80ac72..783f785 100644
---
a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++
b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -137,7 +137,7 @@ public class StreamWriteFunctionWrapper<I> {
}
public Map<String, List<HoodieRecord>> getDataBuffer() {
- return this.writeFunction.getBuffer();
+ return this.writeFunction.getDataBuffer();
}
@SuppressWarnings("rawtypes")
diff --git
a/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java
b/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java
index 2f759b8..9924956 100644
---
a/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java
+++
b/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java
@@ -33,11 +33,11 @@ import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
-import java.io.BufferedReader;
-import java.io.FileReader;
+import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@@ -151,15 +151,9 @@ public class ContinuousFileSource implements
StreamTableSource<RowData> {
}
private void loadDataBuffer() {
- this.dataBuffer = new ArrayList<>();
- try (BufferedReader reader =
- new BufferedReader(new FileReader(this.path.toString()))) {
- String line = reader.readLine();
- while (line != null) {
- this.dataBuffer.add(line);
- // read next line
- line = reader.readLine();
- }
+ try {
+ new File(this.path.toString()).exists();
+ this.dataBuffer = Files.readAllLines(Paths.get(this.path.toUri()));
} catch (IOException e) {
throw new RuntimeException("Read file " + this.path + " error", e);
}