codope commented on code in PR #11415:
URL: https://github.com/apache/hudi/pull/11415#discussion_r1633697715


##########
hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestMerger.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*

Review Comment:
   Remove the redundant license headers here and below.



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java:
##########
@@ -163,4 +166,18 @@ default String[] getMandatoryFieldsForMerging(HoodieTableConfig cfg) {
    * The kind of merging strategy this recordMerger belongs to. A UUID represents the merging strategy.
    */
   String getMergingStrategy();
+
+  /**
+   * The record merge mode that corresponds to this record merger.
+   */
+  default RecordMergeMode getRecordMergeMode() {
+    switch (getMergingStrategy()) {
+      case DEFAULT_MERGER_STRATEGY_UUID:
+        return RecordMergeMode.EVENT_TIME_ORDERING;
+      case OVERWRITE_MERGER_STRATEGY_UUID:
+        return RecordMergeMode.OVERWRITE_WITH_LATEST;
+      default:
+        return RecordMergeMode.CUSTOM;

Review Comment:
   Can we reuse `inferRecordMergeMode` in `HoodieTableMetaClient` in some way? Even if we cannot, let's keep the logic consistent: e.g. the strategy could be `DEFAULT_MERGER_STRATEGY_UUID` while the payload class is `OverwriteWithLatestAvroPayload`; in that case the merge mode should be `OVERWRITE_WITH_LATEST`.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPositionBasedMergingFallback.scala:
##########
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.DataSourceWriteOptions.{PRECOMBINE_FIELD, RECORDKEY_FIELD, TABLE_TYPE}
+import org.apache.hudi.HoodieConversionUtils.toJavaOption
+import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieReaderConfig, HoodieStorageConfig, RecordMergeMode}
+import org.apache.hudi.common.model.{HoodieRecordMerger, OverwriteWithLatestAvroPayload, OverwriteWithLatestMerger}
+import org.apache.hudi.common.util
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.testutils.HoodieSparkClientTestBase
+import org.apache.hudi.util.JFunction
+import org.apache.spark.sql.SaveMode.{Append, Overwrite}
+import org.apache.spark.sql.SparkSessionExtensions
+import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
+import org.apache.spark.sql.internal.SQLConf
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+
+import java.util.function.Consumer
+
+class TestPositionBasedMergingFallback extends HoodieSparkClientTestBase {

Review Comment:
   Yes, please go ahead and remove it in that case.



##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java:
##########
@@ -219,21 +223,22 @@ public static HoodieDeleteBlock getDeleteBlock(
       List<IndexedRecord> records,
       Map<HoodieLogBlock.HeaderMetadataType, String> header,
       Schema schema,
-      Properties props
+      Properties props,
+      boolean writePositions
   ) {
     List<HoodieRecord> hoodieRecords = records.stream()
         .map(r -> {
           String rowKey = (String) r.get(r.getSchema().getField(ROW_KEY).pos());
           String partitionPath = (String) r.get(r.getSchema().getField(PARTITION_PATH).pos());
-          return new HoodieAvroIndexedRecord(new HoodieKey(rowKey, partitionPath), r);
+          return new HoodieAvroIndexedRecord(new HoodieKey(rowKey, partitionPath), r, new HoodieRecordLocation("", "", Long.parseLong((String) r.get(1)) - 1));

Review Comment:
   Same here. Please also add a comment for future reference explaining why this is being done.



##########
hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestOverwriteWithLatestMerger.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.model.OverwriteWithLatestMerger;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.testutils.reader.HoodieFileGroupReaderTestHarness;
+import org.apache.hudi.common.testutils.reader.HoodieFileSliceTestUtils;
+import org.apache.hudi.common.testutils.reader.HoodieTestReaderContext;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
+import static org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.DELETE;
+import static org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.INSERT;
+import static org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.UPDATE;
+import static org.apache.hudi.common.testutils.reader.HoodieFileSliceTestUtils.ROW_KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestOverwriteWithLatestMerger extends HoodieFileGroupReaderTestHarness {
+  @BeforeAll
+  public static void setUp() throws IOException {
+    HoodieRecordMerger merger = new OverwriteWithLatestMerger();
+    readerContext = new HoodieTestReaderContext(
+        Option.of(merger),
+        Option.of(OverwriteWithLatestAvroPayload.class.getName()));
+    properties.setProperty(
+        HoodieCommonConfig.RECORD_MERGE_MODE.key(), RecordMergeMode.OVERWRITE_WITH_LATEST.name());
+
+    // -------------------------------------------------------------
+    // The test logic is as follows:
+    // 1. Base file contains 10 records,
+    //    whose key values are from 1 to 10,
+    //    whose instant time is "001" and ordering value is 2.
+    // 2. After adding the first log file,
+    //    we delete the records with keys from 1 to 5
+    //    with ordering value 3.
+    //    Current existing keys: [6, 7, 8, 9, 10]
+    // 3. After adding the second log file,
+    //    we tried to add the records with keys from 1 to 3 back,
+    //    Current existing keys: [1, 2, 3, 6, 7, 8, 9, 10]
+    // 4. After adding the third log file,
+    //    we tried to delete records with keys from 6 to 8,
+    //    but we cannot since their ordering value is 1 < 2.
+    //    Current existing keys: [1, 2, 3, 9, 10]
+    // 5. After adding the fourth log file,
+    //    we tried to add the records with keys from 2 to 4
+    //    Current existing keys: [1, 2, 3, 4, 9, 10]
+    // -------------------------------------------------------------
+
+    // Specify the key column values for each file.
+    keyRanges = Arrays.asList(
+        new HoodieFileSliceTestUtils.KeyRange(1, 10),
+        new HoodieFileSliceTestUtils.KeyRange(1, 5),
+        new HoodieFileSliceTestUtils.KeyRange(1, 3),
+        new HoodieFileSliceTestUtils.KeyRange(6, 8),
+        new HoodieFileSliceTestUtils.KeyRange(2, 4));
+    // Specify the value of `timestamp` column for each file.
+    timestamps = Arrays.asList(
+        2L, 3L, 1L, 1L, 4L);
+    // Specify the operation type for each file.
+    operationTypes = Arrays.asList(
+        INSERT, DELETE, UPDATE, DELETE, UPDATE);
+    // Specify the instant time for each file.
+    instantTimes = Arrays.asList(
+        "001", "002", "003", "004", "005");
+    shouldWritePositions = Arrays.asList(false, false, false, false, false);
+  }
+
+  @BeforeEach
+  public void initialize() throws Exception {
+    setTableName(TestOverwriteWithLatestMerger.class.getName());
+    initPath(tableName);
+    initMetaClient();
+    initTestDataGenerator(new String[]{PARTITION_PATH});
+    testTable = HoodieTestTable.of(metaClient);
+    setUpMockCommits();
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testWithOneLogFile(boolean useRecordPositions) throws IOException, InterruptedException {
+    shouldWritePositions = Arrays.asList(useRecordPositions, useRecordPositions);
+    // The FileSlice contains a base file and a log file.
+    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(2, useRecordPositions);
+    List<String> leftKeysExpected = Arrays.asList("6", "7", "8", "9", "10");
+    List<Long> leftTimestampsExpected = Arrays.asList(2L, 2L, 2L, 2L, 2L);
+    List<String> leftKeysActual = new ArrayList<>();
+    List<Long> leftTimestampsActual = new ArrayList<>();
+    while (iterator.hasNext()) {
+      IndexedRecord record = iterator.next();
+      leftKeysActual.add(record.get(AVRO_SCHEMA.getField(ROW_KEY).pos()).toString());
+      leftTimestampsActual.add((Long) record.get(AVRO_SCHEMA.getField("timestamp").pos()));
+    }
+    assertEquals(leftKeysExpected, leftKeysActual);
+    assertEquals(leftTimestampsExpected, leftTimestampsActual);
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testWithTwoLogFiles(boolean useRecordPositions) throws IOException, InterruptedException {
+    shouldWritePositions = Arrays.asList(useRecordPositions, useRecordPositions, useRecordPositions);
+    // The FileSlice contains a base file and two log files.
+    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(3, useRecordPositions);
+    List<String> leftKeysExpected = Arrays.asList("1", "2", "3", "6", "7", "8", "9", "10");
+    List<Long> leftTimestampsExpected = Arrays.asList(1L, 1L, 1L, 2L, 2L, 2L, 2L, 2L);
+    List<String> leftKeysActual = new ArrayList<>();
+    List<Long> leftTimestampsActual = new ArrayList<>();
+    while (iterator.hasNext()) {
+      IndexedRecord record = iterator.next();
+      leftKeysActual.add(record.get(AVRO_SCHEMA.getField(ROW_KEY).pos()).toString());
+      leftTimestampsActual.add((Long) record.get(AVRO_SCHEMA.getField("timestamp").pos()));
+    }
+    assertEquals(leftKeysExpected, leftKeysActual);
+    assertEquals(leftTimestampsExpected, leftTimestampsActual);
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testWithThreeLogFiles(boolean useRecordPositions) throws IOException, InterruptedException {
+    shouldWritePositions = Arrays.asList(useRecordPositions, useRecordPositions, useRecordPositions, useRecordPositions);
+    // The FileSlice contains a base file and three log files.
+    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(4, useRecordPositions);
+    List<String> leftKeysExpected = Arrays.asList("1", "2", "3", "9", "10");
+    List<Long> leftTimestampsExpected = Arrays.asList(1L, 1L, 1L, 2L, 2L);
+    List<String> leftKeysActual = new ArrayList<>();
+    List<Long> leftTimestampsActual = new ArrayList<>();
+    while (iterator.hasNext()) {
+      IndexedRecord record = iterator.next();
+      leftKeysActual.add(record.get(AVRO_SCHEMA.getField(ROW_KEY).pos()).toString());
+      leftTimestampsActual.add((Long) record.get(AVRO_SCHEMA.getField("timestamp").pos()));
+    }
+    assertEquals(leftKeysExpected, leftKeysActual);
+    assertEquals(leftTimestampsExpected, leftTimestampsActual);
+  }
+
+  @Test
+  public void testWithFourLogFiles() throws IOException, InterruptedException {
+    // The FileSlice contains a base file and four log files.
+    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(5);
+    List<String> leftKeysExpected = Arrays.asList("1", "2", "3", "4", "9", "10");
+    List<Long> leftTimestampsExpected = Arrays.asList(1L, 4L, 4L, 4L, 2L, 2L);
+    List<String> leftKeysActual = new ArrayList<>();
+    List<Long> leftTimestampsActual = new ArrayList<>();
+    while (iterator.hasNext()) {
+      IndexedRecord record = iterator.next();
+      leftKeysActual.add(record.get(AVRO_SCHEMA.getField(ROW_KEY).pos()).toString());
+      leftTimestampsActual.add((Long) record.get(AVRO_SCHEMA.getField("timestamp").pos()));
+    }
+    assertEquals(leftKeysExpected, leftKeysActual);
+    assertEquals(leftTimestampsExpected, leftTimestampsActual);
+  }
+
+  @ParameterizedTest
+  @MethodSource("testArgs")
+  public void testPositionMergeFallback(boolean log1haspositions, boolean log2haspositions,
+                                        boolean log3haspositions, boolean log4haspositions) throws IOException, InterruptedException {
+    shouldWritePositions = Arrays.asList(true, log1haspositions, log2haspositions, log3haspositions, log4haspositions);
+    // The FileSlice contains a base file and four log files.
+    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(5, true);
+    List<String> leftKeysExpected = Arrays.asList("1", "2", "3", "4", "9", "10");
+    List<Long> leftTimestampsExpected = Arrays.asList(1L, 4L, 4L, 4L, 2L, 2L);
+    List<String> leftKeysActual = new ArrayList<>();
+    List<Long> leftTimestampsActual = new ArrayList<>();
+    while (iterator.hasNext()) {
+      IndexedRecord record = iterator.next();
+      leftKeysActual.add(record.get(AVRO_SCHEMA.getField(ROW_KEY).pos()).toString());
+      leftTimestampsActual.add((Long) record.get(AVRO_SCHEMA.getField("timestamp").pos()));
+    }
+    assertEquals(leftKeysExpected, leftKeysActual);
+    assertEquals(leftTimestampsExpected, leftTimestampsActual);
+  }
+
+  private static Stream<Arguments> testArgs() {

Review Comment:
   Let's add a comment on what these arguments are. I know the test method has the arg names, but it would be good to add a comment here as well.
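   For instance, something like this self-contained sketch (plain Java, with the JUnit `Arguments` wiring omitted) carries the comment the method needs; whether the real `testArgs` enumerates all 16 combinations is an assumption on my part:

```java
import java.util.ArrayList;
import java.util.List;

public class PositionFallbackArgs {
  // Each row is {log1haspositions, log2haspositions, log3haspositions,
  // log4haspositions}: whether the Nth log file was written with record
  // positions. Enumerating every combination exercises each mix of
  // position-based merging and the key-based fallback path.
  public static List<boolean[]> testArgs() {
    List<boolean[]> args = new ArrayList<>();
    for (int mask = 0; mask < 16; mask++) {
      args.add(new boolean[] {
          (mask & 1) != 0, (mask & 2) != 0, (mask & 4) != 0, (mask & 8) != 0
      });
    }
    return args;
  }
}
```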



##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java:
##########
@@ -219,21 +223,22 @@ public static HoodieDeleteBlock getDeleteBlock(
       List<IndexedRecord> records,
       Map<HoodieLogBlock.HeaderMetadataType, String> header,
       Schema schema,
-      Properties props
+      Properties props,
+      boolean writePositions
   ) {
     List<HoodieRecord> hoodieRecords = records.stream()
         .map(r -> {
           String rowKey = (String) r.get(r.getSchema().getField(ROW_KEY).pos());
           String partitionPath = (String) r.get(r.getSchema().getField(PARTITION_PATH).pos());
-          return new HoodieAvroIndexedRecord(new HoodieKey(rowKey, partitionPath), r);
+          return new HoodieAvroIndexedRecord(new HoodieKey(rowKey, partitionPath), r, new HoodieRecordLocation("", "", Long.parseLong((String) r.get(1)) - 1));
         })
         .collect(Collectors.toList());
     return new HoodieDeleteBlock(
         hoodieRecords.stream().map(
             r -> Pair.of(DeleteRecord.create(
-                r.getKey(), r.getOrderingValue(schema, props)), -1L))
+                r.getKey(), r.getOrderingValue(schema, props)), Long.parseLong((String) ((IndexedRecord) r.getData()).get(1)) - 1))

Review Comment:
   same here



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java:
##########
@@ -53,6 +54,16 @@ public HoodieAvroIndexedRecord(HoodieKey key, IndexedRecord data) {
     super(key, data);
   }
 
+  @VisibleForTesting
+  public HoodieAvroIndexedRecord(HoodieKey key, IndexedRecord data, HoodieRecordLocation currentLocation) {

Review Comment:
   These are already public constructors; do we need to limit the scope? Asking because I see the `VisibleForTesting` annotation.



##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java:
##########
@@ -165,21 +166,24 @@ private static HoodieDataBlock getDataBlock(
       HoodieLogBlock.HoodieLogBlockType dataBlockType,
       List<IndexedRecord> records,
       Map<HoodieLogBlock.HeaderMetadataType, String> header,
-      StoragePath logFilePath
+      StoragePath logFilePath,
+      boolean writePositions
   ) {
     return createDataBlock(
         dataBlockType,
-        records.stream().map(HoodieAvroIndexedRecord::new)
+        records.stream().map(r -> new HoodieAvroIndexedRecord(r, new HoodieRecordLocation("", "", Long.parseLong((String) r.get(1)) - 1)))

Review Comment:
   Can you explain this logic? I am wary of relying on a hardcoded index.
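   One way to avoid the hardcoded index (a sketch against plain lists rather than the Avro API; using `timestamp` as the ordering field name is an assumption based on the test schema): resolve the field's position by name, and document why the record position is derived from the ordering value.

```java
import java.util.List;

public class OrderingFieldLookup {
  // Resolve the ordering column by name instead of the hardcoded r.get(1).
  // The test fixtures encode the ordering value as a string; the row position
  // is assumed to be orderingValue - 1 (positions are 0-based while the
  // ordering values in these fixtures start at 1).
  public static long recordPosition(List<String> fieldNames, List<Object> values, String orderingField) {
    int pos = fieldNames.indexOf(orderingField);
    if (pos < 0) {
      throw new IllegalArgumentException("Schema has no field: " + orderingField);
    }
    return Long.parseLong((String) values.get(pos)) - 1L;
  }
}
```

   With the real Avro record this would be `r.get(r.getSchema().getField(orderingField).pos())`, the same pattern already used for `ROW_KEY` and `PARTITION_PATH` above.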



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java:
##########
@@ -174,20 +189,97 @@ public boolean containsLogRecord(String recordKey) {
   }
 
   @Override
-  protected boolean doHasNext() throws IOException {
-    ValidationUtils.checkState(baseFileIterator != null, "Base file iterator has not been set yet");
-
-    // Handle merging.
-    while (baseFileIterator.hasNext()) {
-      T baseRecord = baseFileIterator.next();
-      nextRecordPosition = readerContext.extractRecordPosition(baseRecord, readerSchema, ROW_INDEX_TEMPORARY_COLUMN_NAME, nextRecordPosition);
-      Pair<Option<T>, Map<String, Object>> logRecordInfo = records.remove(nextRecordPosition++);
-      if (hasNextBaseRecord(baseRecord, logRecordInfo)) {
-        return true;
+  protected boolean hasNextBaseRecord(T baseRecord) throws IOException {
+    if (!readerContext.getShouldMergeUseRecordPosition()) {
+      return doHasNextFallbackBaseRecord(baseRecord);
+    }
+
+    nextRecordPosition = readerContext.extractRecordPosition(baseRecord, readerSchema,
+        ROW_INDEX_COLUMN_NAME, nextRecordPosition);
+    Pair<Option<T>, Map<String, Object>> logRecordInfo = records.remove(nextRecordPosition++);
+
+    Map<String, Object> metadata = readerContext.generateMetadataForRecord(
+        baseRecord, readerSchema);
+
+    Option<T> resultRecord = logRecordInfo != null
+        ? merge(Option.of(baseRecord), metadata, logRecordInfo.getLeft(), logRecordInfo.getRight())
+        : merge(Option.empty(), Collections.emptyMap(), Option.of(baseRecord), metadata);
+    if (resultRecord.isPresent()) {
+      nextRecord = readerContext.seal(resultRecord.get());
+      return true;
+    }
+    return false;
+  }
+
+  private boolean doHasNextFallbackBaseRecord(T baseRecord) throws IOException {
+    if (needToDoHybridStrategy) {
+      //see if there is a delete block with record positions
+      nextRecordPosition = readerContext.extractRecordPosition(baseRecord, readerSchema,
+          ROW_INDEX_TEMPORARY_COLUMN_NAME, nextRecordPosition);
+      Pair<Option<T>, Map<String, Object>> logRecordInfo  = records.remove(nextRecordPosition++);

Review Comment:
   OK, then it should be fine.



##########
hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestMerger.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+
+public class OverwriteWithLatestMerger implements HoodieRecordMerger {

Review Comment:
   Please add javadocs.
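   For example, something along these lines (the exact wording of the semantics is my reading of the PR context, so please adjust):

```java
/**
 * A {@link HoodieRecordMerger} implementation that keeps the record from the
 * latest write, mirroring the semantics of OverwriteWithLatestAvroPayload
 * without routing records through an Avro payload class.
 */
```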



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/OverwriteWithLatestSparkMerger.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+
+public class OverwriteWithLatestSparkMerger extends HoodieSparkRecordMerger {

Review Comment:
   OK, now I see that the default merger merges based on the ordering value.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/OverwriteWithLatestSparkMerger.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+
+public class OverwriteWithLatestSparkMerger extends HoodieSparkRecordMerger {

Review Comment:
   Please add javadocs for the new classes. Also, why do we need this new merger implementation? Isn't overwrite the default merge strategy?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
