yihua commented on code in PR #13213:
URL: https://github.com/apache/hudi/pull/13213#discussion_r2057903288
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -337,34 +332,31 @@ &&
shouldKeepNewerRecord(existingRecordMetadataPair.getLeft(), existingRecordMet
// NOTE: Record have to be cloned here to make sure if it holds
low-level engine-specific
// payload pointing into a shared, mutable (underlying) buffer we
get a clean copy of
// it since these records will be put into records(Map).
- return Option.of(Pair.of(Option.ofNullable(newRecord), metadata));
+ return Option.of(newRecord);
}
}
/**
* Merge a delete record with another record (data, or delete).
*
* @param deleteRecord The delete record
- * @param existingRecordMetadataPair The existing record metadata pair
+ * @param existingRecord The existing record metadata pair
Review Comment:
nit: please update the Javadoc, since this parameter is now a `BufferedRecord` rather than a record/metadata pair.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -498,32 +482,32 @@ protected Option<T> merge(Option<T> older, Map<String,
Object> olderInfoMap,
} else {
indexedRecord = (IndexedRecord)
mergedRecord.get().getLeft().getData();
}
- return
Option.ofNullable(readerContext.convertAvroRecord(indexedRecord));
+ return Pair.of(false,
readerContext.convertAvroRecord(indexedRecord));
}
- return Option.empty();
+ return null;
Review Comment:
Similar here: avoid returning `null` if possible.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -498,32 +482,32 @@ protected Option<T> merge(Option<T> older, Map<String,
Object> olderInfoMap,
} else {
indexedRecord = (IndexedRecord)
mergedRecord.get().getLeft().getData();
}
- return
Option.ofNullable(readerContext.convertAvroRecord(indexedRecord));
+ return Pair.of(false,
readerContext.convertAvroRecord(indexedRecord));
}
- return Option.empty();
+ return null;
} else {
- if (older.isEmpty() || newer.isEmpty()) {
- if (shouldKeepNewerRecord(older, olderInfoMap, newer,
newerInfoMap)) {
+ if (olderRecord.isDelete() || newerRecord.isDelete()) {
+ if (shouldKeepNewerRecord(olderRecord, newerRecord)) {
// IMPORTANT:
// this is needed when the fallback HoodieAvroRecordMerger got
used, the merger would
// return Option.empty when the new payload data is empty(a
delete) and ignores its ordering value directly.
- return newer;
+ return Pair.of(newerRecord.isDelete(),
newerRecord.getRecord());
} else {
- return older;
+ return Pair.of(olderRecord.isDelete(),
olderRecord.getRecord());
}
}
Option<Pair<HoodieRecord, Schema>> mergedRecord =
recordMerger.get().merge(
- readerContext.constructHoodieRecord(older, olderInfoMap),
readerContext.getSchemaFromMetadata(olderInfoMap),
- readerContext.constructHoodieRecord(newer, newerInfoMap),
readerContext.getSchemaFromMetadata(newerInfoMap), props);
+ readerContext.constructHoodieRecord(olderRecord),
readerContext.getSchemaFromBufferRecord(olderRecord),
+ readerContext.constructHoodieRecord(newerRecord),
readerContext.getSchemaFromBufferRecord(newerRecord), props);
if (mergedRecord.isPresent()
&&
!mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), props)) {
+ HoodieRecord hoodieRecord = mergedRecord.get().getLeft();
if (!mergedRecord.get().getRight().equals(readerSchema)) {
- return Option.ofNullable((T)
mergedRecord.get().getLeft().rewriteRecordWithNewSchema(mergedRecord.get().getRight(),
null, readerSchema).getData());
+ return Pair.of(false, (T)
hoodieRecord.rewriteRecordWithNewSchema(mergedRecord.get().getRight(), null,
readerSchema).getData());
}
- return Option.ofNullable((T)
mergedRecord.get().getLeft().getData());
+ return Pair.of(false, (T) hoodieRecord.getData());
}
-
- return Option.empty();
+ return null;
Review Comment:
Similar here: avoid returning `null` if possible.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -498,32 +482,32 @@ protected Option<T> merge(Option<T> older, Map<String,
Object> olderInfoMap,
} else {
indexedRecord = (IndexedRecord)
mergedRecord.get().getLeft().getData();
}
- return
Option.ofNullable(readerContext.convertAvroRecord(indexedRecord));
+ return Pair.of(false,
readerContext.convertAvroRecord(indexedRecord));
}
- return Option.empty();
+ return null;
} else {
- if (older.isEmpty() || newer.isEmpty()) {
- if (shouldKeepNewerRecord(older, olderInfoMap, newer,
newerInfoMap)) {
+ if (olderRecord.isDelete() || newerRecord.isDelete()) {
+ if (shouldKeepNewerRecord(olderRecord, newerRecord)) {
// IMPORTANT:
// this is needed when the fallback HoodieAvroRecordMerger got
used, the merger would
// return Option.empty when the new payload data is empty(a
delete) and ignores its ordering value directly.
- return newer;
+ return Pair.of(newerRecord.isDelete(),
newerRecord.getRecord());
} else {
- return older;
+ return Pair.of(olderRecord.isDelete(),
olderRecord.getRecord());
}
}
Option<Pair<HoodieRecord, Schema>> mergedRecord =
recordMerger.get().merge(
- readerContext.constructHoodieRecord(older, olderInfoMap),
readerContext.getSchemaFromMetadata(olderInfoMap),
- readerContext.constructHoodieRecord(newer, newerInfoMap),
readerContext.getSchemaFromMetadata(newerInfoMap), props);
+ readerContext.constructHoodieRecord(olderRecord),
readerContext.getSchemaFromBufferRecord(olderRecord),
Review Comment:
One thing to validate later: a custom merger implementation can return
`HoodieEmptyRecord` to indicate a delete, and that case should be handled
properly in the CUSTOM merge mode.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -428,68 +420,60 @@ protected Option<Pair<Function<T, T>, Schema>>
composeEvolvedSchemaTransformer(
/**
* Merge two records using the configured record merger.
*
- * @param older
- * @param olderInfoMap
- * @param newer
- * @param newerInfoMap
+ * @param olderRecord
+ * @param newerRecord
* @return
* @throws IOException
*/
- protected Option<T> merge(Option<T> older, Map<String, Object> olderInfoMap,
- Option<T> newer, Map<String, Object> newerInfoMap)
throws IOException {
- if (!older.isPresent()) {
- return isDeleteRecord(newer,
readerContext.getSchemaFromMetadata(newerInfoMap)) ? Option.empty() : newer;
- }
-
+ protected Pair<Boolean, T> merge(BufferedRecord<T> olderRecord,
BufferedRecord<T> newerRecord) throws IOException {
if (enablePartialMerging) {
// TODO(HUDI-7843): decouple the merging logic from the merger
// and use the record merge mode to control how to merge partial updates
Option<Pair<HoodieRecord, Schema>> mergedRecord =
recordMerger.get().partialMerge(
- readerContext.constructHoodieRecord(older, olderInfoMap),
readerContext.getSchemaFromMetadata(olderInfoMap),
- readerContext.constructHoodieRecord(newer, newerInfoMap),
readerContext.getSchemaFromMetadata(newerInfoMap),
+ readerContext.constructHoodieRecord(olderRecord),
readerContext.getSchemaFromBufferRecord(olderRecord),
+ readerContext.constructHoodieRecord(newerRecord),
readerContext.getSchemaFromBufferRecord(newerRecord),
readerSchema, props);
if (mergedRecord.isPresent()
&&
!mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), props)) {
+ HoodieRecord hoodieRecord = mergedRecord.get().getLeft();
if (!mergedRecord.get().getRight().equals(readerSchema)) {
- return Option.ofNullable((T)
mergedRecord.get().getLeft().rewriteRecordWithNewSchema(mergedRecord.get().getRight(),
null, readerSchema).getData());
+ T data = (T)
hoodieRecord.rewriteRecordWithNewSchema(mergedRecord.get().getRight(), null,
readerSchema).getData();
+ return Pair.of(false, data);
}
- return Option.ofNullable((T) mergedRecord.get().getLeft().getData());
+ return Pair.of(false, (T) hoodieRecord.getData());
}
- return Option.empty();
+ return null;
Review Comment:
Should this be `Pair.of(true, null)` so that the return value is never `null`?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -428,68 +420,60 @@ protected Option<Pair<Function<T, T>, Schema>>
composeEvolvedSchemaTransformer(
/**
* Merge two records using the configured record merger.
*
- * @param older
- * @param olderInfoMap
- * @param newer
- * @param newerInfoMap
+ * @param olderRecord
+ * @param newerRecord
* @return
Review Comment:
Please update the Javadoc `@return` tag to describe the new return value.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -265,56 +257,59 @@ protected Option<Pair<Option<T>, Map<String, Object>>>
doProcessNextDataRecord(T
HoodieRecord<T> combinedRecord = combinedRecordAndSchema.getLeft();
// If pre-combine returns existing record, no need to update it
- if (combinedRecord.getData() !=
existingRecordMetadataPair.getLeft().orElse(null)) {
- return Option.of(Pair.of(
- Option.ofNullable(combinedRecord.getData()),
-
readerContext.updateSchemaAndResetOrderingValInMetadata(metadata,
combinedRecordAndSchema.getRight())));
+ if (combinedRecord.getData() != existingRecord.getRecord()) {
+ return Option.of(BufferedRecord.forRecordWithContext(combinedRecord,
combinedRecordAndSchema.getRight(), readerContext, props));
}
return Option.empty();
} else {
switch (recordMergeMode) {
case COMMIT_TIME_ORDERING:
- return Option.of(Pair.of(Option.ofNullable(newRecord), metadata));
+ return Option.of(newRecord);
case EVENT_TIME_ORDERING:
- if (shouldKeepNewerRecord(existingRecordMetadataPair.getLeft(),
existingRecordMetadataPair.getRight(), Option.ofNullable(newRecord), metadata))
{
- return Option.of(Pair.of(Option.of(newRecord), metadata));
+ if (shouldKeepNewerRecord(existingRecord, newRecord)) {
+ return Option.of(newRecord);
}
return Option.empty();
case CUSTOM:
default:
// Merge and store the combined record
if (payloadClass.isPresent()) {
- if (existingRecordMetadataPair.getLeft().isEmpty()
- &&
shouldKeepNewerRecord(existingRecordMetadataPair.getLeft(),
existingRecordMetadataPair.getRight(), Option.ofNullable(newRecord), metadata))
{
- // IMPORTANT:
- // this is needed when the fallback HoodieAvroRecordMerger got
used, the merger would
- // return Option.empty when the old payload data is empty(a
delete) and ignores its ordering value directly.
- return Option.of(Pair.of(Option.of(newRecord), metadata));
+ if (existingRecord.isDelete() || newRecord.isDelete()) {
+ if (shouldKeepNewerRecord(existingRecord, newRecord)) {
+ // IMPORTANT:
+ // this is needed when the fallback HoodieAvroRecordMerger
got used, the merger would
+ // return Option.empty when the old payload data is empty(a
delete) and ignores its ordering value directly.
+ return Option.of(newRecord);
+ } else {
+ return Option.empty();
+ }
Review Comment:
This part can be extracted into shared code, since it is identical in both
the payload-based and the merger-based branches of the if-else.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]