nsivabalan commented on a change in pull request #4880:
URL: https://github.com/apache/hudi/pull/4880#discussion_r839260523
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
##########
@@ -153,9 +154,29 @@ protected void processNextRecord(HoodieRecord<? extends
HoodieRecordPayload> hoo
}
@Override
- protected void processNextDeletedKey(HoodieKey hoodieKey) {
- records.put(hoodieKey.getRecordKey(),
SpillableMapUtils.generateEmptyPayload(hoodieKey.getRecordKey(),
- hoodieKey.getPartitionPath(), getPayloadClassFQN()));
+ protected void processNextDeletedRecord(DeleteRecord deleteRecord) {
+ String key = deleteRecord.getRecordKey();
+ if (records.containsKey(key)) {
+ // Merge and store the merged record. The ordering val is taken to
decide whether the same key record
+ // should be deleted or be kept. The old record is kept only if the
DELETE record has smaller ordering val.
+ // For same ordering values, uses the natural order(arrival time
semantics).
+
+ HoodieRecord<? extends HoodieRecordPayload> oldRecord = records.get(key);
+ Comparable curOrderingVal = oldRecord.getData().getCombineValue();
+ Comparable deleteOrderingVal = deleteRecord.getOrderingVal();
+ // Checks the ordering value does not equal to 0
+ // because we use 0 as the default value which means natural order
+ boolean choosePrev = !deleteOrderingVal.equals(0)
+ && curOrderingVal.getClass() == deleteOrderingVal.getClass()
Review comment:
can you help me understand how this would pan out for records written
with 0.10.0 or before.
I would assume DeleteRecord will have 0 as ordering value, where as, an
insert record will have proper ordering value, lets say 10.
so, if we get an insert followed by delete, we will choose delete record as
final value.
and if we get a delete followed by an insert, we will choose insert record
as the final value.
##########
File path:
hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
##########
@@ -1097,6 +1097,123 @@ public void
testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.Di
assertEquals(200, readKeys.size(), "Stream collect should return all 200
records after rollback of delete");
}
+ @ParameterizedTest
+ @MethodSource("testArguments")
+ public void
testAvroLogRecordReaderWithDisorderDelete(ExternalSpillableMap.DiskMapType
diskMapType,
+ boolean
isCompressionEnabled,
+ boolean
readBlocksLazily)
+ throws IOException, URISyntaxException, InterruptedException {
+ Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
+ // Set a small threshold so that every block is a new version
+ Writer writer =
+
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
+
+ // Write 1
+ List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0,
100);
+ List<IndexedRecord> copyOfRecords1 = records1.stream()
+ .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema)).collect(Collectors.toList());
+ Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+ header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+ header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
+ HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE,
records1, header);
+ writer.appendBlock(dataBlock);
+
+ // Write 2
+ header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
+ List<IndexedRecord> records2 = SchemaTestUtil.generateHoodieTestRecords(0,
100);
+ List<IndexedRecord> copyOfRecords2 = records2.stream()
+ .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema)).collect(Collectors.toList());
+ dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records2, header);
+ writer.appendBlock(dataBlock);
+
+ copyOfRecords1.addAll(copyOfRecords2);
+ List<String> originalKeys =
+ copyOfRecords1.stream().map(s -> ((GenericRecord)
s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
+ .collect(Collectors.toList());
+
+ // Delete 10 keys
+ // Default orderingVal is 0, which means natural order, the DELETE records
+ // should overwrite the data records.
+ List<DeleteRecord> deleteRecords1 = copyOfRecords1.subList(0, 10).stream()
+ .map(s -> (DeleteRecord.create(((GenericRecord)
s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
+ ((GenericRecord)
s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString())))
+ .collect(Collectors.toList());
+
+ header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
+ HoodieDeleteBlock deleteBlock1 = new
HoodieDeleteBlock(deleteRecords1.toArray(new DeleteRecord[0]), header);
+ writer.appendBlock(deleteBlock1);
+
+ // Delete another 10 keys with -1 as orderingVal.
+ // The deletion should not work
+
+ header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103");
+ HoodieDeleteBlock deleteBlock2 = new
HoodieDeleteBlock(copyOfRecords1.subList(10, 20).stream()
+ .map(s -> (DeleteRecord.create(((GenericRecord)
s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
+ ((GenericRecord)
s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(),
-1))).toArray(DeleteRecord[]::new), header);
+ writer.appendBlock(deleteBlock2);
+
+ // Delete another 10 keys with +1 as orderingVal.
+ // The deletion should work because the keys has greater ordering value.
+ List<DeleteRecord> deletedRecords3 = copyOfRecords1.subList(20,
30).stream()
+ .map(s -> (DeleteRecord.create(((GenericRecord)
s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
+ ((GenericRecord)
s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(), 1)))
+ .collect(Collectors.toList());
+
+ header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "104");
+ HoodieDeleteBlock deleteBlock3 = new
HoodieDeleteBlock(deletedRecords3.toArray(new DeleteRecord[0]), header);
+ writer.appendBlock(deleteBlock3);
+
Review comment:
can we add another data block following this deleted block, with higher
ordering value compared to those records in deleteBlock3. may be 5 of them. And
assert that, new inserts are honored.
Also, add few more inserts w/ lower ordering value compared to those in
deleteblock3 and ensure deletes take precedence.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]