yihua commented on code in PR #10302:
URL: https://github.com/apache/hudi/pull/10302#discussion_r1430538313


##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestHarness.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.testutils.reader;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.junit.jupiter.api.AfterAll;
+
+import java.io.IOException;
+import java.util.List;
+
+import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
+import static 
org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultHadoopConf;
+
+public class HoodieFileGroupReaderTestHarness extends HoodieCommonTestHarness {

Review Comment:
   For all PRs, could you mark which PR(s) this one is stacked on, so the 
reviewer does not have to re-review the same class?



##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import 
org.apache.hudi.common.testutils.reader.HoodieFileGroupReaderTestHarness;
+import org.apache.hudi.common.testutils.reader.HoodieFileSliceTestUtils;
+import org.apache.hudi.common.testutils.reader.HoodieTestReaderContext;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.hudi.common.model.HoodieRecord.HoodieRecordType.AVRO;
+import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
+import static 
org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.DELETE;
+import static 
org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.INSERT;
+import static 
org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.UPDATE;
+import static 
org.apache.hudi.common.testutils.reader.HoodieFileSliceTestUtils.ROW_KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestCustomMerger extends HoodieFileGroupReaderTestHarness {
+  @BeforeAll
+  public static void setUp() throws IOException {
+    // Enable our custom merger.
+    readerContext = new HoodieTestReaderContext(Option.of(new 
CustomAvroMerger()));
+
+    keyRanges = Arrays.asList(
+        new HoodieFileSliceTestUtils.KeyRange(1, 10),
+        new HoodieFileSliceTestUtils.KeyRange(1, 5),
+        new HoodieFileSliceTestUtils.KeyRange(1, 3),
+        new HoodieFileSliceTestUtils.KeyRange(6, 8),
+        new HoodieFileSliceTestUtils.KeyRange(1, 2));
+    timestamps = Arrays.asList(
+        2L, 3L, 4L, 1L, 9L);
+    operationTypes = Arrays.asList(
+        INSERT, DELETE, UPDATE, DELETE, UPDATE);
+    instantTimes = Arrays.asList(
+        "001", "002", "003", "004", "005");
+  }
+
+  @BeforeEach
+  public void initialize() throws Exception {
+    setTableName(TestCustomMerger.class.getName());
+    initPath(tableName);
+    initMetaClient();
+    initTestDataGenerator(new String[]{PARTITION_PATH});
+    testTable = HoodieTestTable.of(metaClient);
+    setUpMockCommits();
+  }
+
+  @Test
+  public void testWithOneLogFile() throws IOException, InterruptedException {
+    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(2);
+    List<String> leftKeysExpected =
+        Arrays.asList("6", "7", "8", "9", "10");
+    List<String> leftKeysActual = new ArrayList<>();
+    while (iterator.hasNext()) {
+      leftKeysActual.add(iterator.next()
+          .get(AVRO_SCHEMA.getField(ROW_KEY).pos())
+          .toString());
+    }
+    assertEquals(leftKeysExpected, leftKeysActual);
+  }
+
+  @Test
+  public void testWithTwoLogFiles() throws IOException, InterruptedException {
+    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(3);
+    List<String> leftKeysExpected =
+        Arrays.asList("1", "3", "6", "7", "8", "9", "10");
+    List<String> leftKeysActual = new ArrayList<>();
+    while (iterator.hasNext()) {
+      leftKeysActual.add(iterator.next()
+          .get(AVRO_SCHEMA.getField(ROW_KEY).pos())
+          .toString());
+    }
+    assertEquals(leftKeysExpected, leftKeysActual);
+  }
+
+  @Test
+  public void testWithThreeLogFiles() throws IOException, InterruptedException 
{
+    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(4);
+    List<String> leftKeysExpected =
+        Arrays.asList("1", "3", "7", "9", "10");
+    List<String> leftKeysActual = new ArrayList<>();
+    while (iterator.hasNext()) {
+      leftKeysActual.add(iterator.next()
+          .get(AVRO_SCHEMA.getField(ROW_KEY).pos())
+          .toString());
+    }
+    assertEquals(leftKeysExpected, leftKeysActual);
+  }
+
+  @Test
+  public void testWithFourLogFiles() throws IOException, InterruptedException {
+    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(5);
+    List<String> leftKeysExpected =
+        Arrays.asList("1", "3", "7", "9", "10");
+    List<String> leftKeysActual = new ArrayList<>();
+    while (iterator.hasNext()) {
+      leftKeysActual.add(iterator.next()
+          .get(AVRO_SCHEMA.getField(ROW_KEY).pos())
+          .toString());
+    }
+    assertEquals(leftKeysExpected, leftKeysActual);
+  }
+
+  public static class CustomAvroMerger implements HoodieRecordMerger {

Review Comment:
   Please add docs describing the expected output of the merging.



##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import 
org.apache.hudi.common.testutils.reader.HoodieFileGroupReaderTestHarness;
+import org.apache.hudi.common.testutils.reader.HoodieFileSliceTestUtils;
+import org.apache.hudi.common.testutils.reader.HoodieTestReaderContext;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.hudi.common.model.HoodieRecord.HoodieRecordType.AVRO;
+import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
+import static 
org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.DELETE;
+import static 
org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.INSERT;
+import static 
org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.UPDATE;
+import static 
org.apache.hudi.common.testutils.reader.HoodieFileSliceTestUtils.ROW_KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestCustomMerger extends HoodieFileGroupReaderTestHarness {
+  @BeforeAll
+  public static void setUp() throws IOException {
+    // Enable our custom merger.
+    readerContext = new HoodieTestReaderContext(Option.of(new 
CustomAvroMerger()));
+
+    keyRanges = Arrays.asList(
+        new HoodieFileSliceTestUtils.KeyRange(1, 10),
+        new HoodieFileSliceTestUtils.KeyRange(1, 5),
+        new HoodieFileSliceTestUtils.KeyRange(1, 3),
+        new HoodieFileSliceTestUtils.KeyRange(6, 8),
+        new HoodieFileSliceTestUtils.KeyRange(1, 2));
+    timestamps = Arrays.asList(
+        2L, 3L, 4L, 1L, 9L);
+    operationTypes = Arrays.asList(
+        INSERT, DELETE, UPDATE, DELETE, UPDATE);
+    instantTimes = Arrays.asList(
+        "001", "002", "003", "004", "005");
+  }
+
+  @BeforeEach
+  public void initialize() throws Exception {
+    setTableName(TestCustomMerger.class.getName());
+    initPath(tableName);
+    initMetaClient();
+    initTestDataGenerator(new String[]{PARTITION_PATH});
+    testTable = HoodieTestTable.of(metaClient);
+    setUpMockCommits();
+  }
+
+  @Test
+  public void testWithOneLogFile() throws IOException, InterruptedException {
+    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(2);
+    List<String> leftKeysExpected =
+        Arrays.asList("6", "7", "8", "9", "10");
+    List<String> leftKeysActual = new ArrayList<>();
+    while (iterator.hasNext()) {
+      leftKeysActual.add(iterator.next()
+          .get(AVRO_SCHEMA.getField(ROW_KEY).pos())
+          .toString());
+    }
+    assertEquals(leftKeysExpected, leftKeysActual);
+  }
+
+  @Test
+  public void testWithTwoLogFiles() throws IOException, InterruptedException {
+    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(3);
+    List<String> leftKeysExpected =
+        Arrays.asList("1", "3", "6", "7", "8", "9", "10");
+    List<String> leftKeysActual = new ArrayList<>();
+    while (iterator.hasNext()) {
+      leftKeysActual.add(iterator.next()
+          .get(AVRO_SCHEMA.getField(ROW_KEY).pos())
+          .toString());
+    }
+    assertEquals(leftKeysExpected, leftKeysActual);
+  }
+
+  @Test
+  public void testWithThreeLogFiles() throws IOException, InterruptedException 
{
+    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(4);
+    List<String> leftKeysExpected =
+        Arrays.asList("1", "3", "7", "9", "10");
+    List<String> leftKeysActual = new ArrayList<>();
+    while (iterator.hasNext()) {
+      leftKeysActual.add(iterator.next()
+          .get(AVRO_SCHEMA.getField(ROW_KEY).pos())
+          .toString());
+    }
+    assertEquals(leftKeysExpected, leftKeysActual);
+  }
+
+  @Test
+  public void testWithFourLogFiles() throws IOException, InterruptedException {
+    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(5);
+    List<String> leftKeysExpected =
+        Arrays.asList("1", "3", "7", "9", "10");
+    List<String> leftKeysActual = new ArrayList<>();
+    while (iterator.hasNext()) {
+      leftKeysActual.add(iterator.next()
+          .get(AVRO_SCHEMA.getField(ROW_KEY).pos())
+          .toString());
+    }
+    assertEquals(leftKeysExpected, leftKeysActual);
+  }
+
+  public static class CustomAvroMerger implements HoodieRecordMerger {
+    public static final String KEEP_CERTAIN_TIMESTAMP_VALUE_ONLY =
+        "KEEP_CERTAIN_TIMESTAMP_VALUE_ONLY";
+    public static final String TIMESTAMP = "timestamp";
+
+    @Override
+    public Option<Pair<HoodieRecord, Schema>> merge(
+        HoodieRecord older,
+        Schema oldSchema,
+        HoodieRecord newer,
+        Schema newSchema,
+        TypedProperties props
+    ) throws IOException {
+      if (newer.getOrderingValue(newSchema, props).compareTo(
+          older.getOrderingValue(oldSchema, props)) >= 0) {
+        if (newer.isDelete(newSchema, props)) {
+          return Option.empty();
+        }
+        int id = Integer.parseInt(((HoodieAvroIndexedRecord) newer)
+            .getData().get(newSchema.getField(ROW_KEY).pos()).toString());
+        if (id % 2 == 1L) {
+          return Option.of(Pair.of(newer, newSchema));
+        }
+      } else {
+        if (older.isDelete(oldSchema, props)) {
+          return Option.empty();
+        }
+        int id = Integer.parseInt(((HoodieAvroIndexedRecord) older)
+            .getData().get(oldSchema.getField(ROW_KEY).pos()).toString());
+        if (id % 2 == 1L) {
+          return Option.of(Pair.of(older, oldSchema));
+        }

Review Comment:
   Similar here: please add docs describing the expected output of the merging.



##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import 
org.apache.hudi.common.testutils.reader.HoodieFileGroupReaderTestHarness;
+import org.apache.hudi.common.testutils.reader.HoodieFileSliceTestUtils;
+import org.apache.hudi.common.testutils.reader.HoodieTestReaderContext;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.hudi.common.model.HoodieRecord.HoodieRecordType.AVRO;
+import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
+import static 
org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.DELETE;
+import static 
org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.INSERT;
+import static 
org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.UPDATE;
+import static 
org.apache.hudi.common.testutils.reader.HoodieFileSliceTestUtils.ROW_KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestCustomMerger extends HoodieFileGroupReaderTestHarness {
+  @BeforeAll
+  public static void setUp() throws IOException {
+    // Enable our custom merger.
+    readerContext = new HoodieTestReaderContext(Option.of(new 
CustomAvroMerger()));
+
+    keyRanges = Arrays.asList(
+        new HoodieFileSliceTestUtils.KeyRange(1, 10),
+        new HoodieFileSliceTestUtils.KeyRange(1, 5),
+        new HoodieFileSliceTestUtils.KeyRange(1, 3),
+        new HoodieFileSliceTestUtils.KeyRange(6, 8),
+        new HoodieFileSliceTestUtils.KeyRange(1, 2));
+    timestamps = Arrays.asList(
+        2L, 3L, 4L, 1L, 9L);
+    operationTypes = Arrays.asList(
+        INSERT, DELETE, UPDATE, DELETE, UPDATE);
+    instantTimes = Arrays.asList(
+        "001", "002", "003", "004", "005");
+  }
+
+  @BeforeEach

Review Comment:
   The test table is the same for all tests, so there is no need to prepare the 
test table before each individual test.



##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import 
org.apache.hudi.common.testutils.reader.HoodieFileGroupReaderTestHarness;
+import org.apache.hudi.common.testutils.reader.HoodieFileSliceTestUtils;
+import org.apache.hudi.common.testutils.reader.HoodieTestReaderContext;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.hudi.common.model.HoodieRecord.HoodieRecordType.AVRO;
+import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
+import static 
org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.DELETE;
+import static 
org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.INSERT;
+import static 
org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.UPDATE;
+import static 
org.apache.hudi.common.testutils.reader.HoodieFileSliceTestUtils.ROW_KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestCustomMerger extends HoodieFileGroupReaderTestHarness {
+  @BeforeAll
+  public static void setUp() throws IOException {
+    // Enable our custom merger.
+    readerContext = new HoodieTestReaderContext(Option.of(new 
CustomAvroMerger()));
+
+    keyRanges = Arrays.asList(
+        new HoodieFileSliceTestUtils.KeyRange(1, 10),
+        new HoodieFileSliceTestUtils.KeyRange(1, 5),
+        new HoodieFileSliceTestUtils.KeyRange(1, 3),
+        new HoodieFileSliceTestUtils.KeyRange(6, 8),
+        new HoodieFileSliceTestUtils.KeyRange(1, 2));
+    timestamps = Arrays.asList(
+        2L, 3L, 4L, 1L, 9L);
+    operationTypes = Arrays.asList(
+        INSERT, DELETE, UPDATE, DELETE, UPDATE);
+    instantTimes = Arrays.asList(
+        "001", "002", "003", "004", "005");
+  }
+
+  @BeforeEach
+  public void initialize() throws Exception {
+    setTableName(TestCustomMerger.class.getName());
+    initPath(tableName);
+    initMetaClient();
+    initTestDataGenerator(new String[]{PARTITION_PATH});
+    testTable = HoodieTestTable.of(metaClient);
+    setUpMockCommits();
+  }
+
+  @Test
+  public void testWithOneLogFile() throws IOException, InterruptedException {
+    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(2);
+    List<String> leftKeysExpected =
+        Arrays.asList("6", "7", "8", "9", "10");
+    List<String> leftKeysActual = new ArrayList<>();
+    while (iterator.hasNext()) {
+      leftKeysActual.add(iterator.next()
+          .get(AVRO_SCHEMA.getField(ROW_KEY).pos())
+          .toString());
+    }
+    assertEquals(leftKeysExpected, leftKeysActual);
+  }
+
+  @Test
+  public void testWithTwoLogFiles() throws IOException, InterruptedException {
+    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(3);
+    List<String> leftKeysExpected =
+        Arrays.asList("1", "3", "6", "7", "8", "9", "10");
+    List<String> leftKeysActual = new ArrayList<>();
+    while (iterator.hasNext()) {
+      leftKeysActual.add(iterator.next()
+          .get(AVRO_SCHEMA.getField(ROW_KEY).pos())
+          .toString());
+    }
+    assertEquals(leftKeysExpected, leftKeysActual);
+  }
+
+  @Test
+  public void testWithThreeLogFiles() throws IOException, InterruptedException 
{
+    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(4);
+    List<String> leftKeysExpected =
+        Arrays.asList("1", "3", "7", "9", "10");
+    List<String> leftKeysActual = new ArrayList<>();
+    while (iterator.hasNext()) {
+      leftKeysActual.add(iterator.next()
+          .get(AVRO_SCHEMA.getField(ROW_KEY).pos())
+          .toString());
+    }
+    assertEquals(leftKeysExpected, leftKeysActual);
+  }
+
+  @Test
+  public void testWithFourLogFiles() throws IOException, InterruptedException {
+    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(5);
+    List<String> leftKeysExpected =
+        Arrays.asList("1", "3", "7", "9", "10");
+    List<String> leftKeysActual = new ArrayList<>();
+    while (iterator.hasNext()) {
+      leftKeysActual.add(iterator.next()
+          .get(AVRO_SCHEMA.getField(ROW_KEY).pos())
+          .toString());
+    }
+    assertEquals(leftKeysExpected, leftKeysActual);
+  }
+
+  public static class CustomAvroMerger implements HoodieRecordMerger {
+    public static final String KEEP_CERTAIN_TIMESTAMP_VALUE_ONLY =
+        "KEEP_CERTAIN_TIMESTAMP_VALUE_ONLY";
+    public static final String TIMESTAMP = "timestamp";
+
+    @Override
+    public Option<Pair<HoodieRecord, Schema>> merge(
+        HoodieRecord older,
+        Schema oldSchema,
+        HoodieRecord newer,
+        Schema newSchema,
+        TypedProperties props
+    ) throws IOException {
+      if (newer.getOrderingValue(newSchema, props).compareTo(
+          older.getOrderingValue(oldSchema, props)) >= 0) {
+        if (newer.isDelete(newSchema, props)) {
+          return Option.empty();
+        }
+        int id = Integer.parseInt(((HoodieAvroIndexedRecord) newer)
+            .getData().get(newSchema.getField(ROW_KEY).pos()).toString());
+        if (id % 2 == 1L) {
+          return Option.of(Pair.of(newer, newSchema));
+        }

Review Comment:
   Should the old record be kept otherwise, or is the intent to delete the 
older record after merging?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to