danny0405 commented on code in PR #11077:
URL: https://github.com/apache/hudi/pull/11077#discussion_r1577299394
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##########
@@ -243,34 +250,57 @@ public static HoodieMergedLogRecordScanner.Builder newBuilder() {
@Override
public <T> void processNextRecord(HoodieRecord<T> newRecord) throws IOException {
String key = newRecord.getRecordKey();
- HoodieRecord<T> prevRecord = records.get(key);
+ HoodieMergeKey mergeKey = newRecord.getMergeKey();
+ if (mergeKey == null) {
+ // If mergeKey is null, then create a simple merge key using record key
+ mergeKey = new HoodieSimpleMergeKey(newRecord.getKey());
+ }
+ HoodieRecord<T> prevRecord = records.get(mergeKey);
if (prevRecord != null) {
// Merge and store the combined record
- HoodieRecord<T> combinedRecord = (HoodieRecord<T>) recordMerger.merge(prevRecord, readerSchema,
- newRecord, readerSchema, this.getPayloadProps()).get().getLeft();
- // If pre-combine returns existing record, no need to update it
- if (combinedRecord.getData() != prevRecord.getData()) {
- HoodieRecord latestHoodieRecord =
- combinedRecord.newInstance(new HoodieKey(key, newRecord.getPartitionPath()), newRecord.getOperation());
-
- latestHoodieRecord.unseal();
- latestHoodieRecord.setCurrentLocation(newRecord.getCurrentLocation());
- latestHoodieRecord.seal();
-
- // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
- // payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
- // it since these records will be put into records(Map).
- records.put(key, latestHoodieRecord.copy());
+ try {
+ HoodieMergeKey finalMergeKey = mergeKey;
+ recordMerger.fullOuterMerge(prevRecord, readerSchema, newRecord, readerSchema, this.getPayloadProps()).forEach(
+ mergedRecord -> {
+ HoodieRecord<T> combinedRecord = mergedRecord.getLeft();
+ if (combinedRecord.getData() != prevRecord.getData()) {
+ HoodieRecord latestHoodieRecord = getLatestHoodieRecord(newRecord, combinedRecord, key);
+ records.put(finalMergeKey, latestHoodieRecord.copy());
+ }
+ });
+ } catch (UnsupportedOperationException e) {
+ LOG.warn("Error merging records: Full outer merge not supported, falling back to default merge for key {}", key);
+ HoodieRecord<T> combinedRecord = (HoodieRecord<T>) recordMerger.merge(prevRecord, readerSchema,
+ newRecord, readerSchema, this.getPayloadProps()).get().getLeft();
+ // If pre-combine returns existing record, no need to update it
+ if (combinedRecord.getData() != prevRecord.getData()) {
Review Comment:
I don't think we should put any regular code path into a catch block.
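The concern is that `UnsupportedOperationException` ends up driving the normal fallback from `fullOuterMerge` to `merge`. One common alternative is to ask the merger up front whether it supports the operation and branch on the answer. The sketch below illustrates that pattern under simplified assumptions: records are plain strings, and `SimpleMerger`, `supportsFullOuterMerge`, `MergeDispatch`, `LatestWinsMerger` and `KeepBothMerger` are hypothetical names, not Hudi's actual `HoodieRecordMerger` API.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified merger contract (NOT the real Hudi HoodieRecordMerger API).
interface SimpleMerger {
  /** Single-result merge; every merger implements this. */
  Optional<String> merge(String older, String newer);

  /** Capability flag checked up front instead of probing with an exception. */
  default boolean supportsFullOuterMerge() {
    return false;
  }

  /** Multi-result merge; calling it on an unsupported merger is a genuine error. */
  default List<String> fullOuterMerge(String older, String newer) {
    throw new UnsupportedOperationException("full outer merge not supported");
  }
}

final class MergeDispatch {
  private MergeDispatch() {}

  /** Picks the merge strategy with an explicit branch; no catch block carries a regular path. */
  static List<String> mergeRecords(SimpleMerger merger, String older, String newer) {
    if (merger.supportsFullOuterMerge()) {
      return merger.fullOuterMerge(older, newer);
    }
    // Fallback path: wrap the single merged result (if any) in a list.
    return merger.merge(older, newer)
        .map(Collections::singletonList)
        .orElse(Collections.emptyList());
  }
}

/** Keeps only the newer value and does not support full outer merge. */
final class LatestWinsMerger implements SimpleMerger {
  @Override
  public Optional<String> merge(String older, String newer) {
    return Optional.of(newer);
  }
}

/** Supports full outer merge and emits both values from it. */
final class KeepBothMerger implements SimpleMerger {
  @Override
  public Optional<String> merge(String older, String newer) {
    return Optional.of(newer);
  }

  @Override
  public boolean supportsFullOuterMerge() {
    return true;
  }

  @Override
  public List<String> fullOuterMerge(String older, String newer) {
    return Arrays.asList(older, newer);
  }
}
```

With this shape, a merger that cannot do a full outer merge never reaches `fullOuterMerge`, so the exception stays a real error signal instead of a control-flow branch, and the log no longer reports a "failure" on every fallback.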