This is an automated email from the ASF dual-hosted git repository. vhs pushed a commit to branch release-1.0.2 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 0e05e3a1c7618b3f8ede81331c16c328b1abe346 Author: Danny Chan <[email protected]> AuthorDate: Fri Apr 11 07:32:53 2025 +0800 [HUDI-9267] Fix the file group reader log file read sequence (#13115) * [HUDI-9267] Fix the file group reader log file read sequence Fix the file group reader log file sequence to be in ascending order, so as to keep the "processing_time" merging semantics for streaming scenarios: always choose the latest incoming if the ordering values are equal. These semantics now work for both `COMMIT_TIME` and `EVENT_TIME` merging modes after the fix. Also fix some other issues: * the unnecessary copy of rows for position based merging; * the event time merging sequence for CUSTOM merger. * the HoodieEmptyRecord default ordering value * the fallback strategy read for position based merging --------- Co-authored-by: sivabalan <[email protected]> (cherry picked from commit 0eac5551b91e8ac0fa8cdf7268677d277ceb4273) --- .../hudi/common/model/HoodieEmptyRecord.java | 5 +- .../table/log/BaseHoodieLogRecordReader.java | 4 +- .../common/table/read/FileGroupRecordBuffer.java | 106 ++++++++++++++------- .../read/PositionBasedFileGroupRecordBuffer.java | 22 +++-- .../hudi/common/table/read/TestCustomMerger.java | 3 +- .../TestPositionBasedFileGroupRecordBuffer.java | 6 +- .../read/TestHoodieFileGroupReaderOnSpark.scala | 2 +- 7 files changed, 102 insertions(+), 46 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java index bbe55171682..8183bde74d2 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java @@ -39,7 +39,10 @@ public class HoodieEmptyRecord<T> extends HoodieRecord<T> { public HoodieEmptyRecord(HoodieKey key, HoodieRecordType type) { super(key, null); this.type = type; - this.orderingVal = null; + // 
IMPORTANT: + // This should be kept in line with EmptyHoodieRecordPayload + // default natural order + this.orderingVal = 0; } public HoodieEmptyRecord(HoodieKey key, HoodieOperation operation, Comparable<?> orderingVal, HoodieRecordType type) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java index 71d4cd2842c..fe92f895843 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java @@ -223,13 +223,13 @@ public abstract class BaseHoodieLogRecordReader<T> { totalCorruptBlocks = new AtomicLong(0); totalLogBlocks = new AtomicLong(0); totalLogRecords = new AtomicLong(0); - HoodieLogFormatReverseReader logFormatReaderWrapper = null; + HoodieLogFormatReader logFormatReaderWrapper = null; HoodieTimeline commitsTimeline = this.hoodieTableMetaClient.getCommitsTimeline(); HoodieTimeline completedInstantsTimeline = commitsTimeline.filterCompletedInstants(); HoodieTimeline inflightInstantsTimeline = commitsTimeline.filterInflights(); try { // Iterate over the paths - logFormatReaderWrapper = new HoodieLogFormatReverseReader(storage, + logFormatReaderWrapper = new HoodieLogFormatReader(storage, logFilePaths.stream().map(logFile -> new HoodieLogFile(new StoragePath(logFile))).collect(Collectors.toList()), readerSchema, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java index 15c22883716..a5e7730534d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java +++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java @@ -233,13 +233,14 @@ public abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordB /** * Merge two log data records if needed. * - * @param record - * @param metadata - * @param existingRecordMetadataPair - * @return - * @throws IOException + * @param newRecord The new incoming record + * @param metadata The metadata + * @param existingRecordMetadataPair The existing record metadata pair + * + * @return The pair of the record that needs to be updated with and its metadata, + * returns empty to skip the update. */ - protected Option<Pair<Option<T>, Map<String, Object>>> doProcessNextDataRecord(T record, + protected Option<Pair<Option<T>, Map<String, Object>>> doProcessNextDataRecord(T newRecord, Map<String, Object> metadata, Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair) throws IOException { @@ -249,14 +250,12 @@ public abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordB // TODO(HUDI-7843): decouple the merging logic from the merger // and use the record merge mode to control how to merge partial updates // Merge and store the combined record - // Note that the incoming `record` is from an older commit, so it should be put as - // the `older` in the merge API Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt = recordMerger.get().partialMerge( - readerContext.constructHoodieRecord(Option.of(record), metadata), - readerContext.getSchemaFromMetadata(metadata), readerContext.constructHoodieRecord( existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()), readerContext.getSchemaFromMetadata(existingRecordMetadataPair.getRight()), + readerContext.constructHoodieRecord(Option.of(newRecord), metadata), + readerContext.getSchemaFromMetadata(metadata), readerSchema, props); if (!combinedRecordAndSchemaOpt.isPresent()) { @@ -266,7 +265,7 @@ public abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordB HoodieRecord<T> combinedRecord = combinedRecordAndSchema.getLeft(); // If pre-combine returns existing record, no need to update it - if (combinedRecord.getData() != existingRecordMetadataPair.getLeft().get()) { + if (combinedRecord.getData() != existingRecordMetadataPair.getLeft().orElse(null)) { return Option.of(Pair.of( Option.ofNullable(combinedRecord.getData()), readerContext.updateSchemaAndResetOrderingValInMetadata(metadata, combinedRecordAndSchema.getRight()))); @@ -275,43 +274,47 @@ public abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordB } else { switch (recordMergeMode) { case COMMIT_TIME_ORDERING: - return Option.empty(); + return Option.of(Pair.of(Option.ofNullable(newRecord), metadata)); case EVENT_TIME_ORDERING: - Comparable existingOrderingValue = readerContext.getOrderingValue( - existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight(), - readerSchema, orderingFieldName); - if (isDeleteRecordWithNaturalOrder(existingRecordMetadataPair.getLeft(), existingOrderingValue)) { - return Option.empty(); - } - Comparable incomingOrderingValue = readerContext.getOrderingValue( - Option.of(record), metadata, readerSchema, orderingFieldName); - if (incomingOrderingValue.compareTo(existingOrderingValue) > 0) { - return Option.of(Pair.of(Option.of(record), metadata)); + if (shouldKeepNewerRecord(existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight(), Option.ofNullable(newRecord), metadata)) { + return Option.of(Pair.of(Option.of(newRecord), metadata)); } return Option.empty(); case CUSTOM: default: // Merge and store the combined record - // Note that the incoming `record` is from an older commit, so it should be put as - // the `older` in the merge API if (payloadClass.isPresent()) { + if (existingRecordMetadataPair.getLeft().isEmpty() + && shouldKeepNewerRecord(existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight(), Option.ofNullable(newRecord), 
metadata)) { + // IMPORTANT: + // this is needed when the fallback HoodieAvroRecordMerger got used, the merger would + // return Option.empty when the old payload data is empty(a delete) and ignores its ordering value directly. + return Option.of(Pair.of(Option.of(newRecord), metadata)); + } Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt = - getMergedRecord(Option.of(record), metadata, existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()); + getMergedRecord(existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight(), Option.of(newRecord), metadata); if (combinedRecordAndSchemaOpt.isPresent()) { T combinedRecordData = readerContext.convertAvroRecord((IndexedRecord) combinedRecordAndSchemaOpt.get().getLeft().getData()); // If pre-combine does not return existing record, update it - if (combinedRecordData != existingRecordMetadataPair.getLeft().get()) { + if (combinedRecordData != existingRecordMetadataPair.getLeft().orElse(null)) { return Option.of(Pair.of(Option.ofNullable(combinedRecordData), metadata)); } } return Option.empty(); } else { + if (existingRecordMetadataPair.getLeft().isEmpty() + && shouldKeepNewerRecord(existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight(), Option.ofNullable(newRecord), metadata)) { + // IMPORTANT: + // this is needed when the fallback HoodieAvroRecordMerger got used, the merger would + // return Option.empty when the old payload data is empty(a delete) and ignores its ordering value directly. 
+ return Option.of(Pair.of(Option.of(newRecord), metadata)); + } Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt = recordMerger.get().merge( - readerContext.constructHoodieRecord(Option.of(record), metadata), - readerContext.getSchemaFromMetadata(metadata), readerContext.constructHoodieRecord( existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()), readerContext.getSchemaFromMetadata(existingRecordMetadataPair.getRight()), + readerContext.constructHoodieRecord(Option.of(newRecord), metadata), + readerContext.getSchemaFromMetadata(metadata), props); if (!combinedRecordAndSchemaOpt.isPresent()) { @@ -322,7 +325,7 @@ public abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordB HoodieRecord<T> combinedRecord = combinedRecordAndSchema.getLeft(); // If pre-combine returns existing record, no need to update it - if (combinedRecord.getData() != existingRecordMetadataPair.getLeft().get()) { + if (combinedRecord.getData() != existingRecordMetadataPair.getLeft().orElse(null)) { return Option.of(Pair.of(Option.ofNullable(combinedRecord.getData()), metadata)); } return Option.empty(); @@ -334,16 +337,17 @@ public abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordB // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific // payload pointing into a shared, mutable (underlying) buffer we get a clean copy of // it since these records will be put into records(Map). - return Option.of(Pair.of(Option.ofNullable(record), metadata)); + return Option.of(Pair.of(Option.ofNullable(newRecord), metadata)); } } /** * Merge a delete record with another record (data, or delete). * - * @param deleteRecord - * @param existingRecordMetadataPair - * @return + * @param deleteRecord The delete record + * @param existingRecordMetadataPair The existing record metadata pair + * + * @return The option of new delete record that needs to be updated with. 
*/ protected Option<DeleteRecord> doProcessNextDeletedRecord(DeleteRecord deleteRecord, Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair) { @@ -351,7 +355,7 @@ public abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordB if (existingRecordMetadataPair != null) { switch (recordMergeMode) { case COMMIT_TIME_ORDERING: - return Option.empty(); + return Option.of(deleteRecord); case EVENT_TIME_ORDERING: case CUSTOM: default: @@ -473,6 +477,17 @@ public abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordB case CUSTOM: default: if (payloadClass.isPresent()) { + if (older.isEmpty() || newer.isEmpty()) { + if (shouldKeepNewerRecord(older, olderInfoMap, newer, newerInfoMap)) { + // IMPORTANT: + // this is needed when the fallback HoodieAvroRecordMerger got used, the merger would + // return Option.empty when the new payload data is empty(a delete) and ignores its ordering value directly. + return newer; + } else { + return older; + } + } + Option<Pair<HoodieRecord, Schema>> mergedRecord = getMergedRecord(older, olderInfoMap, newer, newerInfoMap); if (mergedRecord.isPresent() @@ -487,6 +502,16 @@ public abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordB } return Option.empty(); } else { + if (older.isEmpty() || newer.isEmpty()) { + if (shouldKeepNewerRecord(older, olderInfoMap, newer, newerInfoMap)) { + // IMPORTANT: + // this is needed when the fallback HoodieAvroRecordMerger got used, the merger would + // return Option.empty when the new payload data is empty(a delete) and ignores its ordering value directly. 
+ return newer; + } else { + return older; + } + } Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.get().merge( readerContext.constructHoodieRecord(older, olderInfoMap), readerContext.getSchemaFromMetadata(olderInfoMap), readerContext.constructHoodieRecord(newer, newerInfoMap), readerContext.getSchemaFromMetadata(newerInfoMap), props); @@ -504,6 +529,19 @@ public abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordB } } + /** + * Decides whether to keep the incoming record with ordering value comparison. + */ + private boolean shouldKeepNewerRecord(Option<T> oldVal, Map<String, Object> oldMetadata, Option<T> newVal, Map<String, Object> newMetadata) { + Comparable newOrderingVal = readerContext.getOrderingValue(newVal, newMetadata, readerSchema, orderingFieldName); + if (isDeleteRecordWithNaturalOrder(newVal, newOrderingVal)) { + // handle records coming from DELETE statements(the orderingVal is constant 0) + return true; + } + Comparable oldOrderingVal = readerContext.getOrderingValue(oldVal, oldMetadata, readerSchema, orderingFieldName); + return newOrderingVal.compareTo(oldOrderingVal) >= 0; + } + private Option<Pair<HoodieRecord, Schema>> getMergedRecord(Option<T> older, Map<String, Object> olderInfoMap, Option<T> newer, Map<String, Object> newerInfoMap) throws IOException { ValidationUtils.checkArgument(!Objects.equals(payloadClass, OverwriteWithLatestAvroPayload.class.getCanonicalName()) && !Objects.equals(payloadClass, DefaultHoodieRecordPayload.class.getCanonicalName())); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java index bd450b77d31..6b2f4d2c656 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java +++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java @@ -54,7 +54,6 @@ import java.util.Set; import java.util.function.Function; import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_RECORD_KEY; -import static org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE; /** * A buffer that is used to store log records by {@link org.apache.hudi.common.table.log.HoodieMergedLogRecordReader} @@ -184,10 +183,22 @@ public class PositionBasedFileGroupRecordBuffer<T> extends KeyBasedFileGroupReco switch (recordMergeMode) { case COMMIT_TIME_ORDERING: + int commitTimeBasedRecordIndex = 0; + DeleteRecord[] deleteRecords = deleteBlock.getRecordsToDelete(); for (Long recordPosition : recordPositions) { - records.putIfAbsent(recordPosition, + // IMPORTANT: + // use #put for log files with regular order(see HoodieLogFile.LOG_FILE_COMPARATOR); + // use #putIfAbsent for log files with reverse order(see HoodieLogFile.LOG_FILE_COMPARATOR_REVERSED), + // the delete block would be parsed ahead of a data block if they are in different log files. + + // set up the record key for key-based fallback handling, this is needed + // because under hybrid strategy in #doHasNextFallbackBaseRecord, if the record keys are not set up, + // this delete-vector could be kept in the records cache(see the check in #fallbackToKeyBasedBuffer), + // and these keys would be deleted no matter whether there are following-up inserts/updates. 
+ DeleteRecord deleteRecord = deleteRecords[commitTimeBasedRecordIndex++]; + records.put(recordPosition, Pair.of(Option.empty(), readerContext.generateMetadataForRecord( - null, "", DEFAULT_ORDERING_VALUE))); + deleteRecord.getRecordKey(), "", deleteRecord.getOrderingValue()))); } return; case EVENT_TIME_ORDERING: @@ -246,12 +257,11 @@ public class PositionBasedFileGroupRecordBuffer<T> extends KeyBasedFileGroupReco Map<String, Object> metadata = readerContext.generateMetadataForRecord( baseRecord, readerSchema); - Option<T> resultRecord = Option.empty(); + final Option<T> resultRecord; if (logRecordInfo != null) { resultRecord = merge( Option.of(baseRecord), metadata, logRecordInfo.getLeft(), logRecordInfo.getRight()); if (resultRecord.isPresent()) { - nextRecord = readerContext.seal(resultRecord.get()); readStats.incrementNumUpdates(); } else { readStats.incrementNumDeletes(); @@ -275,7 +285,7 @@ public class PositionBasedFileGroupRecordBuffer<T> extends KeyBasedFileGroupReco ROW_INDEX_TEMPORARY_COLUMN_NAME, nextRecordPosition); Pair<Option<T>, Map<String, Object>> logRecordInfo = records.remove(nextRecordPosition++); if (logRecordInfo != null) { - //we have a delete that was not able to be converted. Since it is the newest version, the record is deleted + //we have a delete that was not to be able to be converted. 
Since it is the newest version, the record is deleted //remove a key based record if it exists records.remove(readerContext.getRecordKey(baseRecord, readerSchema)); return false; diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java index 31d0700935b..6d7f5a468d3 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java @@ -168,8 +168,9 @@ public class TestCustomMerger extends HoodieFileGroupReaderTestHarness { public void testWithThreeLogFiles(boolean useRecordPositions) throws IOException, InterruptedException { shouldWritePositions = Arrays.asList(useRecordPositions, useRecordPositions, useRecordPositions, useRecordPositions); ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(4, useRecordPositions); + // The records with keys 6 and 8 are deletes with lower ordering val List<String> leftKeysExpected = - Arrays.asList("1", "3", "7", "9", "10"); + Arrays.asList("1", "3", "6", "7", "8", "9", "10"); List<String> leftKeysActual = new ArrayList<>(); while (iterator.hasNext()) { leftKeysActual.add(iterator.next() diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java index c927893a115..64e5bf1500c 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java @@ -62,6 +62,7 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import static 
org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_ORDERING_FIELD; import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_RECORD_KEY; import static org.apache.hudi.common.model.WriteOperationType.INSERT; import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS; @@ -203,7 +204,10 @@ public class TestPositionBasedFileGroupRecordBuffer extends TestHoodieFileGroupR if (sameBaseInstantTime) { // If the log block's base instant time of record positions match the base file // to merge, the log records are stored based on the position - assertNull(buffer.getLogRecords().get(0L).getRight().get(INTERNAL_META_RECORD_KEY)); + assertNotNull(buffer.getLogRecords().get(0L).getRight().get(INTERNAL_META_RECORD_KEY), + "the record key is set up for fallback handling"); + assertNotNull(buffer.getLogRecords().get(0L).getRight().get(INTERNAL_META_ORDERING_FIELD), + "the ordering value is set up for fallback handling"); } else { // If the log block's base instant time of record positions does not match the // base file to merge, the log records are stored based on the record key diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala index 4120956b4ce..b9f71c687d2 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala @@ -269,7 +269,7 @@ class TestHoodieFileGroupReaderOnSpark extends TestHoodieFileGroupReaderBase[Int val columnsToCompare = Set("ts", "key", "rider", "driver", "fare", "op") val df = spark.read.options(readOpts).format("hudi").load(getBasePath) val finalDf = 
df.select("ts", "key", "rider", "driver", "fare", "op").sort("key") - val expected = if (mergeMode == RecordMergeMode.EVENT_TIME_ORDERING.name()) { + val expected = if (mergeMode != RecordMergeMode.COMMIT_TIME_ORDERING.name()) { expectedEventTimeBased } else { expectedCommitTimeBased
