This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 33a02987b7f [HUDI-8301] Removed `DataItem` for Flink stream writing
(#12054)
33a02987b7f is described below
commit 33a02987b7fc385253dc0e0efbf112066b8cf190
Author: Geser Dugarov <[email protected]>
AuthorDate: Tue Oct 8 08:37:54 2024 +0700
[HUDI-8301] Removed `DataItem` for Flink stream writing (#12054)
---
.../org/apache/hudi/sink/StreamWriteFunction.java | 74 ++++------------------
.../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 31 +++++----
2 files changed, 30 insertions(+), 75 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index be6e901f171..57fb24a413c 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -20,8 +20,6 @@ package org.apache.hudi.sink;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieAvroRecord;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordMerger;
@@ -57,7 +55,6 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.function.BiFunction;
-import java.util.stream.Collectors;
/**
* Sink function to write the data to the underneath filesystem.
@@ -175,7 +172,7 @@ public class StreamWriteFunction<I> extends
AbstractStreamWriteFunction<I> {
public Map<String, List<HoodieRecord>> getDataBuffer() {
Map<String, List<HoodieRecord>> ret = new HashMap<>();
for (Map.Entry<String, DataBucket> entry : buckets.entrySet()) {
- ret.put(entry.getKey(), entry.getValue().writeBuffer());
+ ret.put(entry.getKey(), entry.getValue().getRecords());
}
return ret;
}
@@ -215,48 +212,11 @@ public class StreamWriteFunction<I> extends
AbstractStreamWriteFunction<I> {
LOG.info("init hoodie merge with class [{}]",
recordMerger.getClass().getName());
}
- /**
- * Represents a data item in the buffer, this is needed to reduce the
- * memory footprint.
- *
- * <p>A {@link HoodieRecord} was firstly transformed into a {@link DataItem}
- * for buffering, it then transforms back to the {@link HoodieRecord} before
flushing.
- */
- private static class DataItem {
- private final String key; // record key
- private final String instant; // 'U' or 'I'
- private final HoodieRecordPayload<?> data; // record payload
- private final HoodieOperation operation; // operation
-
- private DataItem(String key, String instant, HoodieRecordPayload<?> data,
HoodieOperation operation) {
- this.key = key;
- this.instant = instant;
- this.data = data;
- this.operation = operation;
- }
-
- public static DataItem fromHoodieRecord(HoodieRecord<?> record) {
- return new DataItem(
- record.getRecordKey(),
- record.getCurrentLocation().getInstantTime(),
- ((HoodieAvroRecord) record).getData(),
- record.getOperation());
- }
-
- public HoodieRecord<?> toHoodieRecord(String partitionPath) {
- HoodieKey hoodieKey = new HoodieKey(this.key, partitionPath);
- HoodieRecord<?> record = new HoodieAvroRecord<>(hoodieKey, data,
operation);
- HoodieRecordLocation loc = new HoodieRecordLocation(instant, null);
- record.setCurrentLocation(loc);
- return record;
- }
- }
-
/**
* Data bucket.
*/
protected static class DataBucket {
- private final List<DataItem> records;
+ private final List<HoodieRecord> records;
private final BufferSizeDetector detector;
private final String partitionPath;
private final String fileID;
@@ -268,14 +228,8 @@ public class StreamWriteFunction<I> extends
AbstractStreamWriteFunction<I> {
this.fileID = hoodieRecord.getCurrentLocation().getFileId();
}
- /**
- * Prepare the write data buffer: patch up all the records with correct
partition path.
- */
- public List<HoodieRecord> writeBuffer() {
- // rewrite all the records with new record key
- return records.stream()
- .map(record -> record.toHoodieRecord(partitionPath))
- .collect(Collectors.toList());
+ public List<HoodieRecord> getRecords() {
+ return records;
}
/**
@@ -398,11 +352,9 @@ public class StreamWriteFunction<I> extends
AbstractStreamWriteFunction<I> {
DataBucket bucket = this.buckets.computeIfAbsent(bucketID,
k -> new
DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE), value));
- final DataItem item = DataItem.fromHoodieRecord(value);
-
- bucket.records.add(item);
+ bucket.records.add(value);
- boolean flushBucket = bucket.detector.detect(item);
+ boolean flushBucket = bucket.detector.detect(value);
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
// update buffer metrics after tracing buffer size
writeMetrics.setWriteBufferedSize(this.tracer.bufferSize);
@@ -426,8 +378,8 @@ public class StreamWriteFunction<I> extends
AbstractStreamWriteFunction<I> {
}
private boolean hasData() {
- return this.buckets.size() > 0
- && this.buckets.values().stream().anyMatch(bucket ->
bucket.records.size() > 0);
+ return !this.buckets.isEmpty()
+ && this.buckets.values().stream().anyMatch(bucket ->
!bucket.records.isEmpty());
}
@SuppressWarnings("unchecked, rawtypes")
@@ -440,8 +392,8 @@ public class StreamWriteFunction<I> extends
AbstractStreamWriteFunction<I> {
return false;
}
- List<HoodieRecord> records = bucket.writeBuffer();
- ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has
no buffering records");
+ List<HoodieRecord> records = bucket.getRecords();
+ ValidationUtils.checkState(!records.isEmpty(), "Data bucket to flush has
no buffering records");
records = deduplicateRecordsIfNeeded(records);
final List<WriteStatus> writeStatus = writeBucket(instant, bucket,
records);
records.clear();
@@ -467,14 +419,14 @@ public class StreamWriteFunction<I> extends
AbstractStreamWriteFunction<I> {
throw new HoodieException("No inflight instant when flushing data!");
}
final List<WriteStatus> writeStatus;
- if (buckets.size() > 0) {
+ if (!buckets.isEmpty()) {
writeStatus = new ArrayList<>();
this.buckets.values()
// The records are partitioned by the bucket ID and each batch sent
to
// the writer belongs to one bucket.
.forEach(bucket -> {
- List<HoodieRecord> records = bucket.writeBuffer();
- if (records.size() > 0) {
+ List<HoodieRecord> records = bucket.getRecords();
+ if (!records.isEmpty()) {
records = deduplicateRecordsIfNeeded(records);
writeStatus.addAll(writeBucket(currentInstant, bucket, records));
records.clear();
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index d80b0746965..8a1278db9ab 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -53,6 +53,9 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
*/
public class TestWriteCopyOnWrite extends TestWriteBase {
+ // to trigger a buffer flush after 3 rows; each row is 576 bytes for INSERT
and 624 bytes for UPSERT
+ private static final double BATCH_SIZE_MB = 0.0016;
+
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testCheckpoint(boolean allowEmptyCommit) throws Exception {
@@ -232,13 +235,12 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
@Test
public void testInsertWithMiniBatches() throws Exception {
// reset the config option
- conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0008); // 839 bytes batch
size
+ conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, getBatchSize());
Map<String, String> expected = getMiniBatchExpected();
preparePipeline(conf)
- // record (operation: 'I') is 304 bytes and record (operation: 'U') is
352 bytes.
- // so 3 records expect to trigger a mini-batch write
+ // 3 of the 5 records should trigger a mini-batch write
.consume(TestData.DATA_SET_INSERT_DUPLICATES)
.assertDataBuffer(1, 2)
.checkpoint(1)
@@ -257,15 +259,14 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
@Test
public void testInsertWithDeduplication() throws Exception {
// reset the config option
- conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0008); // 839 bytes batch
size
+ conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, getBatchSize());
conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
Map<String, String> expected = new HashMap<>();
expected.put("par1", "[id1,par1,id1,Danny,23,4,par1]");
preparePipeline(conf)
- // record (operation: 'I') is 304 bytes and record (operation: 'U') is
352 bytes.
- // so 3 records expect to trigger a mini-batch write
+ // 3 of the 5 records should trigger a mini-batch write
.consume(TestData.DATA_SET_INSERT_SAME_KEY)
.assertDataBuffer(1, 2)
.checkpoint(1)
@@ -308,13 +309,12 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
// reset the config option
conf.setString(FlinkOptions.OPERATION, "insert");
conf.setBoolean(FlinkOptions.INSERT_CLUSTER, true);
- conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0008); // 839 bytes
buffer size
+ conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0 + getBatchSize());
TestWriteMergeOnRead.TestHarness.instance()
- // record (operation: 'I') is 304 bytes and record (operation: 'U') is
352 bytes.
- // so 3 records expect to trigger a mini-batch write
- // flush the max size bucket once at a time.
.preparePipeline(tempFile, conf)
+ // 3 of the 5 records should trigger a mini-batch write
+ // flush the max size bucket one at a time
.consume(TestData.DATA_SET_INSERT_SAME_KEY)
.assertDataBuffer(1, 2)
.checkpoint(1)
@@ -357,14 +357,13 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
@Test
public void testInsertWithSmallBufferSize() throws Exception {
// reset the config option
- conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0008); // 839 bytes
buffer size
+ conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200 + getBatchSize());
Map<String, String> expected = getMiniBatchExpected();
preparePipeline(conf)
- // record (operation: 'I') is 304 bytes and record (operation: 'U') is
352 bytes.
- // so 3 records expect to trigger a mini-batch write
- // flush the max size bucket once at a time.
+ // 3 of the 5 records should trigger a mini-batch write
+ // flush the max size bucket one at a time
.consume(TestData.DATA_SET_INSERT_DUPLICATES)
.assertDataBuffer(1, 2)
.checkpoint(1)
@@ -418,6 +417,10 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
.end();
}
+ protected double getBatchSize() {
+ return BATCH_SIZE_MB;
+ }
+
protected Map<String, String> getMiniBatchExpected() {
Map<String, String> expected = new HashMap<>();
// the last 2 lines are merged