alexeykudinkin commented on a change in pull request #4880:
URL: https://github.com/apache/hudi/pull/4880#discussion_r817279089



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
##########
@@ -114,4 +114,16 @@ default T preCombine(T oldValue, Properties properties) {
   default Option<Map<String, String>> getMetadata() {
     return Option.empty();
   }
+
+  /**
+   * This method can be used to extract the ordering value of the payload for combining/merging,
+   * or 0 if no value is specified which means natural order.

Review comment:
       But what does "later" mean in that sense? Isn't `orderingVal` supposed 
to be ordering those?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/DeleteKey.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import java.util.Objects;
+
+/**
+ * Delete key is a combination of HoodieKey and ordering value.
+ * The key is used for {@link org.apache.hudi.common.table.log.block.HoodieDeleteBlock}
+ * to support per-record deletions. The deletion block is always appended after the data block,
+ * we need to keep the ordering val to combine with the data records when merging, or the data may
+ * be dropped if there are intermediate deletions for the inputs
+ * (a new INSERT comes after a DELETE in one input batch).
+ */
+public class DeleteKey extends HoodieKey {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * For purposes of preCombining.
+   */
+  private Comparable orderingVal;
+
+  private DeleteKey(String recordKey, String partitionPath, Comparable orderingVal) {
+    super(recordKey, partitionPath);
+    this.orderingVal = orderingVal;
+  }
+
+  public static DeleteKey create(HoodieKey hoodieKey, Comparable orderingVal) {
+    return create(hoodieKey.getRecordKey(), hoodieKey.getPartitionPath(), orderingVal);
+  }
+
+  public static DeleteKey create(String recordKey, String partitionPath) {
+    return create(recordKey, partitionPath, 0);

Review comment:
       This is not a version though, this is an `orderingVal` and this is very 
confusing -- does 0 mean that this should have highest priority?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
##########
@@ -153,9 +154,29 @@ protected void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> hoo
   }
 
   @Override
-  protected void processNextDeletedKey(HoodieKey hoodieKey) {
-    records.put(hoodieKey.getRecordKey(), SpillableMapUtils.generateEmptyPayload(hoodieKey.getRecordKey(),
-        hoodieKey.getPartitionPath(), getPayloadClassFQN()));
+  protected void processNextDeletedKey(DeleteKey deleteKey) {
+    String key = deleteKey.getRecordKey();
+    if (records.containsKey(key)) {
+      // Merge and store the merged record. The ordering val is taken to decide whether the same key record
+      // should be deleted or be kept. The old record is kept only if the DELETE record has smaller ordering val.
+      // For same ordering values, uses the natural order.
+
+      HoodieRecord<? extends HoodieRecordPayload> oldRecord = records.get(key);
+      Comparable oldOrderingVal = oldRecord.getData().getOrderingVal();
+      Comparable newOrderingVal = deleteKey.getOrderingVal();
+      // Checks the ordering value does not equal to 0
+      // because we use 0 as the default value which means natural order
+      boolean choosePrev = !newOrderingVal.equals(0)
+          && oldOrderingVal.getClass() == newOrderingVal.getClass()

Review comment:
       That default value should then be passed as a `long`; otherwise 
`0 (int) != 0 (long)`, which doesn't make sense.
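
To illustrate the boxed-type mismatch, here is a minimal sketch. It uses only plain JDK boxing behaviour; `OrderingValueEquality` and `isDefaultOrdering` are hypothetical names introduced for illustration and are not part of the PR.

```java
public class OrderingValueEquality {

  // Compares two boxed ordering values exactly as Object.equals does.
  static boolean boxedEquals(Comparable<?> a, Comparable<?> b) {
    return a.equals(b);
  }

  // Hypothetical helper (not part of the PR): treats any numeric zero,
  // regardless of its boxed type, as the "no ordering value" default.
  static boolean isDefaultOrdering(Comparable<?> value) {
    return value instanceof Number && ((Number) value).doubleValue() == 0.0;
  }

  public static void main(String[] args) {
    Comparable<?> intZero = 0;    // boxed as Integer
    Comparable<?> longZero = 0L;  // boxed as Long
    System.out.println(boxedEquals(longZero, intZero)); // false: Long.equals(Integer)
    System.out.println(isDefaultOrdering(longZero));    // true
  }
}
```

A check written as `newOrderingVal.equals(0)` therefore only recognises the default when the value happens to be boxed as an `Integer`; a type-insensitive check like the hypothetical one above is one possible way around that.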

##########
File path: hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
##########
@@ -516,17 +516,14 @@ class TestMORDataSource extends HoodieClientTestBase {
     checkAnswer((1, "a0", 12, 101, false))
 
     writeData((1, "a0", 16, 97, true))
-    // Ordering value will not be honored for a delete record as the payload is sent as empty payload
-    checkAnswer((1, "a0", 16, 97, true))
+    // Ordering value will be honored, the delete record is considered as obsolete
+    // because it has smaller version number (97 < 101)
+    checkAnswer((1, "a0", 12, 101, false))
 
     writeData((1, "a0", 18, 96, false))
-    // Ideally, once a record is deleted, preCombine does not kick. So, any new record will be considered valid ignoring
-    // ordering val. But what happens ini hudi is, all records in log files are reconciled and then merged with base

Review comment:
       It's not about your fix -- the previous test was describing the following 
issue: if in the _logs_ we have U1 (update), then D2 (deletion), then U3, 
currently U1 and D2 will cancel each other out and we will only carry U3. This 
is incorrect; instead we should be carrying both D2 and U3, because D2 will have 
to be applied against the base as well, before applying U3.
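
A minimal sketch of why the carried deletion matters, under stated assumptions: a delete always removes the current record, and an update replaces an existing record only when its ordering value is not smaller. `LogReplaySketch`, `Op` and `replay` are hypothetical names, not Hudi APIs, and the ordering values (101 for the base, 96 for U3) are borrowed from the test above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class LogReplaySketch {

  // Hypothetical log operation: an update carries a value, a delete is a tombstone.
  static final class Op {
    final boolean delete;
    final String value;
    final long orderingVal;

    private Op(boolean delete, String value, long orderingVal) {
      this.delete = delete;
      this.value = value;
      this.orderingVal = orderingVal;
    }

    static Op update(String value, long orderingVal) {
      return new Op(false, value, orderingVal);
    }

    static Op delete() {
      return new Op(true, null, 0L);
    }
  }

  // Applies one operation to the current state of a key.
  // Assumption: deletes always win; updates win only with an ordering value
  // greater than or equal to that of the existing record.
  static Optional<Op> apply(Optional<Op> current, Op op) {
    if (op.delete) {
      return Optional.empty();
    }
    if (!current.isPresent() || op.orderingVal >= current.get().orderingVal) {
      return Optional.of(op);
    }
    return current;
  }

  // Replays the operations carried over from the logs against the base record.
  static Optional<String> replay(Op base, List<Op> carried) {
    Optional<Op> state = Optional.of(base);
    for (Op op : carried) {
      state = apply(state, op);
    }
    return state.map(o -> o.value);
  }

  public static void main(String[] args) {
    Op base = Op.update("base", 101L);
    Op d2 = Op.delete();
    Op u3 = Op.update("u3", 96L);
    // Only U3 carried: the base record survives because 96 < 101.
    System.out.println(replay(base, Arrays.asList(u3)));
    // D2 and U3 carried: the base is deleted first, so U3 becomes the result.
    System.out.println(replay(base, Arrays.asList(d2, u3)));
  }
}
```

Under these assumptions the two replays disagree, which is the inconsistency described above: dropping D2 while reconciling the logs changes the outcome of the merge with the base file.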

##########
File path: hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
##########
@@ -1091,6 +1092,123 @@ public void testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.Di
     assertEquals(200, readKeys.size(), "Stream collect should return all 200 records after rollback of delete");
   }
 
+  @ParameterizedTest
+  @MethodSource("testArguments")
+  public void testAvroLogRecordReaderWithDisorderDelete(ExternalSpillableMap.DiskMapType diskMapType,
+                                                        boolean isCompressionEnabled,
+                                                        boolean readBlocksLazily)
+      throws IOException, URISyntaxException, InterruptedException {
+    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
+    // Set a small threshold so that every block is a new version
+    Writer writer =
+        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+            .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
+
+    // Write 1
+    List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    List<IndexedRecord> copyOfRecords1 = records1.stream()
+        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
+    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
+    HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records1, header);
+    writer.appendBlock(dataBlock);
+
+    // Write 2
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
+    List<IndexedRecord> records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    List<IndexedRecord> copyOfRecords2 = records2.stream()
+        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
+    dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records2, header);
+    writer.appendBlock(dataBlock);
+
+    copyOfRecords1.addAll(copyOfRecords2);
+    List<String> originalKeys =
+        copyOfRecords1.stream().map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
+            .collect(Collectors.toList());
+
+    // Delete 10 keys
+    // Default orderingVal is 0, which means natural order, the DELETE records
+    // should overwrite the data records.
+    List<DeleteKey> deletedKeys1 = copyOfRecords1.subList(0, 10).stream()
+        .map(s -> (DeleteKey.create(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
+            ((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString())))
+        .collect(Collectors.toList());
+
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
+    HoodieDeleteBlock deleteBlock1 = new HoodieDeleteBlock(deletedKeys1.toArray(new DeleteKey[0]), header);
+    writer.appendBlock(deleteBlock1);
+
+    // Delete another 10 keys with -1 as orderingVal.
+    // The deletion should not work
+
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103");
+    HoodieDeleteBlock deleteBlock2 = new HoodieDeleteBlock(copyOfRecords1.subList(10, 20).stream()
+        .map(s -> (DeleteKey.create(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
+            ((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(), -1))).toArray(DeleteKey[]::new), header);
+    writer.appendBlock(deleteBlock2);
+
+    // Delete another 10 keys with +1 as orderingVal.
+    // The deletion should work because the keys has greater ordering value.
+    List<DeleteKey> deletedKeys3 = copyOfRecords1.subList(20, 30).stream()
+        .map(s -> (DeleteKey.create(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
+            ((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(), 1)))
+        .collect(Collectors.toList());
+
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "104");
+    HoodieDeleteBlock deleteBlock3 = new HoodieDeleteBlock(deletedKeys3.toArray(new DeleteKey[0]), header);
+    writer.appendBlock(deleteBlock3);
+
+    List<String> allLogFiles =
+        FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100")
+            .map(s -> s.getPath().toString()).collect(Collectors.toList());
+
+    FileCreateUtils.createDeltaCommit(basePath, "100", fs);
+    FileCreateUtils.createDeltaCommit(basePath, "101", fs);
+    FileCreateUtils.createDeltaCommit(basePath, "102", fs);
+    FileCreateUtils.createDeltaCommit(basePath, "103", fs);
+    FileCreateUtils.createDeltaCommit(basePath, "104", fs);
+
+    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(allLogFiles)
+        .withReaderSchema(schema)
+        .withLatestInstantTime("104")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(readBlocksLazily)
+        .withReverseReader(false)
+        .withBufferSize(bufferSize)
+        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .withDiskMapType(diskMapType)
+        .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
+        .build();
+
+    assertEquals(200, scanner.getTotalLogRecords(), "We still would read 200 records");
+    final List<String> readKeys = new ArrayList<>(200);
+    final List<String> emptyPayloadKeys = new ArrayList<>();
+    scanner.forEach(s -> readKeys.add(s.getRecordKey()));
+    scanner.forEach(s -> {

Review comment:
       Can you please elaborate on how it's clearer? 
   
   > Combining is not the gold rule in all of the places.
   
   It's not about gold rules, it's about whether it makes sense or not. 
Iterating twice over the same collection doesn't really make sense when we can 
do it in one iteration, right? Why burn CI/workstation compute for no reason? 
It might be fine in the context of one test but keep in mind that we have 
thousands of tests that we need to run.
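
For reference, a minimal sketch of the single-pass variant. `Rec` is a hypothetical stand-in for the scanned record type (a key plus a flag telling whether its payload is empty); the real test would read these from the scanner's records instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SinglePassCollect {

  // Hypothetical stand-in for a scanned record.
  static final class Rec {
    final String key;
    final boolean emptyPayload;

    Rec(String key, boolean emptyPayload) {
      this.key = key;
      this.emptyPayload = emptyPayload;
    }
  }

  // Fills both key lists in one traversal instead of iterating twice.
  static void collect(Iterable<Rec> records, List<String> readKeys, List<String> emptyPayloadKeys) {
    for (Rec r : records) {
      readKeys.add(r.key);
      if (r.emptyPayload) {
        emptyPayloadKeys.add(r.key);
      }
    }
  }

  public static void main(String[] args) {
    List<Rec> records = Arrays.asList(new Rec("k1", false), new Rec("k2", true), new Rec("k3", false));
    List<String> readKeys = new ArrayList<>();
    List<String> emptyPayloadKeys = new ArrayList<>();
    collect(records, readKeys, emptyPayloadKeys);
    System.out.println(readKeys);         // [k1, k2, k3]
    System.out.println(emptyPayloadKeys); // [k2]
  }
}
```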

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/DeleteKey.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import java.util.Objects;
+
+/**
+ * Delete key is a combination of HoodieKey and ordering value.
+ * The key is used for {@link org.apache.hudi.common.table.log.block.HoodieDeleteBlock}
+ * to support per-record deletions. The deletion block is always appended after the data block,
+ * we need to keep the ordering val to combine with the data records when merging, or the data may
+ * be dropped if there are intermediate deletions for the inputs
+ * (a new INSERT comes after a DELETE in one input batch).
+ */
+public class DeleteKey extends HoodieKey {

Review comment:
       > For BWC, i would expect for other contributors can address this in 
following PR.
   
   I don't think we can treat BWC as optional here. What do you think 
@vinothchandar? 

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/DeleteKey.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import java.util.Objects;
+
+/**
+ * Delete key is a combination of HoodieKey and ordering value.
+ * The key is used for {@link org.apache.hudi.common.table.log.block.HoodieDeleteBlock}
+ * to support per-record deletions. The deletion block is always appended after the data block,
+ * we need to keep the ordering val to combine with the data records when merging, or the data may
+ * be dropped if there are intermediate deletions for the inputs
+ * (a new INSERT comes after a DELETE in one input batch).
+ */
+public class DeleteKey extends HoodieKey {

Review comment:
       > The deletion payload need to execute combineAndGetUpdateValue with the 
records in base file so it should be the same payload class with the data 
record.
   
   This is an interesting point you're bringing up. First of all, I think we 
have to implement deletions in a uniform way across both COW/MOR; I see no 
reason for us to duplicate the logic since the use-cases are comparable (we 
simply need to carry a tombstone w/in the payload either in the log or in the 
batch).
   
   Second, we need to keep in mind that the `RecordPayload` is just the facade 
carrying the semantics of how we a) resolve "duplicates" w/in the written batch 
(`preCombine`) or b) merge records (`combineAndGetUpdateValue`). Therefore I 
don't see an issue with 2 records having different `RecordPayload` classes if 
they require different merge semantics (a tombstone has to override the 
preceding value, w/o any merging).
   
   > There is already an EmptyHoodieRecordPayload used for SQL deletions.
   
   SG, my proposal was more of a concept. We should def re-use existing 
facilities.
   

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
##########
@@ -114,4 +114,16 @@ default T preCombine(T oldValue, Properties properties) {
   default Option<Map<String, String>> getMetadata() {
     return Option.empty();
   }
+
+  /**
+   * This method can be used to extract the ordering value of the payload for combining/merging,
+   * or 0 if no value is specified which means natural order.

Review comment:
       Oh I see, it's referring to "event" time vs "arrival" time. Can we 
instead use event/arrival terminology to make things clear?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
