This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit f1f6f93fe0f6e88b68e51b9b5703e9ae34030a95 Author: Danny Chan <[email protected]> AuthorDate: Wed Apr 17 11:31:17 2024 +0800 [HUDI-7578] Avoid unnecessary rewriting to improve performance (#11028) --- .../src/main/java/org/apache/hudi/io/HoodieMergeHandle.java | 13 +++++-------- .../org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java | 2 +- .../java/org/apache/hudi/io/HoodieSortedMergeHandle.java | 4 ++-- .../hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java | 2 +- .../org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java | 2 +- .../src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java | 4 ++++ 6 files changed, 14 insertions(+), 13 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index e40a5585067..749b08c3e7e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -103,7 +103,7 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O> protected Map<String, HoodieRecord<T>> keyToNewRecords; protected Set<String> writtenRecordKeys; protected HoodieFileWriter fileWriter; - private boolean preserveMetadata = false; + protected boolean preserveMetadata = false; protected Path newFilePath; protected Path oldFilePath; @@ -111,7 +111,6 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O> protected long recordsDeleted = 0; protected long updatedRecordsWritten = 0; protected long insertRecordsWritten = 0; - protected boolean useWriterSchemaForCompaction; protected Option<BaseKeyGenerator> keyGeneratorOpt; private HoodieBaseFile baseFileToMerge; @@ -142,7 +141,6 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O> HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier, 
Option<BaseKeyGenerator> keyGeneratorOpt) { super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier); this.keyToNewRecords = keyToNewRecords; - this.useWriterSchemaForCompaction = true; this.preserveMetadata = true; init(fileId, this.partitionPath, dataFileToBeMerged); validateAndSetAndKeyGenProps(keyGeneratorOpt, config.populateMetaFields()); @@ -279,7 +277,7 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O> } protected void writeInsertRecord(HoodieRecord<T> newRecord) throws IOException { - Schema schema = useWriterSchemaForCompaction ? writeSchemaWithMetaFields : writeSchema; + Schema schema = preserveMetadata ? writeSchemaWithMetaFields : writeSchema; // just skip the ignored record if (newRecord.shouldIgnore(schema, config.getProps())) { return; @@ -308,7 +306,7 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O> } try { if (combineRecord.isPresent() && !combineRecord.get().isDelete(schema, config.getProps()) && !isDelete) { - writeToFile(newRecord.getKey(), combineRecord.get(), schema, prop, preserveMetadata && useWriterSchemaForCompaction); + writeToFile(newRecord.getKey(), combineRecord.get(), schema, prop, preserveMetadata); recordsWritten++; } else { recordsDeleted++; @@ -335,7 +333,7 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O> */ public void write(HoodieRecord<T> oldRecord) { Schema oldSchema = config.populateMetaFields() ? writeSchemaWithMetaFields : writeSchema; - Schema newSchema = useWriterSchemaForCompaction ? writeSchemaWithMetaFields : writeSchema; + Schema newSchema = preserveMetadata ? 
writeSchemaWithMetaFields : writeSchema; boolean copyOldRecord = true; String key = oldRecord.getRecordKey(oldSchema, keyGeneratorOpt); TypedProperties props = config.getPayloadConfig().getProps(); @@ -384,8 +382,7 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O> // NOTE: `FILENAME_METADATA_FIELD` has to be rewritten to correctly point to the // file holding this record even in cases when overall metadata is preserved MetadataValues metadataValues = new MetadataValues().setFileName(newFilePath.getName()); - HoodieRecord populatedRecord = - record.prependMetaFields(schema, writeSchemaWithMetaFields, metadataValues, prop); + HoodieRecord populatedRecord = record.prependMetaFields(schema, writeSchemaWithMetaFields, metadataValues, prop); if (shouldPreserveRecordMetadata) { fileWriter.write(key.getRecordKey(), populatedRecord, writeSchemaWithMetaFields); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java index f8669416f0c..fba72310513 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java @@ -99,7 +99,7 @@ public class HoodieMergeHandleWithChangeLog<T, I, K, O> extends HoodieMergeHandl } protected void writeInsertRecord(HoodieRecord<T> newRecord) throws IOException { - Schema schema = useWriterSchemaForCompaction ? writeSchemaWithMetaFields : writeSchema; + Schema schema = preserveMetadata ? 
writeSchemaWithMetaFields : writeSchema; // TODO Remove these unnecessary newInstance invocations HoodieRecord<T> savedRecord = newRecord.newInstance(); super.writeInsertRecord(newRecord); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java index 3d3a7308bb3..ee0ee914e19 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java @@ -74,7 +74,7 @@ public class HoodieSortedMergeHandle<T, I, K, O> extends HoodieMergeHandle<T, I, @Override public void write(HoodieRecord oldRecord) { Schema oldSchema = config.populateMetaFields() ? writeSchemaWithMetaFields : writeSchema; - Schema newSchema = useWriterSchemaForCompaction ? writeSchemaWithMetaFields : writeSchema; + Schema newSchema = preserveMetadata ? 
writeSchemaWithMetaFields : writeSchema; String key = oldRecord.getRecordKey(oldSchema, keyGeneratorOpt); // To maintain overall sorted order across updates and inserts, write any new inserts whose keys are less than @@ -111,7 +111,7 @@ public class HoodieSortedMergeHandle<T, I, K, O> extends HoodieMergeHandle<T, I, String key = newRecordKeysSorted.poll(); HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key); if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) { - if (useWriterSchemaForCompaction) { + if (preserveMetadata) { writeRecord(hoodieRecord, Option.of(hoodieRecord), writeSchemaWithMetaFields, config.getProps()); } else { writeRecord(hoodieRecord, Option.of(hoodieRecord), writeSchema, config.getProps()); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java index 666c0a8f3fd..85fb5a43504 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java @@ -83,7 +83,7 @@ public class FlinkMergeAndReplaceHandleWithChangeLog<T, I, K, O> } protected void writeInsertRecord(HoodieRecord<T> newRecord) throws IOException { - Schema schema = useWriterSchemaForCompaction ? writeSchemaWithMetaFields : writeSchema; + Schema schema = preserveMetadata ? 
writeSchemaWithMetaFields : writeSchema; // TODO Remove these unnecessary newInstance invocations HoodieRecord<T> savedRecord = newRecord.newInstance(); super.writeInsertRecord(newRecord); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java index 7d19f454a92..92335d0965d 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java @@ -81,7 +81,7 @@ public class FlinkMergeHandleWithChangeLog<T, I, K, O> } protected void writeInsertRecord(HoodieRecord<T> newRecord) throws IOException { - Schema schema = useWriterSchemaForCompaction ? writeSchemaWithMetaFields : writeSchema; + Schema schema = preserveMetadata ? writeSchemaWithMetaFields : writeSchema; // TODO Remove these unnecessary newInstance invocations HoodieRecord<T> savedRecord = newRecord.newInstance(); super.writeInsertRecord(newRecord); diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index 189c988dbc3..70ec37639d8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -934,6 +934,10 @@ public class HoodieAvroUtils { if (oldRecord == null) { return null; } + if (oldAvroSchema.equals(newSchema)) { + // there is no need to rewrite if the schema equals. + return oldRecord; + } // try to get real schema for union type Schema oldSchema = getActualSchemaFromUnion(oldAvroSchema, oldRecord); Object newRecord = rewriteRecordWithNewSchemaInternal(oldRecord, oldSchema, newSchema, renameCols, fieldNames);
