This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e331141232c [MINOR] Added docs on gotchas when using
PartialUpdateAvroPayload (#8579)
e331141232c is described below
commit e331141232ccef12336a6ebd8db4c0355426a015
Author: voonhous <[email protected]>
AuthorDate: Thu May 11 18:15:12 2023 +0800
[MINOR] Added docs on gotchas when using PartialUpdateAvroPayload (#8579)
---
.../common/model/PartialUpdateAvroPayload.java | 27 +++++++
.../common/model/TestPartialUpdateAvroPayload.java | 91 +++++++++++++++++++++-
2 files changed, 114 insertions(+), 4 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java
index 1c13a11ee59..27e744c4925 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java
@@ -87,6 +87,33 @@ import java.util.Properties;
* id ts name price
* 1 2 name_1 price_1
* </pre>
+ *
+ * <p>Gotchas:
+ * <p>In cases where a batch of records is preCombined before
combineAndGetUpdateValue with the underlying records to be updated located in
parquet files, the end states of records might not be as
+ * one would expect when applying a straightforward partial update.
+ *
+ * <p>Gotchas-Example:
+ * <pre>
+ * -- Insertion order of records:
+ * INSERT INTO t1 VALUES (1, 'a1', 10, 1000); -- (1)
+ * INSERT INTO t1 VALUES (1, 'a1', 11, 999), (1, 'a1_0', null, 1001); -- (2)
+ *
+ * SELECT id, name, price, _ts FROM t1;
+ * -- One would expect the results to return:
+ * -- 1 a1_0 10.0 1001
+ *
+ * -- However, the results returned are:
+ * -- 1 a1_0 11.0 1001
+ *
+ * -- This occurs as preCombine is applied on (2) first to return:
+ * -- 1 a1_0 11.0 1001
+ *
+ * -- And this is then combined (via combineAndGetUpdateValue) with the existing oldValue:
+ * -- 1 a1_0 10.0 1000
+ *
+ * -- To return:
+ * -- 1 a1_0 11.0 1001
+ * </pre>
*/
public class PartialUpdateAvroPayload extends
OverwriteNonDefaultsWithLatestAvroPayload {
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java
index 6431b63899f..28313f150c8 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java
@@ -18,13 +18,13 @@
package org.apache.hudi.common.model;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -223,4 +223,87 @@ public class TestPartialUpdateAvroPayload {
assertEquals(mergedRecord2.get("_hoodie_commit_time").toString(),
record2.get("_hoodie_commit_time").toString());
assertEquals(mergedRecord2.get("_hoodie_commit_seqno").toString(),
record2.get("_hoodie_commit_seqno").toString());
}
+
+ /**
+ * This test is to highlight the gotcha, where there are differences in
the results of the two queries on the same input data below:
+ * <pre>
+ * Query A (No precombine):
+ *
+ * INSERT INTO t1 VALUES (1, 'partition1', 1, false, NY0, ['A']);
+ * INSERT INTO t1 VALUES (1, 'partition1', 0, false, NY1, ['B']);
+ * INSERT INTO t1 VALUES (1, 'partition1', 2, false, NULL, ['A']);
+ *
+ * Final output of Query A:
+ * (1, 'partition1', 2, false, NY0, ['A'])
+ *
+ * Query B (preCombine invoked)
+ * INSERT INTO t1 VALUES (1, 'partition1', 1, false, NY0, ['A']);
+ * INSERT INTO t1 VALUES (1, 'partition1', 0, false, NY1, ['B']), (1,
'partition1', 2, false, NULL, ['A']);
+ *
+ * Final output of Query B:
+ * (1, 'partition1', 2, false, NY1, ['A'])
+ * </pre>
+ *
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testPartialUpdateGotchas() throws IOException {
+ Properties properties = new Properties();
+ properties.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, "ts");
+
+ GenericRecord record1 = new GenericData.Record(schema);
+ record1.put("id", "1");
+ record1.put("partition", "partition1");
+ record1.put("ts", 1L);
+ record1.put("_hoodie_is_deleted", false);
+ record1.put("city", "NY0");
+ record1.put("child", Arrays.asList("A"));
+
+ GenericRecord record2 = new GenericData.Record(schema);
+ record2.put("id", "1");
+ record2.put("partition", "partition1");
+ record2.put("ts", 0L);
+ record2.put("_hoodie_is_deleted", false);
+ record2.put("city", "NY1");
+ record2.put("child", Arrays.asList("B"));
+
+ GenericRecord record3 = new GenericData.Record(schema);
+ record3.put("id", "1");
+ record3.put("partition", "partition1");
+ record3.put("ts", 2L);
+ record3.put("_hoodie_is_deleted", false);
+ record3.put("city", null);
+ record3.put("child", Arrays.asList("A"));
+
+ // define expected outputs
+ GenericRecord pureCombineOutput = new GenericData.Record(schema);
+ pureCombineOutput.put("id", "1");
+ pureCombineOutput.put("partition", "partition1");
+ pureCombineOutput.put("ts", 2L);
+ pureCombineOutput.put("_hoodie_is_deleted", false);
+ pureCombineOutput.put("city", "NY0");
+ pureCombineOutput.put("child", Arrays.asList("A"));
+
+ GenericRecord outputWithPreCombineUsed = new GenericData.Record(schema);
+ outputWithPreCombineUsed.put("id", "1");
+ outputWithPreCombineUsed.put("partition", "partition1");
+ outputWithPreCombineUsed.put("ts", 2L);
+ outputWithPreCombineUsed.put("_hoodie_is_deleted", false);
+ outputWithPreCombineUsed.put("city", "NY1");
+ outputWithPreCombineUsed.put("child", Arrays.asList("A"));
+
+ PartialUpdateAvroPayload payload2 = new PartialUpdateAvroPayload(record2,
0L);
+ PartialUpdateAvroPayload payload3 = new PartialUpdateAvroPayload(record3,
2L);
+
+ // query A (no preCombine)
+ IndexedRecord firstCombineOutput =
payload2.combineAndGetUpdateValue(record1, schema, properties).get();
+ IndexedRecord secondCombineOutput =
payload3.combineAndGetUpdateValue(firstCombineOutput, schema, properties).get();
+ assertEquals(pureCombineOutput, secondCombineOutput);
+
+ // query B (preCombine invoked)
+ PartialUpdateAvroPayload payloadAfterPreCombine =
payload3.preCombine(payload2, schema, properties);
+ IndexedRecord finalOutputWithPreCombine =
payloadAfterPreCombine.combineAndGetUpdateValue(record1, schema,
properties).get();
+ assertEquals(outputWithPreCombineUsed, finalOutputWithPreCombine);
+ }
}