This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-1.1.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit ae90b950f20d47f2a0d813ae74bdea6d620cac78 Author: Y Ethan Guo <[email protected]> AuthorDate: Fri Oct 24 00:58:59 2025 +0800 perf: Reduce memory usage of writing HFile log block (#14078) --------- Co-authored-by: sivabalan <[email protected]> --- .../org/apache/hudi/io/HoodieAppendHandle.java | 2 + .../hudi/common/model/HoodieAvroIndexedRecord.java | 2 +- .../common/table/log/block/HoodieDataBlock.java | 2 +- .../org/apache/hudi/common/util/HFileUtils.java | 92 ++++++++++------------ 4 files changed, 46 insertions(+), 52 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index 5b49cc4c8f8e..1c2a07c752c6 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -73,6 +73,7 @@ import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -718,6 +719,7 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O case HFILE_DATA_BLOCK: // Not supporting positions in HFile data blocks header.remove(HeaderMetadataType.BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS); + records.sort(Comparator.comparing(HoodieRecord::getRecordKey)); return new HoodieHFileDataBlock( records, header, writeConfig.getHFileCompressionAlgorithm(), new StoragePath(writeConfig.getBasePath())); case PARQUET_DATA_BLOCK: diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java index 8039ee9d46ec..5f9002b1bc77 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java 
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java @@ -167,10 +167,10 @@ public class HoodieAvroIndexedRecord extends HoodieRecord<IndexedRecord> { @Override public String getRecordKey(Schema recordSchema, String keyFieldName) { - decodeRecord(recordSchema); if (key != null) { return key.getRecordKey(); } + decodeRecord(recordSchema); return Option.ofNullable(data.getSchema().getField(keyFieldName)) .map(keyField -> data.get(keyField.pos())) .map(Object::toString).orElse(null); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java index 625c678da6bc..f5327e2318b7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java @@ -62,7 +62,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock { // TODO rebase records/content to leverage Either to warrant // that they are mutex (used by read/write flows respectively) - private final Option<List<HoodieRecord>> records; + protected Option<List<HoodieRecord>> records; /** * Key field's name w/in the record's schema diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java index 465667fe3c25..cca6410579b1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java @@ -56,7 +56,6 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; -import java.util.TreeMap; import static org.apache.hudi.common.config.HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME; import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes; @@ -191,58 +190,51 @@ public class HFileUtils extends 
FileFormatUtils { Map<String, String> paramsMap) throws IOException { CompressionCodec compressionCodec = getHFileCompressionAlgorithm(paramsMap); ByteArrayOutputStream baos = new ByteArrayOutputStream(); - OutputStream ostream = new DataOutputStream(baos); - - // Use simple incrementing counter as a key - boolean useIntegerKey = !getRecordKey(records.get(0), readerSchema, keyFieldName).isPresent(); - // This is set here to avoid re-computing this in the loop - int keyWidth = useIntegerKey ? (int) Math.ceil(Math.log(records.size())) + 1 : -1; - - // Serialize records into bytes - Map<String, byte[]> sortedRecordsMap = new TreeMap<>(); - - Iterator<HoodieRecord> itr = records.iterator(); - int id = 0; - Option<Schema.Field> keyField = Option.ofNullable(writerSchema.getField(keyFieldName)); - while (itr.hasNext()) { - HoodieRecord<?> record = itr.next(); - String recordKey; - if (useIntegerKey) { - recordKey = String.format("%" + keyWidth + "s", id++); - } else { - recordKey = getRecordKey(record, readerSchema, keyFieldName).get(); - } + try (OutputStream ostream = new DataOutputStream(baos)) { + HFileContext context = HFileContext.builder() + .blockSize(DEFAULT_BLOCK_SIZE_FOR_LOG_FILE) + .compressionCodec(compressionCodec) + .build(); + // Use simple incrementing counter as a key + boolean useIntegerKey = !getRecordKey(records.get(0), readerSchema, keyFieldName).isPresent(); + // This is set here to avoid re-computing this in the loop + int keyWidth = useIntegerKey ? (int) Math.ceil(Math.log(records.size())) + 1 : -1; + int id = 0; + Option<Schema.Field> keyField = Option.ofNullable(writerSchema.getField(keyFieldName)); + try (HFileWriter writer = new HFileWriterImpl(context, ostream)) { + String previousRecordKey = null; + // It is assumed that the input records are sorted based on the record key + // for HFile block + for (int i = 0; i < records.size(); i++) { + HoodieRecord<?> record = records.get(i); + String recordKey = useIntegerKey + ? 
String.format("%" + keyWidth + "s", id++) + : getRecordKey(record, readerSchema, keyFieldName).get(); + final byte[] recordBytes = serializeRecord(record, writerSchema, keyField); + // Since the list is sorted, duplicates will be adjacent. + if (i > 0 && recordKey.equals(previousRecordKey)) { + LOG.error("Found duplicate record with recordKey: {}", recordKey); + logRecordMetadata("Previous record", + serializeRecord(records.get(i - 1), writerSchema, keyField), writerSchema); + logRecordMetadata("Current record", + serializeRecord(record, writerSchema, keyField), writerSchema); + throw new HoodieException(String.format( + "Writing multiple records with same key %s not supported for Hfile format with Metadata table", recordKey)); + } + try { + writer.append(recordKey, recordBytes); + } catch (IOException e) { + throw new HoodieIOException("IOException serializing records", e); + } + previousRecordKey = recordKey; + } - final byte[] recordBytes = serializeRecord(record, writerSchema, keyField); - if (sortedRecordsMap.containsKey(recordKey)) { - LOG.error("Found duplicate record with recordKey: {} ", recordKey); - logRecordMetadata("Previous record", sortedRecordsMap.get(recordKey), writerSchema); - logRecordMetadata("Current record", recordBytes, writerSchema); - throw new HoodieException(String.format("Writing multiple records with same key %s not supported for Hfile format with Metadata table", - recordKey)); + writer.appendFileInfo( + HoodieAvroHFileReaderImplBase.SCHEMA_KEY, + getUTF8Bytes(readerSchema.toString())); } - sortedRecordsMap.put(recordKey, recordBytes); + ostream.flush(); } - - HFileContext context = HFileContext.builder() - .blockSize(DEFAULT_BLOCK_SIZE_FOR_LOG_FILE) - .compressionCodec(compressionCodec) - .build(); - try (HFileWriter writer = new HFileWriterImpl(context, ostream)) { - sortedRecordsMap.forEach((recordKey,recordBytes) -> { - try { - writer.append(recordKey, recordBytes); - } catch (IOException e) { - throw new 
HoodieIOException("IOException serializing records", e); - } - }); - writer.appendFileInfo( - HoodieAvroHFileReaderImplBase.SCHEMA_KEY, - getUTF8Bytes(readerSchema.toString())); - } - - ostream.flush(); - ostream.close(); return baos; }
