This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e331141232c [MINOR] Added docs on gotchas when using 
PartialUpdateAvroPayload (#8579)
e331141232c is described below

commit e331141232ccef12336a6ebd8db4c0355426a015
Author: voonhous <[email protected]>
AuthorDate: Thu May 11 18:15:12 2023 +0800

    [MINOR] Added docs on gotchas when using PartialUpdateAvroPayload (#8579)
---
 .../common/model/PartialUpdateAvroPayload.java     | 27 +++++++
 .../common/model/TestPartialUpdateAvroPayload.java | 91 +++++++++++++++++++++-
 2 files changed, 114 insertions(+), 4 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java
index 1c13a11ee59..27e744c4925 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java
@@ -87,6 +87,33 @@ import java.util.Properties;
  *      id      ts      name    price
  *      1       2       name_1  price_1
  * </pre>
+ *
+ * <p>Gotchas:
+ * <p>In cases where a batch of records is preCombined before 
combineAndGetUpdateValue is invoked with the underlying records to be updated located in 
parquet files, the end states of the records might not be what
+ * one would expect when applying a straightforward partial update.
+ *
+ * <p>Gotchas-Example:
+ * <pre>
+ *  -- Insertion order of records:
+ *  INSERT INTO t1 VALUES (1, 'a1', 10, 1000);                          -- (1)
+ *  INSERT INTO t1 VALUES (1, 'a1', 11, 999), (1, 'a1_0', null, 1001);  -- (2)
+ *
+ *  SELECT id, name, price, _ts FROM t1;
+ *  -- One would expect the results to return:
+ *  -- 1    a1_0    10.0    1001
+ *
+ *  -- However, the results returned are:
+ *  -- 1    a1_0    11.0    1001
+ *
+ *  -- This occurs as preCombine is applied on (2) first to return:
+ *  -- 1    a1_0    11.0    1001
+ *
+ *  -- And this is then combined via combineAndGetUpdateValue with the existing oldValue:
+ *  -- 1    a1_0    10.0    1000
+ *
+ *  -- To return:
+ *  -- 1    a1_0    11.0    1001
+ * </pre>
  */
 public class PartialUpdateAvroPayload extends 
OverwriteNonDefaultsWithLatestAvroPayload {
 
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java
index 6431b63899f..28313f150c8 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java
@@ -18,13 +18,13 @@
 
 package org.apache.hudi.common.model;
 
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.util.Option;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -223,4 +223,87 @@ public class TestPartialUpdateAvroPayload {
     assertEquals(mergedRecord2.get("_hoodie_commit_time").toString(), 
record2.get("_hoodie_commit_time").toString());
     assertEquals(mergedRecord2.get("_hoodie_commit_seqno").toString(), 
record2.get("_hoodie_commit_seqno").toString());
   }
+
+  /**
+   * This test is to highlight the gotcha, where there are differences in the 
results of the two queries on the same input data below:
+   * <pre>
+   *   Query A (No precombine):
+   *
+   *   INSERT INTO t1 VALUES (1, 'partition1', 1, false, NY0, ['A']);
+   *   INSERT INTO t1 VALUES (1, 'partition1', 0, false, NY1, ['B']);
+   *   INSERT INTO t1 VALUES (1, 'partition1', 2, false, NULL, ['A']);
+   *
+   *   Final output of Query A:
+   *   (1, 'partition1', 2, false, NY0, ['A'])
+   *
+   *   Query B (preCombine invoked)
+   *   INSERT INTO t1 VALUES (1, 'partition1', 1, false, NY0, ['A']);
+   *   INSERT INTO t1 VALUES (1, 'partition1', 0, false, NY1, ['B']), (1, 
'partition1', 2, false, NULL, ['A']);
+   *
+   *   Final output of Query B:
+   *   (1, 'partition1', 2, false, NY1, ['A'])
+   * </pre>
+   *
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testPartialUpdateGotchas() throws IOException {
+    Properties properties = new Properties();
+    properties.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, "ts");
+
+    GenericRecord record1 = new GenericData.Record(schema);
+    record1.put("id", "1");
+    record1.put("partition", "partition1");
+    record1.put("ts", 1L);
+    record1.put("_hoodie_is_deleted", false);
+    record1.put("city", "NY0");
+    record1.put("child", Arrays.asList("A"));
+
+    GenericRecord record2 = new GenericData.Record(schema);
+    record2.put("id", "1");
+    record2.put("partition", "partition1");
+    record2.put("ts", 0L);
+    record2.put("_hoodie_is_deleted", false);
+    record2.put("city", "NY1");
+    record2.put("child", Arrays.asList("B"));
+
+    GenericRecord record3 = new GenericData.Record(schema);
+    record3.put("id", "1");
+    record3.put("partition", "partition1");
+    record3.put("ts", 2L);
+    record3.put("_hoodie_is_deleted", false);
+    record3.put("city", null);
+    record3.put("child", Arrays.asList("A"));
+
+    // define expected outputs
+    GenericRecord pureCombineOutput = new GenericData.Record(schema);
+    pureCombineOutput.put("id", "1");
+    pureCombineOutput.put("partition", "partition1");
+    pureCombineOutput.put("ts", 2L);
+    pureCombineOutput.put("_hoodie_is_deleted", false);
+    pureCombineOutput.put("city", "NY0");
+    pureCombineOutput.put("child", Arrays.asList("A"));
+
+    GenericRecord outputWithPreCombineUsed = new GenericData.Record(schema);
+    outputWithPreCombineUsed.put("id", "1");
+    outputWithPreCombineUsed.put("partition", "partition1");
+    outputWithPreCombineUsed.put("ts", 2L);
+    outputWithPreCombineUsed.put("_hoodie_is_deleted", false);
+    outputWithPreCombineUsed.put("city", "NY1");
+    outputWithPreCombineUsed.put("child", Arrays.asList("A"));
+
+    PartialUpdateAvroPayload payload2 = new PartialUpdateAvroPayload(record2, 
0L);
+    PartialUpdateAvroPayload payload3 = new PartialUpdateAvroPayload(record3, 
2L);
+
+    // query A (no preCombine)
+    IndexedRecord firstCombineOutput = 
payload2.combineAndGetUpdateValue(record1, schema, properties).get();
+    IndexedRecord secondCombineOutput = 
payload3.combineAndGetUpdateValue(firstCombineOutput, schema, properties).get();
+    assertEquals(pureCombineOutput, secondCombineOutput);
+
+    // query B (preCombine invoked)
+    PartialUpdateAvroPayload payloadAfterPreCombine = 
payload3.preCombine(payload2, schema, properties);
+    IndexedRecord finalOutputWithPreCombine = 
payloadAfterPreCombine.combineAndGetUpdateValue(record1, schema, 
properties).get();
+    assertEquals(outputWithPreCombineUsed, finalOutputWithPreCombine);
+  }
 }

Reply via email to