alexeykudinkin commented on code in PR #4676:
URL: https://github.com/apache/hudi/pull/4676#discussion_r970068953
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java:
##########
@@ -36,7 +36,7 @@
protected final transient HoodieEngineContext context;
protected final transient Configuration hadoopConf;
- protected final HoodieWriteConfig config;
+ public final HoodieWriteConfig config;
Review Comment:
Please expose the field via a getter instead of making it public.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java:
##########
@@ -51,16 +55,20 @@ protected HoodieData<HoodieRecord<T>>
tag(HoodieData<HoodieRecord<T>> dedupedRec
@Override
public HoodieData<HoodieRecord<T>> deduplicateRecords(
- HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int
parallelism) {
+ HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int
parallelism, String jsonSchema) {
boolean isIndexingGlobal = index.isGlobal();
+ final Schema[] schema = {null};
return records.mapToPair(record -> {
HoodieKey hoodieKey = record.getKey();
// If index used is global, then records are expected to differ in their
partitionPath
Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey;
return Pair.of(key, record);
}).reduceByKey((rec1, rec2) -> {
+ if (schema[0] == null) {
+ schema[0] = new Schema.Parser().parse(jsonSchema);
Review Comment:
We can use a simple local var, no need for that wrapping
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java:
##########
@@ -88,16 +91,20 @@ protected List<HoodieRecord<T>> tag(List<HoodieRecord<T>>
dedupedRecords, Hoodie
@Override
public List<HoodieRecord<T>> deduplicateRecords(
- List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism)
{
+ List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism,
String jsonSchema) {
Review Comment:
Same comment as above
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java:
##########
@@ -51,16 +55,20 @@ protected HoodieData<HoodieRecord<T>>
tag(HoodieData<HoodieRecord<T>> dedupedRec
@Override
public HoodieData<HoodieRecord<T>> deduplicateRecords(
- HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int
parallelism) {
+ HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int
parallelism, String jsonSchema) {
Review Comment:
This is Avro schema, right? Let's name it accordingly
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java:
##########
@@ -74,6 +75,15 @@ private static Stream<Arguments> writeLogTest() {
return Stream.of(data).map(Arguments::of);
}
+ private static Stream<Arguments> writePayloadTest() {
+ // Payload class
+ Object[] data = new Object[] {
Review Comment:
You can inline this var
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java:
##########
@@ -88,16 +91,20 @@ protected List<HoodieRecord<T>> tag(List<HoodieRecord<T>>
dedupedRecords, Hoodie
@Override
public List<HoodieRecord<T>> deduplicateRecords(
- List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism)
{
+ List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism,
String jsonSchema) {
// If index used is global, then records are expected to differ in their
partitionPath
Map<Object, List<HoodieRecord<T>>> keyedRecords = records.stream()
.collect(Collectors.groupingBy(record ->
record.getKey().getRecordKey()));
+ final Schema[] schema = {null};
return keyedRecords.values().stream().map(x -> x.stream().reduce((rec1,
rec2) -> {
final T data1 = rec1.getData();
final T data2 = rec2.getData();
- @SuppressWarnings("unchecked") final T reducedData = (T)
data2.preCombine(data1);
+ if (schema[0] == null) {
Review Comment:
Same comment as above
##########
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java:
##########
@@ -64,9 +67,13 @@ public List<HoodieRecord<T>> deduplicateRecords(
return Pair.of(key, record);
}).collect(Collectors.groupingBy(Pair::getLeft));
+ final Schema[] schema = {null};
return keyedRecords.values().stream().map(x ->
x.stream().map(Pair::getRight).reduce((rec1, rec2) -> {
+ if (schema[0] == null) {
Review Comment:
Please address this across the board
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java:
##########
@@ -58,6 +58,20 @@ default T preCombine(T oldValue, Properties properties) {
return preCombine(oldValue);
}
+ /**
+ * When more than one HoodieRecord have the same HoodieKey in the incoming
batch, this function combines them before attempting to insert/upsert by taking
in a schema.
+ * Implementation can leverage the schema to decide their business logic to
do preCombine.
+ *
+ * @param oldValue instance of the old {@link HoodieRecordPayload} to be
combined with.
+ * @param properties Payload related properties. For example pass the
ordering field(s) name to extract from value in storage.
+ * @param schema Payload related schema. For example use schema to
overwrite old instance for specified fields that doesn't equal to default value.
+ * @return the combined value
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
Review Comment:
Please mark this as `EVOLVING`
##########
hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * subclass of OverwriteNonDefaultsWithLatestAvroPayload used for delta
streamer.
+ *
+ * <ol>
+ * <li>preCombine - Picks the latest delta record for a key, based on an
ordering field;
+ * <li>combineAndGetUpdateValue/getInsertValue - overwrite storage for
specified fields
+ * that doesn't equal defaultValue.
+ * </ol>
+ */
+public class PartialUpdateAvroPayload extends
OverwriteNonDefaultsWithLatestAvroPayload {
+
+ public PartialUpdateAvroPayload(GenericRecord record, Comparable
orderingVal) {
+ super(record, orderingVal);
+ }
+
+ public PartialUpdateAvroPayload(Option<GenericRecord> record) {
+ super(record); // natural order
+ }
+
+ @Override
+ public PartialUpdateAvroPayload preCombine(OverwriteWithLatestAvroPayload
oldValue, Properties properties, Schema schema) {
+ if (oldValue.recordBytes.length == 0) {
+ // use natural order for delete record
+ return this;
+ }
+ boolean isBaseRecordForMerge = false;
+ if (oldValue.orderingVal.compareTo(orderingVal) > 0) {
+ // pick the payload with greatest ordering value as insert record
+ isBaseRecordForMerge = true;
+ }
+ try {
+ GenericRecord indexedOldValue = (GenericRecord)
oldValue.getInsertValue(schema).get();
+ Option<IndexedRecord> optValue =
combineAndGetUpdateValue(indexedOldValue, schema, isBaseRecordForMerge);
+ if (optValue.isPresent()) {
+ return new PartialUpdateAvroPayload((GenericRecord) optValue.get(),
+ isBaseRecordForMerge ? oldValue.orderingVal : this.orderingVal);
+ }
+ } catch (Exception ex) {
+ return this;
+ }
+ return this;
+ }
+
+ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord
currentValue, Schema schema, boolean isBaseRecordForMerge) throws IOException {
+ Option<IndexedRecord> recordOption = getInsertValue(schema);
+
+ if (!recordOption.isPresent()) {
+ return Option.empty();
+ }
+
+ GenericRecord insertRecord;
+ GenericRecord currentRecord;
+ if (isBaseRecordForMerge) {
+ insertRecord = (GenericRecord) currentValue;
+ currentRecord = (GenericRecord) recordOption.get();
+ } else {
+ insertRecord = (GenericRecord) recordOption.get();
+ currentRecord = (GenericRecord) currentValue;
+ }
+
+ return getMergedIndexedRecordOption(schema, insertRecord, currentRecord);
+ }
+
+ @Override
+ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord
currentValue, Schema schema) throws IOException {
+ return this.combineAndGetUpdateValue(currentValue, schema, false);
+ }
+
+ @Override
+ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord
currentValue, Schema schema, Properties prop) throws IOException {
+ String orderingField =
prop.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY);
+ boolean isBaseRecordForMerge = false;
+
+ if (!StringUtils.isNullOrEmpty(orderingField)) {
+ String oldOrderingVal = HoodieAvroUtils.getNestedFieldValAsString(
+ (GenericRecord) currentValue, orderingField, false, false);
+ if (oldOrderingVal.compareTo(orderingVal.toString()) > 0) {
Review Comment:
We can't compare arbitrary data types after converting them to strings: the
comparison becomes lexicographic (10 > 2, but "10" < "2").
##########
hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * subclass of OverwriteNonDefaultsWithLatestAvroPayload used for delta
streamer.
+ *
+ * <ol>
+ * <li>preCombine - Picks the latest delta record for a key, based on an
ordering field;
+ * <li>combineAndGetUpdateValue/getInsertValue - overwrite storage for
specified fields
+ * that doesn't equal defaultValue.
+ * </ol>
+ */
+public class PartialUpdateAvroPayload extends
OverwriteNonDefaultsWithLatestAvroPayload {
+
+ public PartialUpdateAvroPayload(GenericRecord record, Comparable
orderingVal) {
+ super(record, orderingVal);
+ }
+
+ public PartialUpdateAvroPayload(Option<GenericRecord> record) {
+ super(record); // natural order
+ }
+
+ @Override
+ public PartialUpdateAvroPayload preCombine(OverwriteWithLatestAvroPayload
oldValue, Properties properties, Schema schema) {
+ if (oldValue.recordBytes.length == 0) {
+ // use natural order for delete record
+ return this;
+ }
+ boolean isBaseRecordForMerge = false;
+ if (oldValue.orderingVal.compareTo(orderingVal) > 0) {
+ // pick the payload with greatest ordering value as insert record
+ isBaseRecordForMerge = true;
+ }
+ try {
+ GenericRecord indexedOldValue = (GenericRecord)
oldValue.getInsertValue(schema).get();
+ Option<IndexedRecord> optValue =
combineAndGetUpdateValue(indexedOldValue, schema, isBaseRecordForMerge);
+ if (optValue.isPresent()) {
+ return new PartialUpdateAvroPayload((GenericRecord) optValue.get(),
+ isBaseRecordForMerge ? oldValue.orderingVal : this.orderingVal);
+ }
+ } catch (Exception ex) {
+ return this;
+ }
+ return this;
+ }
+
+ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord
currentValue, Schema schema, boolean isBaseRecordForMerge) throws IOException {
Review Comment:
Why is this method public?
##########
hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * subclass of OverwriteNonDefaultsWithLatestAvroPayload used for delta
streamer.
+ *
+ * <ol>
+ * <li>preCombine - Picks the latest delta record for a key, based on an
ordering field;
+ * <li>combineAndGetUpdateValue/getInsertValue - overwrite storage for
specified fields
+ * that doesn't equal defaultValue.
+ * </ol>
+ */
+public class PartialUpdateAvroPayload extends
OverwriteNonDefaultsWithLatestAvroPayload {
+
+ public PartialUpdateAvroPayload(GenericRecord record, Comparable
orderingVal) {
+ super(record, orderingVal);
+ }
+
+ public PartialUpdateAvroPayload(Option<GenericRecord> record) {
+ super(record); // natural order
+ }
+
+ @Override
+ public PartialUpdateAvroPayload preCombine(OverwriteWithLatestAvroPayload
oldValue, Properties properties, Schema schema) {
+ if (oldValue.recordBytes.length == 0) {
+ // use natural order for delete record
+ return this;
+ }
+ boolean isBaseRecordForMerge = false;
+ if (oldValue.orderingVal.compareTo(orderingVal) > 0) {
+ // pick the payload with greatest ordering value as insert record
+ isBaseRecordForMerge = true;
+ }
+ try {
+ GenericRecord indexedOldValue = (GenericRecord)
oldValue.getInsertValue(schema).get();
+ Option<IndexedRecord> optValue =
combineAndGetUpdateValue(indexedOldValue, schema, isBaseRecordForMerge);
+ if (optValue.isPresent()) {
+ return new PartialUpdateAvroPayload((GenericRecord) optValue.get(),
+ isBaseRecordForMerge ? oldValue.orderingVal : this.orderingVal);
+ }
+ } catch (Exception ex) {
+ return this;
+ }
+ return this;
+ }
+
+ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord
currentValue, Schema schema, boolean isBaseRecordForMerge) throws IOException {
Review Comment:
`isBaseRecordForMerge` is not very intuitive (it doesn't indicate which record
is the "base record").
I'd suggest `shouldInsertCurrentValue`.
##########
hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.avro.JsonProperties;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+
+import org.apache.hudi.common.util.Option;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+
+/**
+ * Unit tests {@link TestPartialUpdateAvroPayload}.
+ */
+public class TestPartialUpdateAvroPayload {
+ private Schema schema;
+
+ String jsonSchema = "{\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"partialRecord\", \"namespace\":\"org.apache.hudi\",\n"
+ + " \"fields\": [\n"
+ + " {\"name\": \"id\", \"type\": [\"null\", \"string\"]},\n"
+ + " {\"name\": \"partition\", \"type\": [\"null\", \"string\"]},\n"
+ + " {\"name\": \"ts\", \"type\": [\"null\", \"long\"]},\n"
+ + " {\"name\": \"_hoodie_is_deleted\", \"type\": [\"null\",
\"boolean\"], \"default\":false},\n"
+ + " {\"name\": \"city\", \"type\": [\"null\", \"string\"]},\n"
+ + " {\"name\": \"child\", \"type\": [\"null\", {\"type\": \"array\",
\"items\": \"string\"}]}\n"
+ + " ]\n"
+ + "}";
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ schema = new Schema.Parser().parse(jsonSchema);
+ }
+
+ @Test
+ public void testActiveRecords() throws IOException {
+ GenericRecord record1 = new GenericData.Record(schema);
+ record1.put("id", "1");
+ record1.put("partition", "partition1");
+ record1.put("ts", null);
+ record1.put("_hoodie_is_deleted", false);
+ record1.put("city", "NY0");
+ record1.put("child", Arrays.asList("A"));
+
+ GenericRecord record2 = new GenericData.Record(schema);
+ record2.put("id", "1");
+ record2.put("partition", "partition1");
+ record2.put("ts", 0L);
+ record2.put("_hoodie_is_deleted", false);
+ record2.put("city", null);
+ record2.put("child", Arrays.asList("B"));
+
+ GenericRecord record3 = new GenericData.Record(schema);
+ record3.put("id", "1");
+ record3.put("partition", "partition1");
+ record3.put("ts", 0L);
+ record3.put("_hoodie_is_deleted", false);
+ record3.put("city", "NY0");
+ record3.put("child", Arrays.asList("A"));
+
+ GenericRecord record4 = new GenericData.Record(schema);
+ record4.put("id", "1");
+ record4.put("partition", "partition1");
+ record4.put("ts", 0L);
+ record4.put("_hoodie_is_deleted", false);
+ record4.put("city", "NY0");
+ record4.put("child", Arrays.asList("B"));
+
+
+ PartialUpdateAvroPayload payload1 = new PartialUpdateAvroPayload(record1,
1);
+ PartialUpdateAvroPayload payload2 = new PartialUpdateAvroPayload(record2,
2);
+ assertArrayEquals(payload1.preCombine(payload2, new Properties() ,
schema).recordBytes, new PartialUpdateAvroPayload(record4, 2).recordBytes);
+ assertArrayEquals(payload2.preCombine(payload1, new Properties(),
schema).recordBytes, new PartialUpdateAvroPayload(record4, 2).recordBytes);
+
+ assertEquals(record1, payload1.getInsertValue(schema).get());
+ assertEquals(record2, payload2.getInsertValue(schema).get());
+
+ assertEquals(payload1.combineAndGetUpdateValue(record2, schema).get(),
record3);
+ assertEquals(payload2.combineAndGetUpdateValue(record1, schema).get(),
record4);
+
+ // regenerate
+ record1 = new GenericData.Record(schema);
+ record1.put("id", "1");
+ record1.put("partition", "partition1");
+ record1.put("ts", null);
+ record1.put("_hoodie_is_deleted", false);
+ record1.put("city", "NY0");
+ record1.put("child", Arrays.asList("A"));
+
+ record2 = new GenericData.Record(schema);
+ record2.put("id", "1");
+ record2.put("partition", "partition1");
+ record2.put("ts", 0L);
+ record2.put("_hoodie_is_deleted", false);
+ record2.put("city", null);
+ record2.put("child", Arrays.asList("B"));
+
+ payload1 = new PartialUpdateAvroPayload(record1, 2);
+ payload2 = new PartialUpdateAvroPayload(record2, 1);
+ assertArrayEquals(payload1.preCombine(payload2, new Properties() ,
schema).recordBytes, new PartialUpdateAvroPayload(record3, 2).recordBytes);
+ assertArrayEquals(payload2.preCombine(payload1, new Properties() ,
schema).recordBytes, new PartialUpdateAvroPayload(record3, 2).recordBytes);
+ }
+
+ @Test
+ public void testDeletedRecord() throws IOException {
+ GenericRecord record1 = new GenericData.Record(schema);
+ record1.put("id", "1");
+ record1.put("partition", "partition0");
+ record1.put("ts", 0L);
+ record1.put("_hoodie_is_deleted", false);
+ record1.put("city", "NY0");
+ record1.put("child", Collections.emptyList());
+
+ GenericRecord delRecord1 = new GenericData.Record(schema);
+ delRecord1.put("id", "2");
+ delRecord1.put("partition", "partition1");
+ delRecord1.put("ts", 1L);
+ delRecord1.put("_hoodie_is_deleted", true);
+ delRecord1.put("city", "NY0");
+ delRecord1.put("child", Collections.emptyList());
+
+ GenericRecord record2 = new GenericData.Record(schema);
+ record2.put("id", "1");
+ record2.put("partition", "partition0");
+ record2.put("ts", 0L);
+ record2.put("_hoodie_is_deleted", true);
+ record2.put("city", "NY0");
+ record2.put("child", Collections.emptyList());
+
+ PartialUpdateAvroPayload payload1 = new PartialUpdateAvroPayload(record1,
0);
+ PartialUpdateAvroPayload payload2 = new
PartialUpdateAvroPayload(delRecord1, 1);
+
+ assertEquals(payload1.preCombine(payload2), payload2);
+ assertEquals(payload2.preCombine(payload1), payload2);
+
+ assertEquals(record1, payload1.getInsertValue(schema).get());
+ assertFalse(payload2.getInsertValue(schema).isPresent());
+
+ Properties properties = new Properties();
+ properties.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, "ts");
+ assertEquals(payload1.combineAndGetUpdateValue(delRecord1, schema,
properties), Option.empty());
+ assertFalse(payload2.combineAndGetUpdateValue(record1, schema,
properties).isPresent());
+ }
+
+ @Test
+ public void testNullColumn() throws IOException {
+ Schema avroSchema = Schema.createRecord(Arrays.asList(
Review Comment:
Let's represent this schema as a string instead.
##########
hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.avro.JsonProperties;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+
+import org.apache.hudi.common.util.Option;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+
+/**
+ * Unit tests {@link TestPartialUpdateAvroPayload}.
+ */
+public class TestPartialUpdateAvroPayload {
+ private Schema schema;
+
+ String jsonSchema = "{\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"partialRecord\", \"namespace\":\"org.apache.hudi\",\n"
+ + " \"fields\": [\n"
+ + " {\"name\": \"id\", \"type\": [\"null\", \"string\"]},\n"
+ + " {\"name\": \"partition\", \"type\": [\"null\", \"string\"]},\n"
+ + " {\"name\": \"ts\", \"type\": [\"null\", \"long\"]},\n"
+ + " {\"name\": \"_hoodie_is_deleted\", \"type\": [\"null\",
\"boolean\"], \"default\":false},\n"
+ + " {\"name\": \"city\", \"type\": [\"null\", \"string\"]},\n"
+ + " {\"name\": \"child\", \"type\": [\"null\", {\"type\": \"array\",
\"items\": \"string\"}]}\n"
+ + " ]\n"
+ + "}";
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ schema = new Schema.Parser().parse(jsonSchema);
+ }
+
+ @Test
+ public void testActiveRecords() throws IOException {
+ GenericRecord record1 = new GenericData.Record(schema);
+ record1.put("id", "1");
+ record1.put("partition", "partition1");
+ record1.put("ts", null);
+ record1.put("_hoodie_is_deleted", false);
+ record1.put("city", "NY0");
+ record1.put("child", Arrays.asList("A"));
+
+ GenericRecord record2 = new GenericData.Record(schema);
+ record2.put("id", "1");
+ record2.put("partition", "partition1");
+ record2.put("ts", 0L);
+ record2.put("_hoodie_is_deleted", false);
+ record2.put("city", null);
+ record2.put("child", Arrays.asList("B"));
+
+ GenericRecord record3 = new GenericData.Record(schema);
+ record3.put("id", "1");
+ record3.put("partition", "partition1");
+ record3.put("ts", 0L);
+ record3.put("_hoodie_is_deleted", false);
+ record3.put("city", "NY0");
+ record3.put("child", Arrays.asList("A"));
+
+ GenericRecord record4 = new GenericData.Record(schema);
+ record4.put("id", "1");
+ record4.put("partition", "partition1");
+ record4.put("ts", 0L);
+ record4.put("_hoodie_is_deleted", false);
+ record4.put("city", "NY0");
+ record4.put("child", Arrays.asList("B"));
+
+
+ PartialUpdateAvroPayload payload1 = new PartialUpdateAvroPayload(record1,
1);
+ PartialUpdateAvroPayload payload2 = new PartialUpdateAvroPayload(record2,
2);
+ assertArrayEquals(payload1.preCombine(payload2, new Properties() ,
schema).recordBytes, new PartialUpdateAvroPayload(record4, 2).recordBytes);
+ assertArrayEquals(payload2.preCombine(payload1, new Properties(),
schema).recordBytes, new PartialUpdateAvroPayload(record4, 2).recordBytes);
+
+ assertEquals(record1, payload1.getInsertValue(schema).get());
+ assertEquals(record2, payload2.getInsertValue(schema).get());
+
+ assertEquals(payload1.combineAndGetUpdateValue(record2, schema).get(),
record3);
+ assertEquals(payload2.combineAndGetUpdateValue(record1, schema).get(),
record4);
+
+ // regenerate
+ record1 = new GenericData.Record(schema);
+ record1.put("id", "1");
+ record1.put("partition", "partition1");
+ record1.put("ts", null);
+ record1.put("_hoodie_is_deleted", false);
+ record1.put("city", "NY0");
+ record1.put("child", Arrays.asList("A"));
+
+ record2 = new GenericData.Record(schema);
+ record2.put("id", "1");
+ record2.put("partition", "partition1");
+ record2.put("ts", 0L);
+ record2.put("_hoodie_is_deleted", false);
+ record2.put("city", null);
+ record2.put("child", Arrays.asList("B"));
+
+ payload1 = new PartialUpdateAvroPayload(record1, 2);
+ payload2 = new PartialUpdateAvroPayload(record2, 1);
+ assertArrayEquals(payload1.preCombine(payload2, new Properties() ,
schema).recordBytes, new PartialUpdateAvroPayload(record3, 2).recordBytes);
+ assertArrayEquals(payload2.preCombine(payload1, new Properties() ,
schema).recordBytes, new PartialUpdateAvroPayload(record3, 2).recordBytes);
+ }
+
+ @Test
+ public void testDeletedRecord() throws IOException {
+ GenericRecord record1 = new GenericData.Record(schema);
+ record1.put("id", "1");
+ record1.put("partition", "partition0");
+ record1.put("ts", 0L);
+ record1.put("_hoodie_is_deleted", false);
+ record1.put("city", "NY0");
+ record1.put("child", Collections.emptyList());
+
+ GenericRecord delRecord1 = new GenericData.Record(schema);
+ delRecord1.put("id", "2");
+ delRecord1.put("partition", "partition1");
+ delRecord1.put("ts", 1L);
+ delRecord1.put("_hoodie_is_deleted", true);
+ delRecord1.put("city", "NY0");
+ delRecord1.put("child", Collections.emptyList());
+
+ GenericRecord record2 = new GenericData.Record(schema);
+ record2.put("id", "1");
+ record2.put("partition", "partition0");
+ record2.put("ts", 0L);
+ record2.put("_hoodie_is_deleted", true);
+ record2.put("city", "NY0");
+ record2.put("child", Collections.emptyList());
+
+ PartialUpdateAvroPayload payload1 = new PartialUpdateAvroPayload(record1,
0);
+ PartialUpdateAvroPayload payload2 = new
PartialUpdateAvroPayload(delRecord1, 1);
+
+ assertEquals(payload1.preCombine(payload2), payload2);
+ assertEquals(payload2.preCombine(payload1), payload2);
+
+ assertEquals(record1, payload1.getInsertValue(schema).get());
+ assertFalse(payload2.getInsertValue(schema).isPresent());
+
+ Properties properties = new Properties();
+ properties.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, "ts");
+ assertEquals(payload1.combineAndGetUpdateValue(delRecord1, schema,
properties), Option.empty());
+ assertFalse(payload2.combineAndGetUpdateValue(record1, schema,
properties).isPresent());
+ }
+
+ @Test
+ public void testNullColumn() throws IOException {
+ Schema avroSchema = Schema.createRecord(Arrays.asList(
+ new Schema.Field("id",
Schema.createUnion(Schema.create(Schema.Type.STRING),
Schema.create(Schema.Type.NULL)), "", JsonProperties.NULL_VALUE),
+ new Schema.Field("name",
Schema.createUnion(Schema.create(Schema.Type.STRING),
Schema.create(Schema.Type.NULL)), "", JsonProperties.NULL_VALUE),
+ new Schema.Field("age",
Schema.createUnion(Schema.create(Schema.Type.STRING),
Schema.create(Schema.Type.NULL)), "", JsonProperties.NULL_VALUE),
+ new Schema.Field("job",
Schema.createUnion(Schema.create(Schema.Type.STRING),
Schema.create(Schema.Type.NULL)), "", JsonProperties.NULL_VALUE)
+ ));
+ GenericRecord record1 = new GenericData.Record(avroSchema);
+ record1.put("id", "1");
+ record1.put("name", "aa");
+ record1.put("age", "1");
+ record1.put("job", "1");
+
+ GenericRecord record2 = new GenericData.Record(avroSchema);
+ record2.put("id", "1");
+ record2.put("name", "bb");
+ record2.put("age", "2");
+ record2.put("job", null);
+
+ GenericRecord record3 = new GenericData.Record(avroSchema);
+ record3.put("id", "1");
+ record3.put("name", "bb");
+ record3.put("age", "2");
+ record3.put("job", "1");
+
+ OverwriteNonDefaultsWithLatestAvroPayload payload2 = new
OverwriteNonDefaultsWithLatestAvroPayload(record2, 1);
Review Comment:
This overlaps w/ the first test, doesn't it? Do we need it separately?
##########
hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.avro.JsonProperties;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+
+import org.apache.hudi.common.util.Option;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+
+/**
+ * Unit tests {@link TestPartialUpdateAvroPayload}.
+ */
+public class TestPartialUpdateAvroPayload {
+ private Schema schema;
+
+ String jsonSchema = "{\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"partialRecord\", \"namespace\":\"org.apache.hudi\",\n"
+ + " \"fields\": [\n"
+ + " {\"name\": \"id\", \"type\": [\"null\", \"string\"]},\n"
+ + " {\"name\": \"partition\", \"type\": [\"null\", \"string\"]},\n"
+ + " {\"name\": \"ts\", \"type\": [\"null\", \"long\"]},\n"
+ + " {\"name\": \"_hoodie_is_deleted\", \"type\": [\"null\",
\"boolean\"], \"default\":false},\n"
+ + " {\"name\": \"city\", \"type\": [\"null\", \"string\"]},\n"
+ + " {\"name\": \"child\", \"type\": [\"null\", {\"type\": \"array\",
\"items\": \"string\"}]}\n"
+ + " ]\n"
+ + "}";
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ schema = new Schema.Parser().parse(jsonSchema);
+ }
+
+ @Test
+ public void testActiveRecords() throws IOException {
+ GenericRecord record1 = new GenericData.Record(schema);
+ record1.put("id", "1");
+ record1.put("partition", "partition1");
+ record1.put("ts", null);
+ record1.put("_hoodie_is_deleted", false);
+ record1.put("city", "NY0");
+ record1.put("child", Arrays.asList("A"));
+
+ GenericRecord record2 = new GenericData.Record(schema);
+ record2.put("id", "1");
+ record2.put("partition", "partition1");
+ record2.put("ts", 0L);
+ record2.put("_hoodie_is_deleted", false);
+ record2.put("city", null);
+ record2.put("child", Arrays.asList("B"));
+
+ GenericRecord record3 = new GenericData.Record(schema);
+ record3.put("id", "1");
+ record3.put("partition", "partition1");
+ record3.put("ts", 0L);
+ record3.put("_hoodie_is_deleted", false);
+ record3.put("city", "NY0");
+ record3.put("child", Arrays.asList("A"));
+
+ GenericRecord record4 = new GenericData.Record(schema);
+ record4.put("id", "1");
+ record4.put("partition", "partition1");
+ record4.put("ts", 0L);
+ record4.put("_hoodie_is_deleted", false);
+ record4.put("city", "NY0");
+ record4.put("child", Arrays.asList("B"));
+
+
+ PartialUpdateAvroPayload payload1 = new PartialUpdateAvroPayload(record1,
1);
+ PartialUpdateAvroPayload payload2 = new PartialUpdateAvroPayload(record2,
2);
+ assertArrayEquals(payload1.preCombine(payload2, new Properties() ,
schema).recordBytes, new PartialUpdateAvroPayload(record4, 2).recordBytes);
+ assertArrayEquals(payload2.preCombine(payload1, new Properties(),
schema).recordBytes, new PartialUpdateAvroPayload(record4, 2).recordBytes);
+
+ assertEquals(record1, payload1.getInsertValue(schema).get());
+ assertEquals(record2, payload2.getInsertValue(schema).get());
+
+ assertEquals(payload1.combineAndGetUpdateValue(record2, schema).get(),
record3);
+ assertEquals(payload2.combineAndGetUpdateValue(record1, schema).get(),
record4);
+
+ // regenerate
+ record1 = new GenericData.Record(schema);
Review Comment:
How do these regenerated records differ from the original values above?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]