vinothchandar commented on a change in pull request #4880:
URL: https://github.com/apache/hudi/pull/4880#discussion_r838962101
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroPayload.java
##########
@@ -71,4 +70,9 @@ public HoodieAvroPayload preCombine(HoodieAvroPayload
oldValue) {
public byte[] getRecordBytes() {
return recordBytes;
}
+
+ @Override
+ public Comparable<?> getCombineValue() {
Review comment:
I would name this getOrderingValue()
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
##########
@@ -153,9 +154,29 @@ protected void processNextRecord(HoodieRecord<? extends
HoodieRecordPayload> hoo
}
@Override
- protected void processNextDeletedKey(HoodieKey hoodieKey) {
- records.put(hoodieKey.getRecordKey(),
SpillableMapUtils.generateEmptyPayload(hoodieKey.getRecordKey(),
- hoodieKey.getPartitionPath(), getPayloadClassFQN()));
+ protected void processNextDeletedKey(DeleteKey deleteKey) {
+ String key = deleteKey.getRecordKey();
+ if (records.containsKey(key)) {
Review comment:
+1. Don't want to harp on this though.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
##########
@@ -153,9 +154,29 @@ protected void processNextRecord(HoodieRecord<? extends
HoodieRecordPayload> hoo
}
@Override
- protected void processNextDeletedKey(HoodieKey hoodieKey) {
- records.put(hoodieKey.getRecordKey(),
SpillableMapUtils.generateEmptyPayload(hoodieKey.getRecordKey(),
- hoodieKey.getPartitionPath(), getPayloadClassFQN()));
+ protected void processNextDeletedKey(DeleteKey deleteKey) {
+ String key = deleteKey.getRecordKey();
+ if (records.containsKey(key)) {
+ // Merge and store the merged record. The ordering val is taken to
decide whether the same key record
+ // should be deleted or be kept. The old record is kept only if the
DELETE record has smaller ordering val.
+ // For same ordering values, uses the natural order.
+
+ HoodieRecord<? extends HoodieRecordPayload> oldRecord = records.get(key);
Review comment:
I think we already check for existence above?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
##########
@@ -153,9 +154,29 @@ protected void processNextRecord(HoodieRecord<? extends
HoodieRecordPayload> hoo
}
@Override
- protected void processNextDeletedKey(HoodieKey hoodieKey) {
- records.put(hoodieKey.getRecordKey(),
SpillableMapUtils.generateEmptyPayload(hoodieKey.getRecordKey(),
- hoodieKey.getPartitionPath(), getPayloadClassFQN()));
+ protected void processNextDeletedRecord(DeleteRecord deleteRecord) {
+ String key = deleteRecord.getRecordKey();
+ if (records.containsKey(key)) {
+ // Merge and store the merged record. The ordering val is taken to
decide whether the same key record
+ // should be deleted or be kept. The old record is kept only if the
DELETE record has smaller ordering val.
+ // For same ordering values, uses the natural order(arrival time
semantics).
+
+ HoodieRecord<? extends HoodieRecordPayload> oldRecord = records.get(key);
+ Comparable curOrderingVal = oldRecord.getData().getCombineValue();
+ Comparable deleteOrderingVal = deleteRecord.getOrderingVal();
+ // Checks the ordering value does not equal to 0
+ // because we use 0 as the default value which means natural order
+ boolean choosePrev = !deleteOrderingVal.equals(0)
+ && curOrderingVal.getClass() == deleteOrderingVal.getClass()
Review comment:
+1. We can add a utility to ReflectUtils or ClassUtils for the comparison.
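A minimal sketch of what such a utility might look like, assuming a standalone helper class. The names `OrderingValueUtils`, `isSameClass`, and `shouldKeepExisting` are hypothetical and not part of Hudi, and the null checks are an addition not present in the diff. It mirrors the check in the hunk above, including the fact that `equals(0)` only matches an `Integer` zero:

```java
// Hypothetical helper, not an existing Hudi class.
final class OrderingValueUtils {
  private OrderingValueUtils() {
  }

  /** Returns true when both values are non-null and share the exact same runtime class. */
  static boolean isSameClass(Comparable<?> a, Comparable<?> b) {
    return a != null && b != null && a.getClass() == b.getClass();
  }

  /**
   * Returns true when the existing record should be kept over the DELETE record,
   * i.e. the DELETE carries a non-default ordering value of the same type that is
   * strictly smaller than the existing record's ordering value.
   */
  @SuppressWarnings({"unchecked", "rawtypes"})
  static boolean shouldKeepExisting(Comparable<?> curOrderingVal, Comparable<?> deleteOrderingVal) {
    // 0 is treated as the default, meaning natural (arrival) order: the DELETE wins.
    // Note: equals(0) boxes to Integer, so a Long 0L is not treated as the default here,
    // which is the same behavior as the expression in the diff.
    if (deleteOrderingVal == null || deleteOrderingVal.equals(0)) {
      return false;
    }
    // Values of different types are not comparable; fall back to natural order.
    if (!isSameClass(curOrderingVal, deleteOrderingVal)) {
      return false;
    }
    return ((Comparable) curOrderingVal).compareTo(deleteOrderingVal) > 0;
  }
}
```

With something like this, the scanner could reduce the inline expression to `boolean choosePrev = OrderingValueUtils.shouldKeepExisting(curOrderingVal, deleteOrderingVal);`.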
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
##########
@@ -153,9 +154,29 @@ protected void processNextRecord(HoodieRecord<? extends
HoodieRecordPayload> hoo
}
@Override
- protected void processNextDeletedKey(HoodieKey hoodieKey) {
- records.put(hoodieKey.getRecordKey(),
SpillableMapUtils.generateEmptyPayload(hoodieKey.getRecordKey(),
- hoodieKey.getPartitionPath(), getPayloadClassFQN()));
+ protected void processNextDeletedRecord(DeleteRecord deleteRecord) {
+ String key = deleteRecord.getRecordKey();
+ if (records.containsKey(key)) {
+ // Merge and store the merged record. The ordering val is taken to
decide whether the same key record
+ // should be deleted or be kept. The old record is kept only if the
DELETE record has smaller ordering val.
+ // For same ordering values, uses the natural order(arrival time
semantics).
+
+ HoodieRecord<? extends HoodieRecordPayload> oldRecord = records.get(key);
+ Comparable curOrderingVal = oldRecord.getData().getCombineValue();
+ Comparable deleteOrderingVal = deleteRecord.getOrderingVal();
+ // Checks the ordering value does not equal to 0
+ // because we use 0 as the default value which means natural order
+ boolean choosePrev = !deleteOrderingVal.equals(0)
+ && curOrderingVal.getClass() == deleteOrderingVal.getClass()
+ && curOrderingVal.compareTo(deleteOrderingVal) > 0;
Review comment:
For Debezium, for example, we use the LSN. That will be unique across the three
operations, right? @rmahindra123?