This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1b27259 [HUDI-1844] Add option to flush when total buckets memory
exceeds the threshold (#2877)
1b27259 is described below
commit 1b27259b530225e3c76fda684f888cad78045d3c
Author: Danny Chan <[email protected]>
AuthorDate: Sun Apr 25 23:06:53 2021 +0800
[HUDI-1844] Add option to flush when total buckets memory exceeds the
threshold (#2877)
The current code only flushes based on per-bucket memory usage, so the
buckets together may still take too much memory when bootstrapping from
history data. When the total buffer threshold is exceeded, flush out the
half of the buckets with the bigger buffer sizes.
---
.../apache/hudi/configuration/FlinkOptions.java | 12 ++-
.../org/apache/hudi/sink/StreamWriteFunction.java | 86 +++++++++++++++++++---
.../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 64 +++++++++++++++-
.../apache/hudi/table/HoodieDataSourceITCase.java | 2 +-
4 files changed, 147 insertions(+), 17 deletions(-)
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 3a942af..de9b950 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -267,11 +267,17 @@ public class FlinkOptions {
.defaultValue(4)
.withDescription("Parallelism of tasks that do actual write, default is
4");
- public static final ConfigOption<Double> WRITE_BATCH_SIZE = ConfigOptions
- .key("write.batch.size.MB")
+ public static final ConfigOption<Double> WRITE_BUFFER_SIZE = ConfigOptions
+ .key("write.buffer.size.MB")
+ .doubleType()
+ .defaultValue(256D) // 256MB
+ .withDescription("Total buffer size in MB to flush data into the
underneath filesystem, default 256MB");
+
+ public static final ConfigOption<Double> WRITE_BUCKET_SIZE = ConfigOptions
+ .key("write.bucket.size.MB")
.doubleType()
.defaultValue(64D) // 64MB
- .withDescription("Batch buffer size in MB to flush data into the
underneath filesystem, default 64MB");
+ .withDescription("Bucket size in MB to flush data into the underneath
filesystem, default 64MB");
public static final ConfigOption<Integer> WRITE_LOG_BLOCK_SIZE =
ConfigOptions
.key("write.log_block.size.MB")
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 8244226..36bd0ed 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -41,18 +41,21 @@ import
org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
+import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.BiFunction;
+import java.util.stream.Collectors;
/**
* Sink function to write the data to the underneath filesystem.
@@ -60,7 +63,7 @@ import java.util.function.BiFunction;
* <p><h2>Work Flow</h2>
*
* <p>The function firstly buffers the data as a batch of {@link
HoodieRecord}s,
- * It flushes(write) the records batch when a batch exceeds the configured
size {@link FlinkOptions#WRITE_BATCH_SIZE}
+ * It flushes(write) the records batch when a batch exceeds the configured
size {@link FlinkOptions#WRITE_BUCKET_SIZE}
* or a Flink checkpoint starts. After a batch has been written successfully,
* the function notifies its operator coordinator {@link
StreamWriteOperatorCoordinator} to mark a successful write.
*
@@ -99,6 +102,11 @@ public class StreamWriteFunction<K, I, O>
private static final Logger LOG =
LoggerFactory.getLogger(StreamWriteFunction.class);
/**
+ * Write buffer size detector.
+ */
+ private transient BufferSizeDetector detector;
+
+ /**
* Write buffer as buckets for a checkpoint. The key is bucket ID.
*/
private transient Map<String, DataBucket> buckets;
@@ -223,6 +231,7 @@ public class StreamWriteFunction<K, I, O>
// -------------------------------------------------------------------------
private void initBuffer() {
+ this.detector = new
BufferSizeDetector(this.config.getDouble(FlinkOptions.WRITE_BUFFER_SIZE));
this.buckets = new LinkedHashMap<>();
}
@@ -249,18 +258,49 @@ public class StreamWriteFunction<K, I, O>
/**
* Data bucket.
*/
- private static class DataBucket {
+ private static class DataBucket implements Comparable<DataBucket> {
private final List<HoodieRecord> records;
- private final BufferSizeDetector detector;
+ private final BucketSizeTracer tracer;
private DataBucket(Double batchSize) {
this.records = new ArrayList<>();
- this.detector = new BufferSizeDetector(batchSize);
+ this.tracer = new BucketSizeTracer(batchSize);
}
public void reset() {
this.records.clear();
- this.detector.reset();
+ this.tracer.reset();
+ }
+
+ @Override
+ public int compareTo(@NotNull DataBucket other) {
+ return Double.compare(tracer.threshold, other.tracer.threshold);
+ }
+ }
+
+ /**
+ * Tool to detect if to flush out the existing bucket.
+ */
+ private static class BucketSizeTracer {
+ private final double threshold;
+
+ private long totalSize = 0L;
+
+ BucketSizeTracer(double bucketSizeMb) {
+ this.threshold = bucketSizeMb * 1024 * 1024;
+ }
+
+ /**
+ * Trace the bucket size with given record size,
+ * returns true if the bucket size exceeds specified threshold.
+ */
+ boolean trace(long recordSize) {
+ totalSize += recordSize;
+ return totalSize > this.threshold;
+ }
+
+ void reset() {
+ this.totalSize = 0L;
}
}
@@ -272,13 +312,13 @@ public class StreamWriteFunction<K, I, O>
private final Random random = new Random(47);
private static final int DENOMINATOR = 100;
- private final double batchSizeBytes;
+ private final double threshold;
private long lastRecordSize = -1L;
private long totalSize = 0L;
BufferSizeDetector(double batchSizeMb) {
- this.batchSizeBytes = batchSizeMb * 1024 * 1024;
+ this.threshold = batchSizeMb * 1024 * 1024;
}
boolean detect(Object record) {
@@ -286,7 +326,7 @@ public class StreamWriteFunction<K, I, O>
lastRecordSize = ObjectSizeCalculator.getObjectSize(record);
}
totalSize += lastRecordSize;
- return totalSize > this.batchSizeBytes;
+ return totalSize > this.threshold;
}
boolean sampling() {
@@ -298,6 +338,10 @@ public class StreamWriteFunction<K, I, O>
this.lastRecordSize = -1L;
this.totalSize = 0L;
}
+
+ public void countDown(long bucketSize) {
+ this.totalSize -= bucketSize;
+ }
}
/**
@@ -313,17 +357,34 @@ public class StreamWriteFunction<K, I, O>
* Buffers the given record.
*
* <p>Flush the data bucket first if the bucket records size is greater than
- * the configured value {@link FlinkOptions#WRITE_BATCH_SIZE}.
+ * the configured value {@link FlinkOptions#WRITE_BUCKET_SIZE}.
*
* @param value HoodieRecord
*/
private void bufferRecord(I value) {
+ boolean flushBuffer = detector.detect(value);
+ if (flushBuffer) {
+ List<DataBucket> sortedBuckets = this.buckets.values().stream()
+ .sorted(Comparator.comparingDouble(b -> b.tracer.totalSize))
+ .collect(Collectors.toList());
+ // flush half number of buckets to avoid flushing too small buckets
+ // which cause small files.
+ int numBucketsToFlush = (sortedBuckets.size() + 1) / 2;
+ LOG.info("Flush {} data buckets because the total buffer size [{} bytes]
exceeds the threshold [{} bytes]",
+ numBucketsToFlush, detector.totalSize, detector.threshold);
+ for (int i = 0; i < numBucketsToFlush; i++) {
+ DataBucket bucket = sortedBuckets.get(i);
+ flushBucket(bucket);
+ detector.countDown(bucket.tracer.totalSize);
+ bucket.reset();
+ }
+ }
final String bucketID = getBucketID(value);
DataBucket bucket = this.buckets.computeIfAbsent(bucketID,
- k -> new
DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE)));
- boolean needFlush = bucket.detector.detect(value);
- if (needFlush) {
+ k -> new
DataBucket(this.config.getDouble(FlinkOptions.WRITE_BUCKET_SIZE)));
+ boolean flushBucket = bucket.tracer.trace(detector.lastRecordSize);
+ if (flushBucket) {
flushBucket(bucket);
bucket.reset();
}
@@ -390,6 +451,7 @@ public class StreamWriteFunction<K, I, O>
.build();
this.eventGateway.sendEventToCoordinator(event);
this.buckets.clear();
+ this.detector.reset();
this.currentInstant = "";
}
}
diff --git
a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index 9e417e3..3d1461b 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -378,7 +378,7 @@ public class TestWriteCopyOnWrite {
@Test
public void testInsertWithMiniBatches() throws Exception {
// reset the config option
- conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.001); // 1Kb batch size
+ conf.setDouble(FlinkOptions.WRITE_BUCKET_SIZE, 0.001); // 1Kb batch size
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(),
conf);
// open the function and ingest data
@@ -436,6 +436,68 @@ public class TestWriteCopyOnWrite {
checkWrittenData(tempFile, expected, 1);
}
+ @Test
+ public void testInsertWithSmallBuffer() throws Exception {
+ // reset the config option
+ conf.setDouble(FlinkOptions.WRITE_BUFFER_SIZE, 0.001); // 1Kb buffer size
+ funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(),
conf);
+
+ // open the function and ingest data
+ funcWrapper.openFunction();
+ // each record is 424 bytes. so 3 records expect to trigger buffer flush:
+ // flush half of the buckets once at a time.
+ for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
+ funcWrapper.invoke(rowData);
+ }
+
+ Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
+ assertThat("Should have 1 data bucket", dataBuffer.size(), is(1));
+ assertThat("4 records expect to flush out as a mini-batch",
+ dataBuffer.values().stream().findFirst().map(List::size).orElse(-1),
+ is(1));
+
+ // this triggers the data write and event send
+ funcWrapper.checkpointFunction(1);
+ dataBuffer = funcWrapper.getDataBuffer();
+ assertThat("All data should be flushed out", dataBuffer.size(), is(0));
+
+ for (int i = 0; i < 3; i++) {
+ final OperatorEvent event = funcWrapper.getNextEvent(); // remove the
first event first
+ assertThat("The operator expect to send an event", event,
instanceOf(BatchWriteSuccessEvent.class));
+ funcWrapper.getCoordinator().handleEventFromOperator(0, event);
+ }
+ assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the
event");
+
+ String instant = funcWrapper.getWriteClient()
+ .getLastPendingInstant(getTableType());
+
+ funcWrapper.checkpointComplete(1);
+
+ Map<String, String> expected = getMiniBatchExpected();
+ checkWrittenData(tempFile, expected, 1);
+
+ // started a new instant already
+ checkInflightInstant(funcWrapper.getWriteClient());
+ checkInstantState(funcWrapper.getWriteClient(),
HoodieInstant.State.COMPLETED, instant);
+
+ // insert duplicates again
+ for (RowData rowData : TestData.DATA_SET_INSERT_DUPLICATES) {
+ funcWrapper.invoke(rowData);
+ }
+
+ funcWrapper.checkpointFunction(2);
+
+ for (int i = 0; i < 3; i++) {
+ final OperatorEvent event = funcWrapper.getNextEvent(); // remove the
first event first
+ funcWrapper.getCoordinator().handleEventFromOperator(0, event);
+ }
+
+ funcWrapper.checkpointComplete(2);
+
+ // Same the original base file content.
+ checkWrittenData(tempFile, expected, 1);
+ }
+
Map<String, String> getMiniBatchExpected() {
Map<String, String> expected = new HashMap<>();
expected.put("par1", "[id1,par1,id1,Danny,23,1,par1, "
diff --git
a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index fe652c5..2f2dcb2 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -330,7 +330,7 @@ public class HoodieDataSourceITCase extends
AbstractTestBase {
TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv :
streamTableEnv;
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
- options.put(FlinkOptions.WRITE_BATCH_SIZE.key(), "0.001");
+ options.put(FlinkOptions.WRITE_BUCKET_SIZE.key(), "0.001");
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1",
options);
tableEnv.executeSql(hoodieTableDDL);