This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 876c4e26ecf [HUDI-7578] Avoid unnecessary rewriting to improve performance (#11028)
876c4e26ecf is described below
commit 876c4e26ecf2b710e37e826f583cbf7c5722f246
Author: Danny Chan <[email protected]>
AuthorDate: Wed Apr 17 11:31:17 2024 +0800
[HUDI-7578] Avoid unnecessary rewriting to improve performance (#11028)
---
.../src/main/java/org/apache/hudi/io/HoodieMergeHandle.java | 13 +++++--------
.../org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java | 2 +-
.../java/org/apache/hudi/io/HoodieSortedMergeHandle.java | 4 ++--
.../hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java | 2 +-
.../org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java | 2 +-
.../src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java | 4 ++++
6 files changed, 14 insertions(+), 13 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 4f5f240c4fd..095b3986e4a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -104,7 +104,7 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
protected Map<String, HoodieRecord<T>> keyToNewRecords;
protected Set<String> writtenRecordKeys;
protected HoodieFileWriter fileWriter;
- private boolean preserveMetadata = false;
+ protected boolean preserveMetadata = false;
protected Path newFilePath;
protected Path oldFilePath;
@@ -112,7 +112,6 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
protected long recordsDeleted = 0;
protected long updatedRecordsWritten = 0;
protected long insertRecordsWritten = 0;
- protected boolean useWriterSchemaForCompaction;
protected Option<BaseKeyGenerator> keyGeneratorOpt;
private HoodieBaseFile baseFileToMerge;
@@ -143,7 +142,6 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
HoodieBaseFile dataFileToBeMerged,
TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
this.keyToNewRecords = keyToNewRecords;
- this.useWriterSchemaForCompaction = true;
this.preserveMetadata = true;
init(fileId, this.partitionPath, dataFileToBeMerged);
validateAndSetAndKeyGenProps(keyGeneratorOpt, config.populateMetaFields());
@@ -280,7 +278,7 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
}
protected void writeInsertRecord(HoodieRecord<T> newRecord) throws IOException {
- Schema schema = useWriterSchemaForCompaction ? writeSchemaWithMetaFields : writeSchema;
+ Schema schema = preserveMetadata ? writeSchemaWithMetaFields : writeSchema;
// just skip the ignored record
if (newRecord.shouldIgnore(schema, config.getProps())) {
return;
@@ -313,7 +311,7 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
boolean decision = recordMerger.shouldFlush(combineRecord.get(), schema, config.getProps());
if (decision) { // CASE (1): Flush the merged record.
- writeToFile(newRecord.getKey(), combineRecord.get(), schema, prop, preserveMetadata && useWriterSchemaForCompaction);
+ writeToFile(newRecord.getKey(), combineRecord.get(), schema, prop, preserveMetadata);
recordsWritten++;
} else { // CASE (2): A delete operation.
recordsDeleted++;
@@ -343,7 +341,7 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
*/
public void write(HoodieRecord<T> oldRecord) {
Schema oldSchema = config.populateMetaFields() ? writeSchemaWithMetaFields : writeSchema;
- Schema newSchema = useWriterSchemaForCompaction ? writeSchemaWithMetaFields : writeSchema;
+ Schema newSchema = preserveMetadata ? writeSchemaWithMetaFields : writeSchema;
boolean copyOldRecord = true;
String key = oldRecord.getRecordKey(oldSchema, keyGeneratorOpt);
TypedProperties props = config.getPayloadConfig().getProps();
@@ -392,8 +390,7 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
// NOTE: `FILENAME_METADATA_FIELD` has to be rewritten to correctly point to the
// file holding this record even in cases when overall metadata is preserved
MetadataValues metadataValues = new MetadataValues().setFileName(newFilePath.getName());
- HoodieRecord populatedRecord =
- record.prependMetaFields(schema, writeSchemaWithMetaFields, metadataValues, prop);
+ HoodieRecord populatedRecord = record.prependMetaFields(schema, writeSchemaWithMetaFields, metadataValues, prop);
if (shouldPreserveRecordMetadata) {
fileWriter.write(key.getRecordKey(), populatedRecord, writeSchemaWithMetaFields);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
index f8669416f0c..fba72310513 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
@@ -99,7 +99,7 @@ public class HoodieMergeHandleWithChangeLog<T, I, K, O> extends HoodieMergeHandl
}
protected void writeInsertRecord(HoodieRecord<T> newRecord) throws IOException {
- Schema schema = useWriterSchemaForCompaction ? writeSchemaWithMetaFields : writeSchema;
+ Schema schema = preserveMetadata ? writeSchemaWithMetaFields : writeSchema;
// TODO Remove these unnecessary newInstance invocations
HoodieRecord<T> savedRecord = newRecord.newInstance();
super.writeInsertRecord(newRecord);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
index 3d3a7308bb3..ee0ee914e19 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
@@ -74,7 +74,7 @@ public class HoodieSortedMergeHandle<T, I, K, O> extends HoodieMergeHandle<T, I,
@Override
public void write(HoodieRecord oldRecord) {
Schema oldSchema = config.populateMetaFields() ? writeSchemaWithMetaFields : writeSchema;
- Schema newSchema = useWriterSchemaForCompaction ? writeSchemaWithMetaFields : writeSchema;
+ Schema newSchema = preserveMetadata ? writeSchemaWithMetaFields : writeSchema;
String key = oldRecord.getRecordKey(oldSchema, keyGeneratorOpt);
// To maintain overall sorted order across updates and inserts, write any new inserts whose keys are less than
@@ -111,7 +111,7 @@ public class HoodieSortedMergeHandle<T, I, K, O> extends HoodieMergeHandle<T, I,
String key = newRecordKeysSorted.poll();
HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key);
if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
- if (useWriterSchemaForCompaction) {
+ if (preserveMetadata) {
writeRecord(hoodieRecord, Option.of(hoodieRecord), writeSchemaWithMetaFields, config.getProps());
} else {
writeRecord(hoodieRecord, Option.of(hoodieRecord), writeSchema, config.getProps());
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
index 666c0a8f3fd..85fb5a43504 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
@@ -83,7 +83,7 @@ public class FlinkMergeAndReplaceHandleWithChangeLog<T, I, K, O>
}
protected void writeInsertRecord(HoodieRecord<T> newRecord) throws IOException {
- Schema schema = useWriterSchemaForCompaction ? writeSchemaWithMetaFields : writeSchema;
+ Schema schema = preserveMetadata ? writeSchemaWithMetaFields : writeSchema;
// TODO Remove these unnecessary newInstance invocations
HoodieRecord<T> savedRecord = newRecord.newInstance();
super.writeInsertRecord(newRecord);
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
index 7d19f454a92..92335d0965d 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
@@ -81,7 +81,7 @@ public class FlinkMergeHandleWithChangeLog<T, I, K, O>
}
protected void writeInsertRecord(HoodieRecord<T> newRecord) throws IOException {
- Schema schema = useWriterSchemaForCompaction ? writeSchemaWithMetaFields : writeSchema;
+ Schema schema = preserveMetadata ? writeSchemaWithMetaFields : writeSchema;
// TODO Remove these unnecessary newInstance invocations
HoodieRecord<T> savedRecord = newRecord.newInstance();
super.writeInsertRecord(newRecord);
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 2172c7b1ae0..be149a046bc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -932,6 +932,10 @@ public class HoodieAvroUtils {
if (oldRecord == null) {
return null;
}
+ if (oldAvroSchema.equals(newSchema)) {
+ // there is no need to rewrite if the schema equals.
+ return oldRecord;
+ }
// try to get real schema for union type
Schema oldSchema = getActualSchemaFromUnion(oldAvroSchema, oldRecord);
Object newRecord = rewriteRecordWithNewSchemaInternal(oldRecord, oldSchema, newSchema, renameCols, fieldNames);