nsivabalan commented on code in PR #13631:
URL: https://github.com/apache/hudi/pull/13631#discussion_r2234025977
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateStrategy.java:
##########
@@ -78,8 +81,12 @@ BufferedRecord<T> partialMerge(BufferedRecord<T> newRecord,
case FILL_DEFAULTS:
return newRecord;
case IGNORE_DEFAULTS:
- return reconcileDefaultValues(
- newRecord, oldRecord, newSchema, oldSchema,
keepOldMetadataColumns);
+ try {
+ return reconcileDefaultValues(
+ newRecord, oldRecord, newSchema, oldSchema,
keepOldMetadataColumns);
+ } catch (IOException e) {
+ throw new HoodieException("Failed to merge two records", e);
Review Comment:
Why not move the try/catch inside `reconcileDefaultValues`?
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBufferedRecordMerger.java:
##########
@@ -0,0 +1,747 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.functional;
+
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.DefaultSparkRecordMerger;
+import org.apache.hudi.OverwriteWithLatestSparkRecordMerger;
+import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieEmptyRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.PartialUpdateMode;
+import org.apache.hudi.common.table.log.InstantRange;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.BufferedRecordMerger;
+import org.apache.hudi.common.table.read.BufferedRecordMergerFactory;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.expression.Predicate;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.UnaryOperator;
+
+import static
org.apache.hudi.BaseSparkInternalRowReaderContext.getFieldValueFromInternalRow;
+import static
org.apache.hudi.common.config.RecordMergeMode.COMMIT_TIME_ORDERING;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static org.apache.hudi.common.util.OrderingValues.DEFAULT_VALUE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class TestBufferedRecordMerger extends SparkClientFunctionalTestHarness {
+ private static final String RECORD_KEY = "initial_id";
+ private static final String PARTITION_PATH = "test_partition";
+ private static final long ORDERING_VALUE = 100L;
+ private static final String IGNORE_MARKERS_VALUE = "__HUDI_DEFAULT_MARKER__";
+ private static final List<Schema> schemas = Arrays.asList(
+ getSchema1(), getSchema2(), getSchema3(), getSchema4(), getSchema5());
+ private static final Schema readerSchema = getSchema1();
+ private HoodieTableConfig tableConfig;
+ private StorageConfiguration<?> storageConfig;
+ private TypedProperties props;
+ private HoodieReaderContext<InternalRow> readerContext;
+
+ @BeforeEach
+ void setUp() throws IOException {
+ // Setup mocks
+ tableConfig = mock(HoodieTableConfig.class);
+ storageConfig = mock(StorageConfiguration.class);
+ when(tableConfig.getPayloadClass()).thenReturn(
+ "org.apache.hudi.common.model.DefaultHoodieRecordPayload");
+ when(tableConfig.populateMetaFields()).thenReturn(false);
+ when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new
String[]{"id"}));
+
+ // Create reader context.
+ props = new TypedProperties();
+ readerContext = new DummyInternalRowReaderContext(
+ storageConfig, tableConfig, Option.empty(), Option.empty());
+ }
+
+ //
============================================================================
+ // Test Set 1: enablePartialMerging = false
+ //
============================================================================
+ @ParameterizedTest
+ @EnumSource(value = RecordMergeMode.class, names = {"COMMIT_TIME_ORDERING",
"EVENT_TIME_ORDERING"})
+ void testRegularMerging(RecordMergeMode mergeMode) throws IOException {
+ // Test 1: NONE partial update mode (regular path)
+ BufferedRecordMerger<InternalRow> noneMerger = createMerger(readerContext,
mergeMode, PartialUpdateMode.NONE);
+ // Create records with all columns
+ InternalRow oldRecord = createFullRecord("old_id", "Old Name", 25, "Old
City", 1000L);
+ InternalRow newRecord = createFullRecord("new_id", "New Name", 30, "New
City", 2000L);
+ BufferedRecord<InternalRow> oldBufferedRecord =
+ new BufferedRecord<>(RECORD_KEY, ORDERING_VALUE, oldRecord, 1, false);
+ BufferedRecord<InternalRow> newBufferedRecord =
+ new BufferedRecord<>(RECORD_KEY, ORDERING_VALUE - 1, newRecord, 1,
false);
Review Comment:
For regular updates, let's test both cases:
- the new record has a higher ordering value.
- the new record has a lower ordering value.
It looks like we are only testing the case where the new record has a lower
ordering value.
Also, let's do the above for both `deltaMerge` and `finalMerge`.
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBufferedRecordMerger.java:
##########
@@ -0,0 +1,747 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.functional;
+
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.DefaultSparkRecordMerger;
+import org.apache.hudi.OverwriteWithLatestSparkRecordMerger;
+import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieEmptyRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.PartialUpdateMode;
+import org.apache.hudi.common.table.log.InstantRange;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.BufferedRecordMerger;
+import org.apache.hudi.common.table.read.BufferedRecordMergerFactory;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.expression.Predicate;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.UnaryOperator;
+
+import static
org.apache.hudi.BaseSparkInternalRowReaderContext.getFieldValueFromInternalRow;
+import static
org.apache.hudi.common.config.RecordMergeMode.COMMIT_TIME_ORDERING;
+import static
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static org.apache.hudi.common.util.OrderingValues.DEFAULT_VALUE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class TestBufferedRecordMerger extends SparkClientFunctionalTestHarness {
+ private static final String RECORD_KEY = "initial_id";
+ private static final String PARTITION_PATH = "test_partition";
+ private static final long ORDERING_VALUE = 100L;
+ private static final String IGNORE_MARKERS_VALUE = "__HUDI_DEFAULT_MARKER__";
+ private static final List<Schema> schemas = Arrays.asList(
+ getSchema1(), getSchema2(), getSchema3(), getSchema4(), getSchema5());
+ private static final Schema readerSchema = getSchema1();
+ private HoodieTableConfig tableConfig;
+ private StorageConfiguration<?> storageConfig;
+ private TypedProperties props;
+ private HoodieReaderContext<InternalRow> readerContext;
+
+ @BeforeEach
+ void setUp() throws IOException {
+ // Setup mocks
+ tableConfig = mock(HoodieTableConfig.class);
+ storageConfig = mock(StorageConfiguration.class);
+ when(tableConfig.getPayloadClass()).thenReturn(
+ "org.apache.hudi.common.model.DefaultHoodieRecordPayload");
+ when(tableConfig.populateMetaFields()).thenReturn(false);
+ when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new
String[]{"id"}));
+
+ // Create reader context.
+ props = new TypedProperties();
+ readerContext = new DummyInternalRowReaderContext(
+ storageConfig, tableConfig, Option.empty(), Option.empty());
+ }
+
+ //
============================================================================
+ // Test Set 1: enablePartialMerging = false
+ //
============================================================================
+ @ParameterizedTest
+ @EnumSource(value = RecordMergeMode.class, names = {"COMMIT_TIME_ORDERING",
"EVENT_TIME_ORDERING"})
+ void testRegularMerging(RecordMergeMode mergeMode) throws IOException {
+ // Test 1: NONE partial update mode (regular path)
+ BufferedRecordMerger<InternalRow> noneMerger = createMerger(readerContext,
mergeMode, PartialUpdateMode.NONE);
+ // Create records with all columns
+ InternalRow oldRecord = createFullRecord("old_id", "Old Name", 25, "Old
City", 1000L);
+ InternalRow newRecord = createFullRecord("new_id", "New Name", 30, "New
City", 2000L);
+ BufferedRecord<InternalRow> oldBufferedRecord =
+ new BufferedRecord<>(RECORD_KEY, ORDERING_VALUE, oldRecord, 1, false);
+ BufferedRecord<InternalRow> newBufferedRecord =
+ new BufferedRecord<>(RECORD_KEY, ORDERING_VALUE - 1, newRecord, 1,
false);
Review Comment:
Let me list the cases we wanted to cover here.
```
for each (Event time based merge mode, CommitTime based merge mode){
for each (deltaMerge, finalMerge) {
case 1: old record valid w/ ordering value 100, new record is
valid w/ ordering value 200. (new has higher ordering value)
case 2: old record valid w/ ordering value 100, new record is
valid w/ ordering value 50. (new has lower ordering value)
case 3: old record valid w/ ordering value 100, new record is
valid w/ no ordering value. (new has no ordering value)
case 4: old record valid w/ no ordering value, new record is
valid w/ ordering value 200. (old no ordering value, new has a valid ordering
value)
case 5: old record valid w/ no ordering value, new record is
valid w/ no ordering value. (old no ordering value, new has no ordering value)
case 6: old record valid w/ ordering value 100, new record is
deleted w/ no ordering value (new record is deleted)
case 7: old record valid w/ ordering value 100, new record is
deleted w/ higher ordering value 200 (new record is deleted w/ higher ordering
value)
case 8: old record valid w/ ordering value 100, new record is
deleted w/ lower ordering value 50 (new record is deleted w/ lower ordering
value)
case 9: old record is deleted w/ no ordering value, new record is
valid w/ no ordering value (old record is deleted)
case 10: old record is deleted w/ no ordering value, new record
is valid w/ a valid ordering value (old record is deleted, new w/ valid
ordering value)
case 11: old record is deleted w/ no ordering value, new record
is also deleted (old record is deleted, new is also deleted)
case 12: old record is deleted w/ ordering value 100, new record
is valid w/ no ordering value (old record is deleted)
case 13: old record is deleted w/ ordering value 100, new record
is valid w/ a valid ordering value 200 (old record is deleted, new w/ valid
ordering value)
case 14: old record is deleted w/ ordering value 100, new record
is valid w/ a lower ordering value 50 (old record is deleted, new w/ lower
ordering value)
case 15: old record is deleted w/ ordering value 100, new record
is also deleted (old record is deleted, new is also deleted)
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]