This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-1.1.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit ae90b950f20d47f2a0d813ae74bdea6d620cac78
Author: Y Ethan Guo <[email protected]>
AuthorDate: Fri Oct 24 00:58:59 2025 +0800

    perf: Reduce memory usage of writing HFile log block (#14078)
    
    
    ---------
    
    Co-authored-by: sivabalan <[email protected]>
---
 .../org/apache/hudi/io/HoodieAppendHandle.java     |  2 +
 .../hudi/common/model/HoodieAvroIndexedRecord.java |  2 +-
 .../common/table/log/block/HoodieDataBlock.java    |  2 +-
 .../org/apache/hudi/common/util/HFileUtils.java    | 92 ++++++++++------------
 4 files changed, 46 insertions(+), 52 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 5b49cc4c8f8e..1c2a07c752c6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -73,6 +73,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -718,6 +719,7 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
       case HFILE_DATA_BLOCK:
         // Not supporting positions in HFile data blocks
         
header.remove(HeaderMetadataType.BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS);
+        records.sort(Comparator.comparing(HoodieRecord::getRecordKey));
         return new HoodieHFileDataBlock(
             records, header, writeConfig.getHFileCompressionAlgorithm(), new 
StoragePath(writeConfig.getBasePath()));
       case PARQUET_DATA_BLOCK:
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
index 8039ee9d46ec..5f9002b1bc77 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
@@ -167,10 +167,10 @@ public class HoodieAvroIndexedRecord extends HoodieRecord<IndexedRecord> {
 
   @Override
   public String getRecordKey(Schema recordSchema, String keyFieldName) {
-    decodeRecord(recordSchema);
     if (key != null) {
       return key.getRecordKey();
     }
+    decodeRecord(recordSchema);
     return Option.ofNullable(data.getSchema().getField(keyFieldName))
         .map(keyField -> data.get(keyField.pos()))
         .map(Object::toString).orElse(null);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
index 625c678da6bc..f5327e2318b7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
@@ -62,7 +62,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
 
   // TODO rebase records/content to leverage Either to warrant
   //      that they are mutex (used by read/write flows respectively)
-  private final Option<List<HoodieRecord>> records;
+  protected Option<List<HoodieRecord>> records;
 
   /**
    * Key field's name w/in the record's schema
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
index 465667fe3c25..cca6410579b1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
@@ -56,7 +56,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-import java.util.TreeMap;
 
 import static 
org.apache.hudi.common.config.HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
@@ -191,58 +190,51 @@ public class HFileUtils extends FileFormatUtils {
                                                           Map<String, String> 
paramsMap) throws IOException {
     CompressionCodec compressionCodec = 
getHFileCompressionAlgorithm(paramsMap);
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    OutputStream ostream = new DataOutputStream(baos);
-
-    // Use simple incrementing counter as a key
-    boolean useIntegerKey = !getRecordKey(records.get(0), readerSchema, 
keyFieldName).isPresent();
-    // This is set here to avoid re-computing this in the loop
-    int keyWidth = useIntegerKey ? (int) Math.ceil(Math.log(records.size())) + 
1 : -1;
-
-    // Serialize records into bytes
-    Map<String, byte[]> sortedRecordsMap = new TreeMap<>();
-
-    Iterator<HoodieRecord> itr = records.iterator();
-    int id = 0;
-    Option<Schema.Field> keyField = 
Option.ofNullable(writerSchema.getField(keyFieldName));
-    while (itr.hasNext()) {
-      HoodieRecord<?> record = itr.next();
-      String recordKey;
-      if (useIntegerKey) {
-        recordKey = String.format("%" + keyWidth + "s", id++);
-      } else {
-        recordKey = getRecordKey(record, readerSchema, keyFieldName).get();
-      }
+    try (OutputStream ostream = new DataOutputStream(baos)) {
+      HFileContext context = HFileContext.builder()
+          .blockSize(DEFAULT_BLOCK_SIZE_FOR_LOG_FILE)
+          .compressionCodec(compressionCodec)
+          .build();
+      // Use simple incrementing counter as a key
+      boolean useIntegerKey = !getRecordKey(records.get(0), readerSchema, 
keyFieldName).isPresent();
+      // This is set here to avoid re-computing this in the loop
+      int keyWidth = useIntegerKey ? (int) Math.ceil(Math.log(records.size())) 
+ 1 : -1;
+      int id = 0;
+      Option<Schema.Field> keyField = 
Option.ofNullable(writerSchema.getField(keyFieldName));
+      try (HFileWriter writer = new HFileWriterImpl(context, ostream)) {
+        String previousRecordKey = null;
+        // It is assumed that the input records are sorted based on the record 
key
+        // for HFile block
+        for (int i = 0; i < records.size(); i++) {
+          HoodieRecord<?> record = records.get(i);
+          String recordKey = useIntegerKey
+              ? String.format("%" + keyWidth + "s", id++)
+              : getRecordKey(record, readerSchema, keyFieldName).get();
+          final byte[] recordBytes = serializeRecord(record, writerSchema, 
keyField);
+          // Since the list is sorted, duplicates will be adjacent.
+          if (i > 0 && recordKey.equals(previousRecordKey)) {
+            LOG.error("Found duplicate record with recordKey: {}", recordKey);
+            logRecordMetadata("Previous record",
+                serializeRecord(records.get(i - 1), writerSchema, keyField), 
writerSchema);
+            logRecordMetadata("Current record",
+                serializeRecord(record, writerSchema, keyField), writerSchema);
+            throw new HoodieException(String.format(
+                "Writing multiple records with same key %s not supported for 
Hfile format with Metadata table", recordKey));
+          }
+          try {
+            writer.append(recordKey, recordBytes);
+          } catch (IOException e) {
+            throw new HoodieIOException("IOException serializing records", e);
+          }
+          previousRecordKey = recordKey;
+        }
 
-      final byte[] recordBytes = serializeRecord(record, writerSchema, 
keyField);
-      if (sortedRecordsMap.containsKey(recordKey)) {
-        LOG.error("Found duplicate record with recordKey: {} ", recordKey);
-        logRecordMetadata("Previous record", sortedRecordsMap.get(recordKey), 
writerSchema);
-        logRecordMetadata("Current record", recordBytes, writerSchema);
-        throw new HoodieException(String.format("Writing multiple records with 
same key %s not supported for Hfile format with Metadata table",
-            recordKey));
+        writer.appendFileInfo(
+            HoodieAvroHFileReaderImplBase.SCHEMA_KEY,
+            getUTF8Bytes(readerSchema.toString()));
       }
-      sortedRecordsMap.put(recordKey, recordBytes);
+      ostream.flush();
     }
-
-    HFileContext context = HFileContext.builder()
-        .blockSize(DEFAULT_BLOCK_SIZE_FOR_LOG_FILE)
-        .compressionCodec(compressionCodec)
-        .build();
-    try (HFileWriter writer = new HFileWriterImpl(context, ostream)) {
-      sortedRecordsMap.forEach((recordKey,recordBytes) -> {
-        try {
-          writer.append(recordKey, recordBytes);
-        } catch (IOException e) {
-          throw new HoodieIOException("IOException serializing records", e);
-        }
-      });
-      writer.appendFileInfo(
-          HoodieAvroHFileReaderImplBase.SCHEMA_KEY,
-          getUTF8Bytes(readerSchema.toString()));
-    }
-
-    ostream.flush();
-    ostream.close();
     return baos;
   }
 

Reply via email to