alexeykudinkin commented on a change in pull request #4880:
URL: https://github.com/apache/hudi/pull/4880#discussion_r816301848



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/DeleteKey.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import java.util.Objects;
+
+/**
+ * Delete key is a combination of HoodieKey and ordering value.
+ * The key is used for {@link org.apache.hudi.common.table.log.block.HoodieDeleteBlock}
+ * to support per-record deletions. Since the deletion block is always appended after the data block,
+ * we need to keep the ordering val to combine with the data records when merging, or the data may
+ * be dropped if there are intermediate deletions for the inputs
+ * (a new INSERT comes after a DELETE in one input batch).
+ */
+public class DeleteKey extends HoodieKey {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * For purposes of preCombining.
+   */
+  private Comparable orderingVal;
+
+  private DeleteKey(String recordKey, String partitionPath, Comparable orderingVal) {
+    super(recordKey, partitionPath);
+    this.orderingVal = orderingVal;
+  }
+
+  public static DeleteKey create(HoodieKey hoodieKey, Comparable orderingVal) {
+    return create(hoodieKey.getRecordKey(), hoodieKey.getPartitionPath(), orderingVal);
+  }
+
+  public static DeleteKey create(String recordKey, String partitionPath) {
+    return create(recordKey, partitionPath, 0);
+  }
+
+  public static DeleteKey create(String recordKey, String partitionPath, Comparable orderingVal) {
+    return new DeleteKey(recordKey, partitionPath, orderingVal);
+  }
+
+  public Comparable getOrderingVal() {
+    return orderingVal;
+  }
+
+  public void setOrderingVal(Comparable orderingVal) {
+    this.orderingVal = orderingVal;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!super.equals(o)) {
+      return false;
+    }
+    DeleteKey otherKey = (DeleteKey) o;
+    return Objects.equals(this.orderingVal, otherKey.orderingVal);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(getRecordKey(), getPartitionPath(), this.orderingVal);
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder("DeleteKey {");

Review comment:
       Why not combine all of these into a single expression?
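The suggestion could look like the sketch below; the field names come from the diff, but the surrounding class is a simplified stand-in, not Hudi's actual `DeleteKey`:

```java
public class DeleteKeyToStringExample {
  private final String recordKey;
  private final String partitionPath;
  private final Comparable<?> orderingVal;

  DeleteKeyToStringExample(String recordKey, String partitionPath, Comparable<?> orderingVal) {
    this.recordKey = recordKey;
    this.partitionPath = partitionPath;
    this.orderingVal = orderingVal;
  }

  // Single expression instead of incremental StringBuilder appends
  @Override
  public String toString() {
    return "DeleteKey {recordKey=" + recordKey
        + ", partitionPath=" + partitionPath
        + ", orderingVal=" + orderingVal + "}";
  }

  public static void main(String[] args) {
    System.out.println(new DeleteKeyToStringExample("k1", "p1", 5));
  }
}
```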

##########
File path: 
hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
##########
@@ -1091,6 +1092,123 @@ public void testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.Di
     assertEquals(200, readKeys.size(), "Stream collect should return all 200 records after rollback of delete");
   }
 
+  @ParameterizedTest
+  @MethodSource("testArguments")
+  public void testAvroLogRecordReaderWithDisorderDelete(ExternalSpillableMap.DiskMapType diskMapType,
+                                                        boolean isCompressionEnabled,
+                                                        boolean readBlocksLazily)
+      throws IOException, URISyntaxException, InterruptedException {
+    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
+    // Set a small threshold so that every block is a new version
+    Writer writer =
+        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+            .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
+
+    // Write 1
+    List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    List<IndexedRecord> copyOfRecords1 = records1.stream()
+        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
+    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
+    HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records1, header);
+    writer.appendBlock(dataBlock);
+
+    // Write 2
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");

Review comment:
       Please avoid modifying the existing map: it makes its state hard to track throughout the test, and it can also bury unexpected side effects given that the block-writing is async. Instead, you can create a new map using `CollectionUtils.createImmutableMap` and combine it with the shared props using `combine`
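A sketch of the suggested pattern; `createImmutableMap` and `combine` below are plain-Java stand-ins for Hudi's `CollectionUtils` helpers, so their exact signatures are assumptions:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ImmutableHeaderExample {
  // Stand-in for Hudi's CollectionUtils.createImmutableMap
  static <K, V> Map<K, V> createImmutableMap(K k, V v) {
    Map<K, V> m = new HashMap<>();
    m.put(k, v);
    return Collections.unmodifiableMap(m);
  }

  // Stand-in for a combine helper: merges two maps into a new immutable one
  static <K, V> Map<K, V> combine(Map<K, V> a, Map<K, V> b) {
    Map<K, V> m = new HashMap<>(a);
    m.putAll(b);
    return Collections.unmodifiableMap(m);
  }

  public static void main(String[] args) {
    Map<String, String> shared = createImmutableMap("SCHEMA", "{...}");
    // Each write gets its own header map instead of mutating one shared instance
    Map<String, String> header1 = combine(shared, createImmutableMap("INSTANT_TIME", "100"));
    Map<String, String> header2 = combine(shared, createImmutableMap("INSTANT_TIME", "101"));
    System.out.println(header1.get("INSTANT_TIME") + " " + header2.get("INSTANT_TIME"));
  }
}
```

Each block append then receives its own immutable header, so no earlier header can be mutated behind the writer's back.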

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/DeleteKey.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import java.util.Objects;
+
+/**
+ * Delete key is a combination of HoodieKey and ordering value.
+ * The key is used for {@link org.apache.hudi.common.table.log.block.HoodieDeleteBlock}
+ * to support per-record deletions. Since the deletion block is always appended after the data block,
+ * we need to keep the ordering val to combine with the data records when merging, or the data may
+ * be dropped if there are intermediate deletions for the inputs
+ * (a new INSERT comes after a DELETE in one input batch).
+ */
+public class DeleteKey extends HoodieKey {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * For purposes of preCombining.
+   */
+  private Comparable orderingVal;
+
+  private DeleteKey(String recordKey, String partitionPath, Comparable orderingVal) {
+    super(recordKey, partitionPath);
+    this.orderingVal = orderingVal;
+  }
+
+  public static DeleteKey create(HoodieKey hoodieKey, Comparable orderingVal) {
+    return create(hoodieKey.getRecordKey(), hoodieKey.getPartitionPath(), orderingVal);
+  }
+
+  public static DeleteKey create(String recordKey, String partitionPath) {
+    return create(recordKey, partitionPath, 0);

Review comment:
       Are we using 0 as the unset value here? If so, I'd suggest going with -1 instead, as 0 seems to be a totally legitimate value

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
##########
@@ -153,9 +154,29 @@ protected void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> hoo
   }
 
   @Override
-  protected void processNextDeletedKey(HoodieKey hoodieKey) {
-    records.put(hoodieKey.getRecordKey(), SpillableMapUtils.generateEmptyPayload(hoodieKey.getRecordKey(),
-        hoodieKey.getPartitionPath(), getPayloadClassFQN()));
+  protected void processNextDeletedKey(DeleteKey deleteKey) {
+    String key = deleteKey.getRecordKey();
+    if (records.containsKey(key)) {
+      // Merge and store the merged record. The ordering val is taken to decide whether the same key record
+      // should be deleted or be kept. The old record is kept only if the DELETE record has smaller ordering val.
+      // For same ordering values, uses the natural order.
+
+      HoodieRecord<? extends HoodieRecordPayload> oldRecord = records.get(key);
+      Comparable oldOrderingVal = oldRecord.getData().getOrderingVal();
+      Comparable newOrderingVal = deleteKey.getOrderingVal();
+      // Checks that the ordering value does not equal 0,
+      // because we use 0 as the default value, which means natural order
+      boolean choosePrev = !newOrderingVal.equals(0)
+          && oldOrderingVal.getClass() == newOrderingVal.getClass()

Review comment:
       This should be an assertion rather than part of the conditional
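A sketch of moving the class check into an assertion; `checkArgument` mirrors Hudi's `ValidationUtils` helper but is inlined here, and the method name is hypothetical:

```java
public class OrderingValCheckExample {
  // Inlined stand-in for a ValidationUtils-style precondition check
  static void checkArgument(boolean condition, String message) {
    if (!condition) {
      throw new IllegalArgumentException(message);
    }
  }

  @SuppressWarnings({"unchecked", "rawtypes"})
  static boolean shouldKeepOldRecord(Comparable oldOrderingVal, Comparable newOrderingVal) {
    // 0 means natural order: the delete wins, no comparison needed
    if (newOrderingVal.equals(0)) {
      return false;
    }
    // Assert that the types line up rather than quietly skipping the comparison
    checkArgument(oldOrderingVal.getClass() == newOrderingVal.getClass(),
        "Ordering values must be of the same type to be compared");
    return oldOrderingVal.compareTo(newOrderingVal) > 0;
  }

  public static void main(String[] args) {
    System.out.println(shouldKeepOldRecord(101, 97)); // old record kept
  }
}
```

With the assertion, a mismatched ordering-value type fails loudly instead of silently keeping the delete.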

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
##########
@@ -153,9 +154,29 @@ protected void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> hoo
   }
 
   @Override
-  protected void processNextDeletedKey(HoodieKey hoodieKey) {
-    records.put(hoodieKey.getRecordKey(), SpillableMapUtils.generateEmptyPayload(hoodieKey.getRecordKey(),
-        hoodieKey.getPartitionPath(), getPayloadClassFQN()));
+  protected void processNextDeletedKey(DeleteKey deleteKey) {
+    String key = deleteKey.getRecordKey();
+    if (records.containsKey(key)) {

Review comment:
       Why make 2 hash-table lookups instead of 1?
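The two-lookup pattern (`containsKey` then `get`) can collapse into one `get` plus a null check, valid whenever the map never stores null values; this is a generic sketch, not the scanner's actual types:

```java
import java.util.HashMap;
import java.util.Map;

public class SingleLookupExample {
  public static String process(Map<String, String> records, String key) {
    String oldRecord = records.get(key); // single hash-table lookup
    if (oldRecord != null) {
      return "merge:" + oldRecord;       // existing record: merge path
    }
    return "tombstone";                  // no record yet: store the tombstone
  }

  public static void main(String[] args) {
    Map<String, String> records = new HashMap<>();
    records.put("k1", "v1");
    System.out.println(process(records, "k1"));
    System.out.println(process(records, "k2"));
  }
}
```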

##########
File path: 
hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
##########
@@ -1091,6 +1092,123 @@ public void testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.Di
     assertEquals(200, readKeys.size(), "Stream collect should return all 200 records after rollback of delete");
   }
 
+  @ParameterizedTest
+  @MethodSource("testArguments")
+  public void testAvroLogRecordReaderWithDisorderDelete(ExternalSpillableMap.DiskMapType diskMapType,
+                                                        boolean isCompressionEnabled,
+                                                        boolean readBlocksLazily)
+      throws IOException, URISyntaxException, InterruptedException {
+    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
+    // Set a small threshold so that every block is a new version
+    Writer writer =
+        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+            .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
+
+    // Write 1
+    List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    List<IndexedRecord> copyOfRecords1 = records1.stream()

Review comment:
       Why copy the records?

##########
File path: 
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
##########
@@ -516,17 +516,14 @@ class TestMORDataSource extends HoodieClientTestBase {
     checkAnswer((1, "a0", 12, 101, false))
 
     writeData((1, "a0", 16, 97, true))
-    // Ordering value will not be honored for a delete record as the payload is sent as empty payload
-    checkAnswer((1, "a0", 16, 97, true))
+    // Ordering value will be honored, the delete record is considered as obsolete
+    // because it has smaller version number (97 < 101)
+    checkAnswer((1, "a0", 12, 101, false))
 
     writeData((1, "a0", 18, 96, false))
-    // Ideally, once a record is deleted, preCombine does not kick. So, any new record will be considered valid ignoring
-    // ordering val. But what happens ini hudi is, all records in log files are reconciled and then merged with base
-    // file. After reconciling all records from log files, it results in (1, "a0", 18, 96, false) and ths is merged with
-    // (1, "a0", 10, 100, false) in base file and hence we see (1, "a0", 10, 100, false) as it has higher preComine value.
-    // the result might differ depending on whether compaction was triggered or not(after record is deleted). In this
-    // test, no compaction is triggered and hence we see the record from base file.
-    checkAnswer((1, "a0", 10, 100, false))
+    // Ordering value will be honored, the delete record is considered as obsolete

Review comment:
       This comment is misleading -- previous record isn't a deletion

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/DeleteKey.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import java.util.Objects;
+
+/**
+ * Delete key is a combination of HoodieKey and ordering value.
+ * The key is used for {@link org.apache.hudi.common.table.log.block.HoodieDeleteBlock}
+ * to support per-record deletions. Since the deletion block is always appended after the data block,
+ * we need to keep the ordering val to combine with the data records when merging, or the data may
+ * be dropped if there are intermediate deletions for the inputs
+ * (a new INSERT comes after a DELETE in one input batch).
+ */
+public class DeleteKey extends HoodieKey {

Review comment:
       Conceptually, `orderingVal` is not part of the key; it is rather part of the payload. I think it would be better to actually approach it that way:
   
   1. Introduce `DeletionRecordPayload` (carrying only the value of `orderingVal`)
   2. Introduce a new `DeleteLogBlock` format version appropriately persisting/reading the new payload type
   
   This has the following benefits: 
    - Takes care of backward compatibility (BWC) out of the box
    - Makes it compatible with the Metadata Table also out of the box
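A minimal sketch of what the proposed `DeletionRecordPayload` could carry; the class name comes from the comment above, but everything else (interface, method names) is an assumption rather than Hudi's actual payload contract:

```java
import java.io.Serializable;

// Hypothetical tombstone payload that carries only the ordering value,
// keeping orderingVal out of the key as suggested.
public class DeletionRecordPayload implements Serializable {
  private static final long serialVersionUID = 1L;

  private final Comparable<?> orderingVal;

  public DeletionRecordPayload(Comparable<?> orderingVal) {
    this.orderingVal = orderingVal;
  }

  public Comparable<?> getOrderingVal() {
    return orderingVal;
  }

  // A deletion payload always represents a tombstone
  public boolean isDeleted() {
    return true;
  }
}
```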

##########
File path: 
hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
##########
@@ -1091,6 +1092,123 @@ public void testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.Di
     assertEquals(200, readKeys.size(), "Stream collect should return all 200 records after rollback of delete");
   }
 
+  @ParameterizedTest
+  @MethodSource("testArguments")
+  public void testAvroLogRecordReaderWithDisorderDelete(ExternalSpillableMap.DiskMapType diskMapType,
+                                                        boolean isCompressionEnabled,
+                                                        boolean readBlocksLazily)
+      throws IOException, URISyntaxException, InterruptedException {
+    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
+    // Set a small threshold so that every block is a new version
+    Writer writer =
+        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+            .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
+
+    // Write 1
+    List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    List<IndexedRecord> copyOfRecords1 = records1.stream()
+        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
+    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
+    HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records1, header);
+    writer.appendBlock(dataBlock);
+
+    // Write 2
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
+    List<IndexedRecord> records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    List<IndexedRecord> copyOfRecords2 = records2.stream()
+        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
+    dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records2, header);
+    writer.appendBlock(dataBlock);
+
+    copyOfRecords1.addAll(copyOfRecords2);
+    List<String> originalKeys =
+        copyOfRecords1.stream().map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
+            .collect(Collectors.toList());
+
+    // Delete 10 keys
+    // Default orderingVal is 0, which means natural order; the DELETE records
+    // should overwrite the data records.
+    List<DeleteKey> deletedKeys1 = copyOfRecords1.subList(0, 10).stream()
+        .map(s -> (DeleteKey.create(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
+            ((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString())))
+        .collect(Collectors.toList());
+
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
+    HoodieDeleteBlock deleteBlock1 = new HoodieDeleteBlock(deletedKeys1.toArray(new DeleteKey[0]), header);
+    writer.appendBlock(deleteBlock1);
+
+    // Delete another 10 keys with -1 as orderingVal.
+    // The deletion should not work
+
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103");
+    HoodieDeleteBlock deleteBlock2 = new HoodieDeleteBlock(copyOfRecords1.subList(10, 20).stream()
+        .map(s -> (DeleteKey.create(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
+            ((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(), -1))).toArray(DeleteKey[]::new), header);
+    writer.appendBlock(deleteBlock2);
+
+    // Delete another 10 keys with +1 as orderingVal.
+    // The deletion should work because the keys have a greater ordering value.
+    List<DeleteKey> deletedKeys3 = copyOfRecords1.subList(20, 30).stream()
+        .map(s -> (DeleteKey.create(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
+            ((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(), 1)))
+        .collect(Collectors.toList());
+
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "104");
+    HoodieDeleteBlock deleteBlock3 = new HoodieDeleteBlock(deletedKeys3.toArray(new DeleteKey[0]), header);
+    writer.appendBlock(deleteBlock3);
+
+    List<String> allLogFiles =
+        FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100")
+            .map(s -> s.getPath().toString()).collect(Collectors.toList());
+
+    FileCreateUtils.createDeltaCommit(basePath, "100", fs);
+    FileCreateUtils.createDeltaCommit(basePath, "101", fs);
+    FileCreateUtils.createDeltaCommit(basePath, "102", fs);
+    FileCreateUtils.createDeltaCommit(basePath, "103", fs);
+    FileCreateUtils.createDeltaCommit(basePath, "104", fs);
+
+    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(allLogFiles)
+        .withReaderSchema(schema)
+        .withLatestInstantTime("104")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(readBlocksLazily)
+        .withReverseReader(false)
+        .withBufferSize(bufferSize)
+        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .withDiskMapType(diskMapType)
+        .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
+        .build();
+
+    assertEquals(200, scanner.getTotalLogRecords(), "We still would read 200 records");
+    final List<String> readKeys = new ArrayList<>(200);
+    final List<String> emptyPayloadKeys = new ArrayList<>();
+    scanner.forEach(s -> readKeys.add(s.getRecordKey()));
+    scanner.forEach(s -> {

Review comment:
       Can we combine both of these iterations?
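Folding both passes into one could look like the generic sketch below; plain strings stand in for the scanner's records, and the empty-payload check is a simplified stand-in:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SinglePassExample {
  static List<String> readKeys = new ArrayList<>();
  static List<String> emptyPayloadKeys = new ArrayList<>();

  static void scan(List<String> records) {
    readKeys.clear();
    emptyPayloadKeys.clear();
    // One iteration populates both lists instead of two forEach calls
    records.forEach(s -> {
      String key = s.split(":", 2)[0];
      readKeys.add(key);               // every record's key
      if (s.endsWith(":")) {           // "empty payload" stand-in check
        emptyPayloadKeys.add(key);
      }
    });
  }

  public static void main(String[] args) {
    scan(Arrays.asList("k1:data", "k2:", "k3:data"));
    System.out.println(readKeys + " " + emptyPayloadKeys);
  }
}
```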

##########
File path: 
hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
##########
@@ -1091,6 +1092,123 @@ public void testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.Di
     assertEquals(200, readKeys.size(), "Stream collect should return all 200 records after rollback of delete");
   }
 
+  @ParameterizedTest
+  @MethodSource("testArguments")
+  public void testAvroLogRecordReaderWithDisorderDelete(ExternalSpillableMap.DiskMapType diskMapType,
+                                                        boolean isCompressionEnabled,
+                                                        boolean readBlocksLazily)
+      throws IOException, URISyntaxException, InterruptedException {
+    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
+    // Set a small threshold so that every block is a new version
+    Writer writer =
+        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+            .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
+
+    // Write 1
+    List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    List<IndexedRecord> copyOfRecords1 = records1.stream()
+        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
+    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
+    HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records1, header);
+    writer.appendBlock(dataBlock);
+
+    // Write 2
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");

Review comment:
       Sorry, my bad, writing isn't async actually, only reading

##########
File path: 
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
##########
@@ -516,17 +516,14 @@ class TestMORDataSource extends HoodieClientTestBase {
     checkAnswer((1, "a0", 12, 101, false))
 
     writeData((1, "a0", 16, 97, true))
-    // Ordering value will not be honored for a delete record as the payload is sent as empty payload
-    checkAnswer((1, "a0", 16, 97, true))
+    // Ordering value will be honored, the delete record is considered as obsolete
+    // because it has smaller version number (97 < 101)
+    checkAnswer((1, "a0", 12, 101, false))
 
     writeData((1, "a0", 18, 96, false))
-    // Ideally, once a record is deleted, preCombine does not kick. So, any new record will be considered valid ignoring
-    // ordering val. But what happens ini hudi is, all records in log files are reconciled and then merged with base

Review comment:
       What this test describes seems plainly an issue to me.
   Let's
    - Create a ticket to address it
    - Keep the test flow (either within this test or separately) to make sure we have a reference point when we get to fix it

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
##########
@@ -114,4 +114,16 @@ default T preCombine(T oldValue, Properties properties) {
   default Option<Map<String, String>> getMetadata() {
     return Option.empty();
   }
+
+  /**
+   * This method can be used to extract the ordering value of the payload for combining/merging,
+   * or 0 if no value is specified, which means natural order.

Review comment:
       What does "natural order" mean in the context of the record payloads?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
##########
@@ -153,9 +154,29 @@ protected void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> hoo
   }
 
   @Override
-  protected void processNextDeletedKey(HoodieKey hoodieKey) {
-    records.put(hoodieKey.getRecordKey(), SpillableMapUtils.generateEmptyPayload(hoodieKey.getRecordKey(),
-        hoodieKey.getPartitionPath(), getPayloadClassFQN()));
+  protected void processNextDeletedKey(DeleteKey deleteKey) {
+    String key = deleteKey.getRecordKey();
+    if (records.containsKey(key)) {
+      // Merge and store the merged record. The ordering val is taken to decide whether the same key record
+      // should be deleted or be kept. The old record is kept only if the DELETE record has smaller ordering val.
+      // For same ordering values, uses the natural order.
+
+      HoodieRecord<? extends HoodieRecordPayload> oldRecord = records.get(key);
+      Comparable oldOrderingVal = oldRecord.getData().getOrderingVal();
+      Comparable newOrderingVal = deleteKey.getOrderingVal();

Review comment:
       Nit: `deletionOrderingVal` resolves the confusion

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
##########
@@ -86,7 +86,7 @@ public HoodieDeleteBlock(Option<byte[]> content, FSDataInputStream inputStream,
         int dataLength = dis.readInt();
         byte[] data = new byte[dataLength];
         dis.readFully(data);
-        this.keysToDelete = SerializationUtils.<HoodieKey[]>deserialize(data);
+        this.keysToDelete = SerializationUtils.<DeleteKey[]>deserialize(data);

Review comment:
       This is going to be non-BWC.
   
   To avoid surprises I think we should do either of:
    - Support both log formats (adding a new format version for deletion blocks)
    - Make sure we can enforce a proper upgrade sequence (first compact, then upgrade)
    - Make it BWC
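One way to keep reads backward-compatible is to gate deserialization on a format version, as sketched below; the version constants and byte layout here are illustrative, not Hudi's actual delete-block format:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class VersionedDeleteBlockExample {
  static final int VERSION_1 = 1; // legacy layout: record key + partition path
  static final int VERSION_2 = 2; // new layout: also carries an ordering value

  static byte[] write(int version, String recordKey, String partitionPath, long orderingVal) {
    try {
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      DataOutputStream dos = new DataOutputStream(bos);
      dos.writeInt(version);
      dos.writeUTF(recordKey);
      dos.writeUTF(partitionPath);
      if (version >= VERSION_2) {
        dos.writeLong(orderingVal); // only the new format persists the ordering value
      }
      dos.flush();
      return bos.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  static long readOrderingVal(byte[] data) {
    try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(data))) {
      int version = dis.readInt();
      dis.readUTF(); // record key
      dis.readUTF(); // partition path
      // Old blocks carry no ordering value; fall back to the default (natural order)
      return version >= VERSION_2 ? dis.readLong() : 0L;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(readOrderingVal(write(VERSION_1, "k1", "p1", 0L)));  // 0
    System.out.println(readOrderingVal(write(VERSION_2, "k1", "p1", 42L))); // 42
  }
}
```

Reading the version first lets one code path handle both old `HoodieKey`-style blocks and the new layout without a forced compaction before upgrade.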




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

