nsivabalan commented on code in PR #13742:
URL: https://github.com/apache/hudi/pull/13742#discussion_r2301141732
##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveRecordContext.java:
##########
@@ -126,6 +125,25 @@ public Comparable convertValueToEngineType(Comparable
value) {
return value;
}
+ public Comparable convertValueFromEngineType(Comparable value) {
Review Comment:
Can you confirm that unit tests have been added covering these conversions?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -465,46 +451,45 @@ public CustomPayloadRecordMerger(
}
@Override
- public Option<BufferedRecord<T>>
deltaMergeNonDeleteRecord(BufferedRecord<T> newRecord, BufferedRecord<T>
existingRecord) throws IOException {
- Option<Pair<HoodieRecord, Schema>> mergedRecordAndSchema =
getMergedRecord(existingRecord, newRecord, false);
- if (mergedRecordAndSchema.isEmpty()) {
- // An empty Option indicates that the output represents a delete.
- return Option.of(new BufferedRecord<>(newRecord.getRecordKey(),
OrderingValues.getDefault(), null, null, HoodieOperation.DELETE));
- }
- HoodieRecord mergedRecord = mergedRecordAndSchema.get().getLeft();
- Schema mergeResultSchema = mergedRecordAndSchema.get().getRight();
+ public Option<BufferedRecord<T>> deltaMergeRecords(BufferedRecord<T>
newRecord, BufferedRecord<T> existingRecord) throws IOException {
+ Pair<HoodieRecord, Schema> mergedRecordAndSchema =
getMergedRecord(existingRecord, newRecord, false);
+ HoodieRecord mergedRecord = mergedRecordAndSchema.getLeft();
+ Schema mergeResultSchema = mergedRecordAndSchema.getRight();
// Special handling for SENTINEL record in Expression Payload. This is
returned if the condition does not match.
if (mergedRecord.getData() == HoodieRecord.SENTINEL) {
return Option.empty();
}
- T combinedRecordData =
recordContext.convertAvroRecord(mergedRecord.toIndexedRecord(mergeResultSchema,
props).get().getData());
+ Option<T> combinedRecordData =
(mergedRecord.toIndexedRecord(mergeResultSchema, props).map(indexedRecord ->
recordContext.convertAvroRecord(indexedRecord.getData())));
// If pre-combine does not return existing record, update it
- if (combinedRecordData != existingRecord.getRecord()) {
+ if (combinedRecordData.map(record -> record !=
existingRecord.getRecord()).orElse(true)) {
// For pkless we need to use record key from existing record
- return Option.of(BufferedRecords.fromEngineRecord(combinedRecordData,
mergeResultSchema, recordContext, orderingFieldNames,
- existingRecord.getRecordKey(),
mergedRecord.isDelete(mergeResultSchema, props)));
+ boolean isDelete = mergedRecord.isDelete(readerSchema, props);
Review Comment:
readerSchema -> mergeResultSchema
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java:
##########
@@ -39,33 +41,33 @@ public HoodieRecord.HoodieRecordType getRecordType() {
* Basic handling of deletes that is used by many of the spark mergers
* returns null if merger specific logic should be used
*/
- protected Option<Pair<HoodieRecord, Schema>> handleDeletes(HoodieRecord
older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties
props) {
+ protected Pair<HoodieRecord, Schema> handleDeletes(HoodieRecord older,
Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) {
ValidationUtils.checkArgument(older.getRecordType() ==
HoodieRecord.HoodieRecordType.SPARK);
ValidationUtils.checkArgument(newer.getRecordType() ==
HoodieRecord.HoodieRecordType.SPARK);
if (newer instanceof HoodieSparkRecord) {
HoodieSparkRecord newSparkRecord = (HoodieSparkRecord) newer;
if (newSparkRecord.isDelete(newSchema, props)) {
// Delete record
- return Option.empty();
+ return Pair.of(new HoodieEmptyRecord<>(newer.getKey(),
HoodieOperation.DELETE, OrderingValues.getDefault(),
HoodieRecord.HoodieRecordType.SPARK), newSchema);
Review Comment:
Can you track all these inconsistencies in a JIRA, so that we can revisit them
at some point?
##########
hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java:
##########
@@ -74,11 +74,7 @@ public OverwriteWithLatestAvroPayload
preCombine(OverwriteWithLatestAvroPayload
@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord
currentValue, Schema schema, Properties properties) throws IOException {
- if (recordBytes.length == 0) {
- return Option.empty();
- }
-
- GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes,
schema);
+ Option<IndexedRecord> incomingRecord = recordBytes.length == 0 ?
Option.empty() : Option.of(HoodieAvroUtils.bytesToAvro(recordBytes, schema));
Review Comment:
danny: The difference is that previously we returned right away if the bytes
were empty, but now we do not — we proceed past line 77, and that is where the
fix that Tim responded with above applies.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -567,42 +553,36 @@ public Option<BufferedRecord<T>>
deltaMerge(BufferedRecord<T> newRecord, Buffere
if (existingRecord == null) {
return Option.of(newRecord);
}
- if (existingRecord.isDelete() || newRecord.isDelete()) {
- if (shouldKeepNewerRecord(existingRecord, newRecord)) {
- // IMPORTANT:
- // this is needed when the fallback HoodieAvroRecordMerger got used,
the merger would
- // return Option.empty when the old payload data is empty(a delete)
and ignores its ordering value directly.
- return Option.of(newRecord);
- } else {
- return Option.empty();
- }
- }
- return deltaMergeNonDeleteRecord(newRecord, existingRecord);
+ return deltaMergeRecords(newRecord, existingRecord);
}
- public abstract Option<BufferedRecord<T>>
deltaMergeNonDeleteRecord(BufferedRecord<T> newRecord, BufferedRecord<T>
existingRecord) throws IOException;
+ public abstract Option<BufferedRecord<T>>
deltaMergeRecords(BufferedRecord<T> newRecord, BufferedRecord<T>
existingRecord) throws IOException;
@Override
public Option<DeleteRecord> deltaMerge(DeleteRecord deleteRecord,
BufferedRecord<T> existingRecord) {
- return deltaMergeDeleteRecord(deleteRecord, existingRecord);
+ BufferedRecord<T> deleteBufferedRecord =
BufferedRecords.fromDeleteRecord(deleteRecord, recordContext);
+ try {
+ Option<BufferedRecord<T>> merged = deltaMerge(deleteBufferedRecord,
existingRecord);
+ // If the delete record is chosen, return an option with the delete
record, otherwise return empty.
+ return merged.isPresent() ? Option.of(deleteRecord) : Option.empty();
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to process delete record", e);
+ }
}
@Override
public BufferedRecord<T> finalMerge(BufferedRecord<T> olderRecord,
BufferedRecord<T> newerRecord) throws IOException {
- if (olderRecord.isDelete() || newerRecord.isDelete()) {
- if (shouldKeepNewerRecord(olderRecord, newerRecord)) {
- // IMPORTANT:
- // this is needed when the fallback HoodieAvroRecordMerger got used,
the merger would
- // return Option.empty when the new payload data is empty(a delete)
and ignores its ordering value directly.
- return newerRecord;
- } else {
- return olderRecord;
- }
+ if (olderRecord == null) {
+ return newerRecord;
+ }
+ // handle special case for deletes that are sent to older partitions in
global-index, this delete takes precedence regardless of the previous value
+ if (newerRecord.getHoodieOperation() == HoodieOperation.UPDATE_BEFORE) {
Review Comment:
In the case of a global index, won't the delete record have a commit-time
ordering value? My thought was that merging with existing records in older
partitions should then work without this special handling.
Can you help explain why we need this special handling?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -465,46 +451,45 @@ public CustomPayloadRecordMerger(
}
@Override
- public Option<BufferedRecord<T>>
deltaMergeNonDeleteRecord(BufferedRecord<T> newRecord, BufferedRecord<T>
existingRecord) throws IOException {
- Option<Pair<HoodieRecord, Schema>> mergedRecordAndSchema =
getMergedRecord(existingRecord, newRecord, false);
- if (mergedRecordAndSchema.isEmpty()) {
- // An empty Option indicates that the output represents a delete.
- return Option.of(new BufferedRecord<>(newRecord.getRecordKey(),
OrderingValues.getDefault(), null, null, HoodieOperation.DELETE));
- }
- HoodieRecord mergedRecord = mergedRecordAndSchema.get().getLeft();
- Schema mergeResultSchema = mergedRecordAndSchema.get().getRight();
+ public Option<BufferedRecord<T>> deltaMergeRecords(BufferedRecord<T>
newRecord, BufferedRecord<T> existingRecord) throws IOException {
+ Pair<HoodieRecord, Schema> mergedRecordAndSchema =
getMergedRecord(existingRecord, newRecord, false);
+ HoodieRecord mergedRecord = mergedRecordAndSchema.getLeft();
+ Schema mergeResultSchema = mergedRecordAndSchema.getRight();
// Special handling for SENTINEL record in Expression Payload. This is
returned if the condition does not match.
if (mergedRecord.getData() == HoodieRecord.SENTINEL) {
return Option.empty();
}
- T combinedRecordData =
recordContext.convertAvroRecord(mergedRecord.toIndexedRecord(mergeResultSchema,
props).get().getData());
+ Option<T> combinedRecordData =
(mergedRecord.toIndexedRecord(mergeResultSchema, props).map(indexedRecord ->
recordContext.convertAvroRecord(indexedRecord.getData())));
Review Comment:
Essentially, wherever we are handling SENTINEL here, can we remove it from the
other mergers?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -465,46 +451,45 @@ public CustomPayloadRecordMerger(
}
@Override
- public Option<BufferedRecord<T>>
deltaMergeNonDeleteRecord(BufferedRecord<T> newRecord, BufferedRecord<T>
existingRecord) throws IOException {
- Option<Pair<HoodieRecord, Schema>> mergedRecordAndSchema =
getMergedRecord(existingRecord, newRecord, false);
- if (mergedRecordAndSchema.isEmpty()) {
- // An empty Option indicates that the output represents a delete.
- return Option.of(new BufferedRecord<>(newRecord.getRecordKey(),
OrderingValues.getDefault(), null, null, HoodieOperation.DELETE));
- }
- HoodieRecord mergedRecord = mergedRecordAndSchema.get().getLeft();
- Schema mergeResultSchema = mergedRecordAndSchema.get().getRight();
+ public Option<BufferedRecord<T>> deltaMergeRecords(BufferedRecord<T>
newRecord, BufferedRecord<T> existingRecord) throws IOException {
+ Pair<HoodieRecord, Schema> mergedRecordAndSchema =
getMergedRecord(existingRecord, newRecord, false);
+ HoodieRecord mergedRecord = mergedRecordAndSchema.getLeft();
+ Schema mergeResultSchema = mergedRecordAndSchema.getRight();
// Special handling for SENTINEL record in Expression Payload. This is
returned if the condition does not match.
if (mergedRecord.getData() == HoodieRecord.SENTINEL) {
return Option.empty();
}
- T combinedRecordData =
recordContext.convertAvroRecord(mergedRecord.toIndexedRecord(mergeResultSchema,
props).get().getData());
+ Option<T> combinedRecordData =
(mergedRecord.toIndexedRecord(mergeResultSchema, props).map(indexedRecord ->
recordContext.convertAvroRecord(indexedRecord.getData())));
// If pre-combine does not return existing record, update it
- if (combinedRecordData != existingRecord.getRecord()) {
+ if (combinedRecordData.map(record -> record !=
existingRecord.getRecord()).orElse(true)) {
// For pkless we need to use record key from existing record
- return Option.of(BufferedRecords.fromEngineRecord(combinedRecordData,
mergeResultSchema, recordContext, orderingFieldNames,
- existingRecord.getRecordKey(),
mergedRecord.isDelete(mergeResultSchema, props)));
+ boolean isDelete = mergedRecord.isDelete(readerSchema, props);
+ Comparable orderingValue =
mergedRecord.getOrderingValue(mergeResultSchema, props, orderingFieldNames);
+ T mergedEngineRecord = mergedRecord.toIndexedRecord(mergeResultSchema,
props)
+ .map(hoodieAvroIndexedRecord ->
recordContext.convertAvroRecord(hoodieAvroIndexedRecord.getData()))
+ .orElse(null);
+ return Option.of(BufferedRecords.fromEngineRecord(mergedEngineRecord,
readerSchema, recordContext, orderingValue, existingRecord.getRecordKey(),
isDelete));
+
}
return Option.empty();
}
@Override
- public BufferedRecord<T> mergeNonDeleteRecord(BufferedRecord<T>
olderRecord, BufferedRecord<T> newerRecord) throws IOException {
- Option<Pair<HoodieRecord, Schema>> mergedRecordAndSchema =
getMergedRecord(olderRecord, newerRecord, true);
- if (mergedRecordAndSchema.isEmpty()) {
- return BufferedRecords.createDelete(newerRecord.getRecordKey());
- }
- HoodieRecord mergedRecord = mergedRecordAndSchema.get().getLeft();
- Schema mergeResultSchema = mergedRecordAndSchema.get().getRight();
+ public BufferedRecord<T> mergeRecords(BufferedRecord<T> olderRecord,
BufferedRecord<T> newerRecord) throws IOException {
+ Pair<HoodieRecord, Schema> mergedRecordAndSchema =
getMergedRecord(olderRecord, newerRecord, true);
+
+ HoodieRecord mergedRecord = mergedRecordAndSchema.getLeft();
+ Schema mergeResultSchema = mergedRecordAndSchema.getRight();
// Special handling for SENTINEL record in Expression Payload
if (mergedRecord.getData() == HoodieRecord.SENTINEL) {
return olderRecord;
}
- if (!mergedRecord.isDelete(mergeResultSchema, props)) {
- IndexedRecord indexedRecord = (IndexedRecord) mergedRecord.getData();
- return BufferedRecords.fromEngineRecord(
- recordContext.convertAvroRecord(indexedRecord), mergeResultSchema,
recordContext, orderingFieldNames, newerRecord.getRecordKey(), false);
- }
- return BufferedRecords.createDelete(newerRecord.getRecordKey());
+ boolean isDelete = mergedRecord.isDelete(readerSchema, props);
+ Comparable orderingValue =
mergedRecord.getOrderingValue(mergeResultSchema, props, orderingFieldNames);
+ T mergedEngineRecord = mergedRecord.toIndexedRecord(mergeResultSchema,
props)
Review Comment:
I am with you, Danny — we should definitely fix such unnecessary conversions.
One thing I am worried about is the time it will take to get this patch merged
and release 1.1.
In general, let's categorize issues as functional vs. performance, and within
performance, note whether each one regresses existing behavior or not.
For performance enhancements that are on par with existing code, I would
recommend we track them in a follow-up JIRA and attend to them post-1.1.
If we try to get everything done by squeezing in every performance issue we
spot, it keeps delaying the release.
I am pretty sure that if we introduce the changes you are proposing, we would
have to fix a non-trivial number of places.
##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/DefaultHiveRecordMerger.java:
##########
@@ -39,33 +41,33 @@ public class DefaultHiveRecordMerger extends
HoodieHiveRecordMerger {
private String[] orderingFields;
@Override
- public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema
oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws
IOException {
+ public Pair<HoodieRecord, Schema> merge(HoodieRecord older, Schema
oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws
IOException {
ValidationUtils.checkArgument(older.getRecordType() ==
HoodieRecord.HoodieRecordType.HIVE);
ValidationUtils.checkArgument(newer.getRecordType() ==
HoodieRecord.HoodieRecordType.HIVE);
if (newer instanceof HoodieHiveRecord) {
HoodieHiveRecord newHiveRecord = (HoodieHiveRecord) newer;
if (newHiveRecord.isDelete(newSchema, props)) {
- return Option.empty();
+ return Pair.of(new HoodieEmptyRecord<>(newer.getKey(),
HoodieOperation.DELETE, OrderingValues.getDefault(),
HoodieRecord.HoodieRecordType.HIVE), newSchema);
Review Comment:
I also noticed this gap somewhere else.
If this is not breaking existing behavior, can we file a follow-up JIRA and
revisit the entire deletes + ordering-value gaps separately, rather than trying
to fix them in the same patch?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -465,46 +451,45 @@ public CustomPayloadRecordMerger(
}
@Override
- public Option<BufferedRecord<T>>
deltaMergeNonDeleteRecord(BufferedRecord<T> newRecord, BufferedRecord<T>
existingRecord) throws IOException {
- Option<Pair<HoodieRecord, Schema>> mergedRecordAndSchema =
getMergedRecord(existingRecord, newRecord, false);
- if (mergedRecordAndSchema.isEmpty()) {
- // An empty Option indicates that the output represents a delete.
- return Option.of(new BufferedRecord<>(newRecord.getRecordKey(),
OrderingValues.getDefault(), null, null, HoodieOperation.DELETE));
- }
- HoodieRecord mergedRecord = mergedRecordAndSchema.get().getLeft();
- Schema mergeResultSchema = mergedRecordAndSchema.get().getRight();
+ public Option<BufferedRecord<T>> deltaMergeRecords(BufferedRecord<T>
newRecord, BufferedRecord<T> existingRecord) throws IOException {
+ Pair<HoodieRecord, Schema> mergedRecordAndSchema =
getMergedRecord(existingRecord, newRecord, false);
+ HoodieRecord mergedRecord = mergedRecordAndSchema.getLeft();
+ Schema mergeResultSchema = mergedRecordAndSchema.getRight();
// Special handling for SENTINEL record in Expression Payload. This is
returned if the condition does not match.
if (mergedRecord.getData() == HoodieRecord.SENTINEL) {
return Option.empty();
}
- T combinedRecordData =
recordContext.convertAvroRecord(mergedRecord.toIndexedRecord(mergeResultSchema,
props).get().getData());
+ Option<T> combinedRecordData =
(mergedRecord.toIndexedRecord(mergeResultSchema, props).map(indexedRecord ->
recordContext.convertAvroRecord(indexedRecord.getData())));
// If pre-combine does not return existing record, update it
- if (combinedRecordData != existingRecord.getRecord()) {
+ if (combinedRecordData.map(record -> record !=
existingRecord.getRecord()).orElse(true)) {
// For pkless we need to use record key from existing record
- return Option.of(BufferedRecords.fromEngineRecord(combinedRecordData,
mergeResultSchema, recordContext, orderingFieldNames,
- existingRecord.getRecordKey(),
mergedRecord.isDelete(mergeResultSchema, props)));
+ boolean isDelete = mergedRecord.isDelete(readerSchema, props);
+ Comparable orderingValue =
mergedRecord.getOrderingValue(mergeResultSchema, props, orderingFieldNames);
+ T mergedEngineRecord = mergedRecord.toIndexedRecord(mergeResultSchema,
props)
Review Comment:
Yes — combinedRecordData at L462 and mergedEngineRecord at L468 look to be the
same; we should avoid the duplicate conversion.
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/CustomPayload.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.IOException;
+
+public class CustomPayload implements HoodieRecordPayload<CustomPayload> {
+ private final GenericRecord record;
+ private final Comparable orderingValue;
+
+ public CustomPayload(GenericRecord record, Comparable orderingValue) {
+ this.record = record;
+ this.orderingValue = orderingValue;
+ }
+
+ @Override
+ public CustomPayload preCombine(CustomPayload other) {
+ return this; // No-op for this test
+ }
+
+ @Override
+ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord
currentValue, Schema schema) {
+ Long olderTimestamp = (Long) ((GenericRecord)
currentValue).get("timestamp");
+ Long newerTimestamp = orderingValue == null ? (Long)
record.get("timestamp") : (Long) orderingValue;
+ if (olderTimestamp.equals(newerTimestamp)) {
+ // If the timestamps are the same, we do not update
+ return handleDeleteRecord(currentValue);
+ } else if (olderTimestamp < newerTimestamp) {
+ // Custom merger chooses record with lower ordering value
Review Comment:
minor. "Custom merger" -> "Custom payload"
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]