This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4b619f05b0a [HUDI-8370] Removed excessive `DataBucket::preWrite` (#12104)
4b619f05b0a is described below
commit 4b619f05b0aa4d95f3e70fffdbba0c2368cdf5f1
Author: Geser Dugarov <[email protected]>
AuthorDate: Mon Oct 21 19:07:26 2024 +0700
[HUDI-8370] Removed excessive `DataBucket::preWrite` (#12104)
---
.../org/apache/hudi/sink/StreamWriteFunction.java | 47 +++++-----------------
.../ConsistentBucketStreamWriteFunction.java | 5 +--
2 files changed, 13 insertions(+), 39 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 57fb24a413c..48dd225ecb9 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -19,11 +19,8 @@
package org.apache.hudi.sink;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordMerger;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.ObjectSizeCalculator;
@@ -218,33 +215,18 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
protected static class DataBucket {
private final List<HoodieRecord> records;
private final BufferSizeDetector detector;
- private final String partitionPath;
- private final String fileID;
- private DataBucket(Double batchSize, HoodieRecord<?> hoodieRecord) {
+ private DataBucket(Double batchSize) {
this.records = new ArrayList<>();
this.detector = new BufferSizeDetector(batchSize);
- this.partitionPath = hoodieRecord.getPartitionPath();
- this.fileID = hoodieRecord.getCurrentLocation().getFileId();
}
public List<HoodieRecord> getRecords() {
return records;
}
- /**
- * Sets up before flush: patch up the first record with correct partition path and fileID.
- *
- * <p>Note: the method may modify the given records {@code records}.
- */
- public void preWrite(List<HoodieRecord> records) {
- // rewrite the first record with expected fileID
- HoodieRecord<?> first = records.get(0);
- HoodieRecord<?> record = new HoodieAvroRecord<>(first.getKey(), (HoodieRecordPayload) first.getData(), first.getOperation());
- HoodieRecordLocation newLoc = new HoodieRecordLocation(first.getCurrentLocation().getInstantTime(), fileID);
- record.setCurrentLocation(newLoc);
-
- records.set(0, record);
+ public boolean isEmpty() {
+ return records.isEmpty();
}
public void reset() {
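
For context, after this hunk DataBucket reduces to a plain record buffer keyed only by bucket ID; it no longer tracks a partition path or file ID. A minimal sketch of the resulting class, assembled from the lines above (the reset() body falls outside this hunk and is assumed to clear the buffer and size detector):

    protected static class DataBucket {
      private final List<HoodieRecord> records;
      private final BufferSizeDetector detector;

      private DataBucket(Double batchSize) {
        this.records = new ArrayList<>();
        this.detector = new BufferSizeDetector(batchSize);
      }

      public List<HoodieRecord> getRecords() {
        return records;
      }

      // New helper: callers now test emptiness here instead of on the raw list.
      public boolean isEmpty() {
        return records.isEmpty();
      }

      public void reset() {
        // Assumed body (not part of this hunk): drop the buffered records and
        // reset the size detector so the bucket can be reused.
        this.records.clear();
        this.detector.reset();
      }
    }
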
@@ -351,7 +333,7 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
final String bucketID = getBucketID(value);
DataBucket bucket = this.buckets.computeIfAbsent(bucketID,
- k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE), value));
+ k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE)));
bucket.records.add(value);
boolean flushBucket = bucket.detector.detect(value);
@@ -392,11 +374,8 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
return false;
}
- List<HoodieRecord> records = bucket.getRecords();
- ValidationUtils.checkState(!records.isEmpty(), "Data bucket to flush has no buffering records");
- records = deduplicateRecordsIfNeeded(records);
- final List<WriteStatus> writeStatus = writeBucket(instant, bucket, records);
- records.clear();
+ ValidationUtils.checkState(!bucket.isEmpty(), "Data bucket to flush has no buffering records");
+ final List<WriteStatus> writeStatus = writeRecords(instant, bucket.getRecords());
final WriteMetadataEvent event = WriteMetadataEvent.builder()
.taskID(taskID)
.instantTime(instant) // the write instant may shift but the event still use the currentInstant.
@@ -425,11 +404,8 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
// The records are partitioned by the bucket ID and each batch sent to
// the writer belongs to one bucket.
.forEach(bucket -> {
- List<HoodieRecord> records = bucket.getRecords();
- if (!records.isEmpty()) {
- records = deduplicateRecordsIfNeeded(records);
- writeStatus.addAll(writeBucket(currentInstant, bucket, records));
- records.clear();
+ if (!bucket.isEmpty()) {
+ writeStatus.addAll(writeRecords(currentInstant, bucket.getRecords()));
bucket.reset();
}
});
@@ -463,16 +439,15 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
writeMetrics.registerMetrics();
}
- protected List<WriteStatus> writeBucket(String instant, DataBucket bucket, List<HoodieRecord> records) {
- bucket.preWrite(records);
+ protected List<WriteStatus> writeRecords(String instant, List<HoodieRecord> records) {
writeMetrics.startFileFlush();
- List<WriteStatus> statuses = writeFunction.apply(records, instant);
+ List<WriteStatus> statuses = writeFunction.apply(deduplicateRecordsIfNeeded(records), instant);
writeMetrics.endFileFlush();
writeMetrics.increaseNumOfFilesWritten();
return statuses;
}
- private List<HoodieRecord> deduplicateRecordsIfNeeded(List<HoodieRecord> records) {
+ protected List<HoodieRecord> deduplicateRecordsIfNeeded(List<HoodieRecord> records) {
if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
return FlinkWriteHelper.newInstance()
.deduplicateRecords(records, null, -1, this.writeClient.getConfig().getSchema(), this.writeClient.getConfig().getProps(), recordMerger);
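
Taken together, the StreamWriteFunction hunks consolidate deduplication and the flush metrics into a single method that both the per-bucket flush and the end-of-input flush call. A sketch of the two methods as they read after the patch, joined from the fragments above (the pass-through return in deduplicateRecordsIfNeeded lies outside the hunk and is assumed from the method's contract):

    protected List<WriteStatus> writeRecords(String instant, List<HoodieRecord> records) {
      writeMetrics.startFileFlush();
      // Deduplication now happens once here, instead of at every call site.
      List<WriteStatus> statuses = writeFunction.apply(deduplicateRecordsIfNeeded(records), instant);
      writeMetrics.endFileFlush();
      writeMetrics.increaseNumOfFilesWritten();
      return statuses;
    }

    protected List<HoodieRecord> deduplicateRecordsIfNeeded(List<HoodieRecord> records) {
      // Only deduplicate when the precombine option (FlinkOptions.PRE_COMBINE) is enabled.
      if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
        return FlinkWriteHelper.newInstance()
            .deduplicateRecords(records, null, -1, this.writeClient.getConfig().getSchema(), this.writeClient.getConfig().getProps(), recordMerger);
      }
      // Assumed fall-through (outside this hunk): pass the records through unchanged.
      return records;
    }
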
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketStreamWriteFunction.java
index 9372d70b68e..aa5405e2aea 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketStreamWriteFunction.java
@@ -71,11 +71,10 @@ public class ConsistentBucketStreamWriteFunction<I> extends StreamWriteFunction<
}
@Override
- protected List<WriteStatus> writeBucket(String instant, DataBucket bucket, List<HoodieRecord> records) {
+ protected List<WriteStatus> writeRecords(String instant, List<HoodieRecord> records) {
updateStrategy.initialize(this.writeClient);
- bucket.preWrite(records);
Pair<List<Pair<List<HoodieRecord>, String>>, Set<HoodieFileGroupId>> recordListFgPair =
- updateStrategy.handleUpdate(Collections.singletonList(Pair.of(records, instant)));
+ updateStrategy.handleUpdate(Collections.singletonList(Pair.of(deduplicateRecordsIfNeeded(records), instant)));
return recordListFgPair.getKey().stream().flatMap(
recordsInstantPair -> writeFunction.apply(recordsInstantPair.getLeft(), recordsInstantPair.getRight()).stream()
).collect(Collectors.toList());
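
Assembled for readability, the override above becomes the sketch below. Making deduplicateRecordsIfNeeded protected in the base class is what allows this subclass to dedup explicitly: the override never calls the base writeRecords, so without its own call it would lose the deduplication step.

    @Override
    protected List<WriteStatus> writeRecords(String instant, List<HoodieRecord> records) {
      updateStrategy.initialize(this.writeClient);
      // Dedup explicitly here, since the base-class dedup lives inside the
      // base writeRecords, which this override bypasses.
      Pair<List<Pair<List<HoodieRecord>, String>>, Set<HoodieFileGroupId>> recordListFgPair =
          updateStrategy.handleUpdate(Collections.singletonList(Pair.of(deduplicateRecordsIfNeeded(records), instant)));
      return recordListFgPair.getKey().stream().flatMap(
          recordsInstantPair -> writeFunction.apply(recordsInstantPair.getLeft(), recordsInstantPair.getRight()).stream()
      ).collect(Collectors.toList());
    }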