nsivabalan commented on a change in pull request #4880:
URL: https://github.com/apache/hudi/pull/4880#discussion_r839260523



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
##########
@@ -153,9 +154,29 @@ protected void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> hoo
   }
 
   @Override
-  protected void processNextDeletedKey(HoodieKey hoodieKey) {
-    records.put(hoodieKey.getRecordKey(), SpillableMapUtils.generateEmptyPayload(hoodieKey.getRecordKey(),
-        hoodieKey.getPartitionPath(), getPayloadClassFQN()));
+  protected void processNextDeletedRecord(DeleteRecord deleteRecord) {
+    String key = deleteRecord.getRecordKey();
+    if (records.containsKey(key)) {
+      // Merge and store the merged record. The ordering val is taken to decide whether the same key record
+      // should be deleted or be kept. The old record is kept only if the DELETE record has smaller ordering val.
+      // For same ordering values, uses the natural order(arrival time semantics).
+
+      HoodieRecord<? extends HoodieRecordPayload> oldRecord = records.get(key);
+      Comparable curOrderingVal = oldRecord.getData().getCombineValue();
+      Comparable deleteOrderingVal = deleteRecord.getOrderingVal();
+      // Checks the ordering value does not equal to 0
+      // because we use 0 as the default value which means natural order
+      boolean choosePrev = !deleteOrderingVal.equals(0)
+          && curOrderingVal.getClass() == deleteOrderingVal.getClass()

Review comment:
       Can you help me understand how this would pan out for records written with 0.10.0 or earlier?
   I would assume a DeleteRecord will have 0 as its ordering value, whereas an insert record will have a proper ordering value, let's say 10.

   So, if we get an insert followed by a delete, we will choose the delete record as the final value.
   And if we get a delete followed by an insert, we will choose the insert record as the final value.
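   To make sure I'm reading the rule right, here is a tiny stand-alone sketch (plain Java, not the PR code) of the decision as I understand it. The final `compareTo` clause is my guess at the part of the condition that is cut off in the excerpt above:

```java
// Minimal sketch of the tie-break rule as I read it (assumptions noted inline).
public class DeleteOrderingSketch {

  // "existing" = ordering value of the record already buffered in the scanner's map,
  // "delete"   = DeleteRecord#getOrderingVal() of the incoming tombstone.
  @SuppressWarnings({"unchecked", "rawtypes"})
  static boolean keepExistingRecord(Comparable existing, Comparable delete) {
    return !delete.equals(0)                          // 0 is the default and means natural (arrival-time) order
        && existing.getClass() == delete.getClass()   // only compare values of the same type
        && existing.compareTo(delete) > 0;            // assumption: the old record survives only if strictly greater
  }

  public static void main(String[] args) {
    // Pre-0.11 style data: an insert carries a real ordering value (say 10), a DELETE block carries the default 0.
    System.out.println(keepExistingRecord(10, 0)); // false -> the later DELETE overrides the buffered insert
    System.out.println(keepExistingRecord(10, 1)); // true  -> a DELETE with a lower, non-zero ordering value is ignored
  }
}
```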

##########
File path: hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
##########
@@ -1097,6 +1097,123 @@ public void testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.Di
     assertEquals(200, readKeys.size(), "Stream collect should return all 200 records after rollback of delete");
   }
 
+  @ParameterizedTest
+  @MethodSource("testArguments")
+  public void testAvroLogRecordReaderWithDisorderDelete(ExternalSpillableMap.DiskMapType diskMapType,
+                                                        boolean isCompressionEnabled,
+                                                        boolean readBlocksLazily)
+      throws IOException, URISyntaxException, InterruptedException {
+    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
+    // Set a small threshold so that every block is a new version
+    Writer writer =
+        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+            .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
+
+    // Write 1
+    List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    List<IndexedRecord> copyOfRecords1 = records1.stream()
+        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
+    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
+    HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records1, header);
+    writer.appendBlock(dataBlock);
+
+    // Write 2
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
+    List<IndexedRecord> records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    List<IndexedRecord> copyOfRecords2 = records2.stream()
+        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
+    dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records2, header);
+    writer.appendBlock(dataBlock);
+
+    copyOfRecords1.addAll(copyOfRecords2);
+    List<String> originalKeys =
+        copyOfRecords1.stream().map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
+            .collect(Collectors.toList());
+
+    // Delete 10 keys
+    // Default orderingVal is 0, which means natural order, the DELETE records
+    // should overwrite the data records.
+    List<DeleteRecord> deleteRecords1 = copyOfRecords1.subList(0, 10).stream()
+        .map(s -> (DeleteRecord.create(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
+            ((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString())))
+        .collect(Collectors.toList());
+
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
+    HoodieDeleteBlock deleteBlock1 = new HoodieDeleteBlock(deleteRecords1.toArray(new DeleteRecord[0]), header);
+    writer.appendBlock(deleteBlock1);
+
+    // Delete another 10 keys with -1 as orderingVal.
+    // The deletion should not work
+
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103");
+    HoodieDeleteBlock deleteBlock2 = new HoodieDeleteBlock(copyOfRecords1.subList(10, 20).stream()
+        .map(s -> (DeleteRecord.create(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
+            ((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(), -1))).toArray(DeleteRecord[]::new), header);
+    writer.appendBlock(deleteBlock2);
+
+    // Delete another 10 keys with +1 as orderingVal.
+    // The deletion should work because the keys has greater ordering value.
+    List<DeleteRecord> deletedRecords3 = copyOfRecords1.subList(20, 30).stream()
+        .map(s -> (DeleteRecord.create(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
+            ((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(), 1)))
+        .collect(Collectors.toList());
+
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "104");
+    HoodieDeleteBlock deleteBlock3 = new HoodieDeleteBlock(deletedRecords3.toArray(new DeleteRecord[0]), header);
+    writer.appendBlock(deleteBlock3);
+

Review comment:
       Can we add another data block following this delete block, with a higher ordering value than the records in deleteBlock3? Maybe 5 of them. And assert that the new inserts are honored.

   Also, add a few more inserts with a lower ordering value than those in deleteBlock3 and ensure the deletes take precedence.
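   To make the expected assertion concrete, here is a tiny stand-alone sanity check (plain Java, not part of the test) of the key count I would expect after those two extra writes, assuming the 5 higher-ordering inserts and the 5 lower-ordering inserts each target keys already tombstoned by deleteBlock3:

```java
import java.util.ArrayList;
import java.util.List;

// Toy expectation for the extended test (assumptions: 200 total keys as in the test above; keys 0-9 are
// deleted by deleteBlock1, keys 20-29 by deleteBlock3; the follow-up re-inserts keys 20-24 with an
// ordering value above +1 and keys 25-29 with one below +1).
public class ExpectedKeysAfterDisorderDelete {
  public static void main(String[] args) {
    List<Integer> expectedLivePositions = new ArrayList<>();
    for (int i = 0; i < 200; i++) {
      boolean deletedByBlock1 = i < 10;                 // orderingVal 0 -> natural order, delete wins
      boolean deletedByBlock3 = i >= 20 && i < 30;      // orderingVal +1 beat the original inserts
      boolean revivedByLaterInsert = i >= 20 && i < 25; // re-inserted with a higher ordering value -> honored
      // keys 25-29 are re-inserted with a lower ordering value, so deleteBlock3 should still win
      if (!deletedByBlock1 && (!deletedByBlock3 || revivedByLaterInsert)) {
        expectedLivePositions.add(i);
      }
    }
    // 200 - 10 (deleteBlock1) - 10 (deleteBlock3) + 5 (revived) = 185
    System.out.println(expectedLivePositions.size()); // prints 185
  }
}
```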




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
