This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0b9ae7e00c3 [HUDI-7177] Test custom merger for MOR tables (#10302)
0b9ae7e00c3 is described below
commit 0b9ae7e00c3c1b4919a855ac9646e8a5f6448a5d
Author: Lin Liu <[email protected]>
AuthorDate: Wed Dec 20 17:47:29 2023 -0800
[HUDI-7177] Test custom merger for MOR tables (#10302)
---
.../read/HoodieBaseFileGroupRecordBuffer.java | 4 +-
.../hudi/common/table/read/TestCustomMerger.java | 236 +++++++++++++++++++++
.../common/table/read/TestEventTimeMerging.java | 177 ++++++++++++++++
.../reader/HoodieAvroRecordTestMerger.java | 81 +++++++
.../reader/HoodieFileGroupReaderTestHarness.java | 9 +-
.../testutils/reader/HoodieRecordTestPayload.java | 202 ++++++++++++++++++
.../testutils/reader/HoodieTestReaderContext.java | 16 +-
7 files changed, 719 insertions(+), 6 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
index f22e8f221f3..2f695cf0249 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
@@ -259,7 +259,9 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T>
implements HoodieFileGr
readerContext.constructHoodieRecord(older, olderInfoMap), (Schema)
olderInfoMap.get(INTERNAL_META_SCHEMA),
readerContext.constructHoodieRecord(newer, newerInfoMap), (Schema)
newerInfoMap.get(INTERNAL_META_SCHEMA), payloadProps);
}
- if (mergedRecord.isPresent()) {
+
+ if (mergedRecord.isPresent()
+ &&
!mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(),
payloadProps)) {
return Option.ofNullable((T) mergedRecord.get().getLeft().getData());
}
return Option.empty();
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java
new file mode 100644
index 00000000000..3e80d4bee56
--- /dev/null
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import
org.apache.hudi.common.testutils.reader.HoodieFileGroupReaderTestHarness;
+import org.apache.hudi.common.testutils.reader.HoodieFileSliceTestUtils;
+import org.apache.hudi.common.testutils.reader.HoodieRecordTestPayload;
+import org.apache.hudi.common.testutils.reader.HoodieTestReaderContext;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.hudi.common.model.HoodieRecord.HoodieRecordType.AVRO;
+import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
+import static
org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.DELETE;
+import static
org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.INSERT;
+import static
org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.UPDATE;
+import static
org.apache.hudi.common.testutils.reader.HoodieFileSliceTestUtils.ROW_KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestCustomMerger extends HoodieFileGroupReaderTestHarness {
+ @BeforeAll
+ public static void setUp() throws IOException {
+ // Enable our custom merger.
+ readerContext = new HoodieTestReaderContext(
+ Option.of(new CustomAvroMerger()),
+ Option.of(HoodieRecordTestPayload.class.getName()));
+
+ // -------------------------------------------------------------
+ // The test logic is as follows:
+ // 1. Base file contains 10 records,
+ // whose key values are from 1 to 10,
+ // whose instant time is "001" and ordering value is 2.
+ // 2. After adding the first log file,
+ // we delete the records with keys from 1 to 5
+ // with ordering value 3. Since the rest of the records do
+ // not go through the merger, they are kept as-is.
+ // Current existing keys: [6, 7, 8, 9, 10]
+ // 3. After adding the second log file,
+ // we tried to add the records with keys from 1 to 3 back,
+ // and we succeeded since their ordering value is 4 > 3; with
+ // the merge and flush functions, only keys 1 and 3 stay.
+ // keys from 6 to 10 are as it is.
+ // Current existing keys: [1, 3, 6, 7, 8, 9, 10]
+ // 4. After adding the third log file,
+ // we tried to delete records with keys from 6 to 8,
+ // but we cannot since their ordering value is 1 < 2.
+ // This step brings records from 6 to 8 into the merger and flush,
+ // and only record with key 7 left.
+ // Current existing keys: [1, 3, 7, 9, 10]
+ // 5. After adding the fourth log file,
+ // we update the records with keys from 1 to 10 with
+ // ordering value 9 > 2, so only records with odd keys survive
+ // the merge.
+ // Current existing keys: [1, 3, 5, 7, 9]
+ // -------------------------------------------------------------
+
+ keyRanges = Arrays.asList(
+ new HoodieFileSliceTestUtils.KeyRange(1, 10),
+ new HoodieFileSliceTestUtils.KeyRange(1, 5),
+ new HoodieFileSliceTestUtils.KeyRange(1, 3),
+ new HoodieFileSliceTestUtils.KeyRange(6, 8),
+ new HoodieFileSliceTestUtils.KeyRange(1, 10));
+ timestamps = Arrays.asList(
+ 2L, 3L, 4L, 1L, 9L);
+ operationTypes = Arrays.asList(
+ INSERT, DELETE, UPDATE, DELETE, UPDATE);
+ instantTimes = Arrays.asList(
+ "001", "002", "003", "004", "005");
+ }
+
+ @BeforeEach
+ public void initialize() throws Exception {
+ setTableName(TestCustomMerger.class.getName());
+ initPath(tableName);
+ initMetaClient();
+ initTestDataGenerator(new String[]{PARTITION_PATH});
+ testTable = HoodieTestTable.of(metaClient);
+ setUpMockCommits();
+ }
+
+ @Test
+ public void testWithOneLogFile() throws IOException, InterruptedException {
+ ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(2);
+ List<String> leftKeysExpected =
+ Arrays.asList("6", "7", "8", "9", "10");
+ List<String> leftKeysActual = new ArrayList<>();
+ while (iterator.hasNext()) {
+ leftKeysActual.add(iterator.next()
+ .get(AVRO_SCHEMA.getField(ROW_KEY).pos())
+ .toString());
+ }
+ assertEquals(leftKeysExpected, leftKeysActual);
+ }
+
+ @Test
+ public void testWithTwoLogFiles() throws IOException, InterruptedException {
+ ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(3);
+ List<String> leftKeysExpected =
+ Arrays.asList("1", "3", "6", "7", "8", "9", "10");
+ List<String> leftKeysActual = new ArrayList<>();
+ while (iterator.hasNext()) {
+ leftKeysActual.add(iterator.next()
+ .get(AVRO_SCHEMA.getField(ROW_KEY).pos())
+ .toString());
+ }
+ assertEquals(leftKeysExpected, leftKeysActual);
+ }
+
+ @Test
+ public void testWithThreeLogFiles() throws IOException, InterruptedException
{
+ ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(4);
+ List<String> leftKeysExpected =
+ Arrays.asList("1", "3", "7", "9", "10");
+ List<String> leftKeysActual = new ArrayList<>();
+ while (iterator.hasNext()) {
+ leftKeysActual.add(iterator.next()
+ .get(AVRO_SCHEMA.getField(ROW_KEY).pos())
+ .toString());
+ }
+ assertEquals(leftKeysExpected, leftKeysActual);
+ }
+
+ @Test
+ public void testWithFourLogFiles() throws IOException, InterruptedException {
+ ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(5);
+ List<String> leftKeysExpected =
+ Arrays.asList("1", "3", "5", "7", "9");
+ List<String> leftKeysActual = new ArrayList<>();
+ while (iterator.hasNext()) {
+ leftKeysActual.add(iterator.next()
+ .get(AVRO_SCHEMA.getField(ROW_KEY).pos())
+ .toString());
+ }
+ assertEquals(leftKeysExpected, leftKeysActual);
+ }
+
+ /**
+ * This merger is designed to save records whose record key is odd.
+ * That means, if the record is not a delete record, and its record
+ * key is odd, then it will be output. In the flush stage, we only
+ * flush records whose timestamp is a multiple of 3; but since the write
+ * is an insert operation, it will not go through the flush stage. Therefore,
+ * in this test, we solely test the merge function.
+ */
+ public static class CustomAvroMerger implements HoodieRecordMerger {
+ public static final String KEEP_CERTAIN_TIMESTAMP_VALUE_ONLY =
+ "KEEP_CERTAIN_TIMESTAMP_VALUE_ONLY";
+ public static final String TIMESTAMP = "timestamp";
+
+ @Override
+ public Option<Pair<HoodieRecord, Schema>> merge(
+ HoodieRecord older,
+ Schema oldSchema,
+ HoodieRecord newer,
+ Schema newSchema,
+ TypedProperties props
+ ) throws IOException {
+ if (newer.getOrderingValue(newSchema, props).compareTo(
+ older.getOrderingValue(oldSchema, props)) >= 0) {
+ if (newer.isDelete(newSchema, props)) {
+ return Option.empty();
+ }
+ int id = Integer.parseInt(((HoodieAvroIndexedRecord) newer)
+ .getData().get(newSchema.getField(ROW_KEY).pos()).toString());
+ if (id % 2 == 1L) {
+ return Option.of(Pair.of(newer, newSchema));
+ }
+ } else {
+ if (older.isDelete(oldSchema, props)) {
+ return Option.empty();
+ }
+ int id = Integer.parseInt(((HoodieAvroIndexedRecord) older)
+ .getData().get(oldSchema.getField(ROW_KEY).pos()).toString());
+ if (id % 2 == 1L) {
+ return Option.of(Pair.of(older, oldSchema));
+ }
+ }
+ return Option.empty();
+ }
+
+ @Override
+ public boolean shouldFlush(
+ HoodieRecord record,
+ Schema schema,
+ TypedProperties props
+ ) {
+ long timestamp = (long) ((HoodieAvroIndexedRecord) record)
+ .getData()
+ .get(schema.getField(TIMESTAMP).pos());
+ return timestamp % 3 == 0L;
+ }
+
+ @Override
+ public HoodieRecord.HoodieRecordType getRecordType() {
+ return AVRO;
+ }
+
+ @Override
+ public String getMergingStrategy() {
+ return KEEP_CERTAIN_TIMESTAMP_VALUE_ONLY;
+ }
+ }
+}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java
new file mode 100644
index 00000000000..bf0fac19c67
--- /dev/null
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.model.HoodieAvroRecordMerger;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.testutils.reader.HoodieAvroRecordTestMerger;
+import
org.apache.hudi.common.testutils.reader.HoodieFileGroupReaderTestHarness;
+import org.apache.hudi.common.testutils.reader.HoodieFileSliceTestUtils;
+import org.apache.hudi.common.testutils.reader.HoodieRecordTestPayload;
+import org.apache.hudi.common.testutils.reader.HoodieTestReaderContext;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
+import static
org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.DELETE;
+import static
org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.INSERT;
+import static
org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.UPDATE;
+import static
org.apache.hudi.common.testutils.reader.HoodieFileSliceTestUtils.ROW_KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestEventTimeMerging extends HoodieFileGroupReaderTestHarness {
+ @BeforeAll
+ public static void setUp() throws IOException {
+ // Create dedicated merger to avoid current delete logic holes.
+ // TODO: Unify delete logic (HUDI-7240).
+ HoodieAvroRecordMerger merger = new HoodieAvroRecordTestMerger();
+ readerContext = new HoodieTestReaderContext(
+ Option.of(merger),
+ Option.of(HoodieRecordTestPayload.class.getName()));
+
+ // -------------------------------------------------------------
+ // The test logic is as follows:
+ // 1. Base file contains 10 records,
+ // whose key values are from 1 to 10,
+ // whose instant time is "001" and ordering value is 2.
+ // 2. After adding the first log file,
+ // we delete the records with keys from 1 to 5
+ // with ordering value 3.
+ // Current existing keys: [6, 7, 8, 9, 10]
+ // 3. After adding the second log file,
+ // we tried to add the records with keys from 1 to 3 back,
+ // but we cannot since their ordering value is 1 < 3.
+ // Current existing keys: [6, 7, 8, 9, 10]
+ // 4. After adding the third log file,
+ // we tried to delete records with keys from 6 to 8,
+ // but we cannot since their ordering value is 1 < 2.
+ // Current existing keys: [6, 7, 8, 9, 10]
+ // 5. After adding the fourth log file,
+ // we tried to add the records with keys from 1 to 2 back,
+ // and it worked since their ordering value is 4 > 3.
+ // Current existing keys: [1, 2, 6, 7, 8, 9, 10]
+ // -------------------------------------------------------------
+
+ // Specify the key column values for each file.
+ keyRanges = Arrays.asList(
+ new HoodieFileSliceTestUtils.KeyRange(1, 10),
+ new HoodieFileSliceTestUtils.KeyRange(1, 5),
+ new HoodieFileSliceTestUtils.KeyRange(1, 3),
+ new HoodieFileSliceTestUtils.KeyRange(6, 8),
+ new HoodieFileSliceTestUtils.KeyRange(1, 2));
+ // Specify the value of `timestamp` column for each file.
+ timestamps = Arrays.asList(
+ 2L, 3L, 1L, 1L, 4L);
+ // Specify the operation type for each file.
+ operationTypes = Arrays.asList(
+ INSERT, DELETE, UPDATE, DELETE, UPDATE);
+ // Specify the instant time for each file.
+ instantTimes = Arrays.asList(
+ "001", "002", "003", "004", "005");
+ }
+
+ @BeforeEach
+ public void initialize() throws Exception {
+ setTableName(TestEventTimeMerging.class.getName());
+ initPath(tableName);
+ initMetaClient();
+ initTestDataGenerator(new String[]{PARTITION_PATH});
+ testTable = HoodieTestTable.of(metaClient);
+ setUpMockCommits();
+ }
+
+ @Test
+ public void testWithOneLogFile() throws IOException, InterruptedException {
+ // The FileSlice contains a base file and a log file.
+ ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(2);
+ List<String> leftKeysExpected = Arrays.asList("6", "7", "8", "9", "10");
+ List<Long> leftTimestampsExpected = Arrays.asList(2L, 2L, 2L, 2L, 2L);
+ List<String> leftKeysActual = new ArrayList<>();
+ List<Long> leftTimestampsActual = new ArrayList<>();
+ while (iterator.hasNext()) {
+ IndexedRecord record = iterator.next();
+
leftKeysActual.add(record.get(AVRO_SCHEMA.getField(ROW_KEY).pos()).toString());
+ leftTimestampsActual.add((Long)
record.get(AVRO_SCHEMA.getField("timestamp").pos()));
+ }
+ assertEquals(leftKeysExpected, leftKeysActual);
+ assertEquals(leftTimestampsExpected, leftTimestampsActual);
+ }
+
+ @Test
+ public void testWithTwoLogFiles() throws IOException, InterruptedException {
+ // The FileSlice contains a base file and two log files.
+ ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(3);
+ List<String> leftKeysExpected = Arrays.asList("6", "7", "8", "9", "10");
+ List<Long> leftTimestampsExpected = Arrays.asList(2L, 2L, 2L, 2L, 2L);
+ List<String> leftKeysActual = new ArrayList<>();
+ List<Long> leftTimestampsActual = new ArrayList<>();
+ while (iterator.hasNext()) {
+ IndexedRecord record = iterator.next();
+
leftKeysActual.add(record.get(AVRO_SCHEMA.getField(ROW_KEY).pos()).toString());
+ leftTimestampsActual.add((Long)
record.get(AVRO_SCHEMA.getField("timestamp").pos()));
+ }
+ assertEquals(leftKeysExpected, leftKeysActual);
+ assertEquals(leftTimestampsExpected, leftTimestampsActual);
+ }
+
+ @Test
+ public void testWithThreeLogFiles() throws IOException, InterruptedException
{
+ // The FileSlice contains a base file and three log files.
+ ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(4);
+ List<String> leftKeysExpected = Arrays.asList("6", "7", "8", "9", "10");
+ List<Long> leftTimestampsExpected = Arrays.asList(2L, 2L, 2L, 2L, 2L);
+ List<String> leftKeysActual = new ArrayList<>();
+ List<Long> leftTimestampsActual = new ArrayList<>();
+ while (iterator.hasNext()) {
+ IndexedRecord record = iterator.next();
+
leftKeysActual.add(record.get(AVRO_SCHEMA.getField(ROW_KEY).pos()).toString());
+ leftTimestampsActual.add((Long)
record.get(AVRO_SCHEMA.getField("timestamp").pos()));
+ }
+ assertEquals(leftKeysExpected, leftKeysActual);
+ assertEquals(leftTimestampsExpected, leftTimestampsActual);
+ }
+
+ @Test
+ public void testWithFourLogFiles() throws IOException, InterruptedException {
+ // The FileSlice contains a base file and four log files.
+ ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(5);
+ List<String> leftKeysExpected = Arrays.asList("1", "2", "6", "7", "8",
"9", "10");
+ List<Long> leftTimestampsExpected = Arrays.asList(4L, 4L, 2L, 2L, 2L, 2L,
2L);
+ List<String> leftKeysActual = new ArrayList<>();
+ List<Long> leftTimestampsActual = new ArrayList<>();
+ while (iterator.hasNext()) {
+ IndexedRecord record = iterator.next();
+
leftKeysActual.add(record.get(AVRO_SCHEMA.getField(ROW_KEY).pos()).toString());
+ leftTimestampsActual.add((Long)
record.get(AVRO_SCHEMA.getField("timestamp").pos()));
+ }
+ assertEquals(leftKeysExpected, leftKeysActual);
+ assertEquals(leftTimestampsExpected, leftTimestampsActual);
+ }
+}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieAvroRecordTestMerger.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieAvroRecordTestMerger.java
new file mode 100644
index 00000000000..9bb69b6ceb0
--- /dev/null
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieAvroRecordTestMerger.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.testutils.reader;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieAvroRecordMerger;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.IOException;
+import java.util.Properties;
+
+public class HoodieAvroRecordTestMerger extends HoodieAvroRecordMerger {
+ @Override
+ public Option<Pair<HoodieRecord, Schema>> merge(
+ HoodieRecord older,
+ Schema oldSchema,
+ HoodieRecord newer,
+ Schema newSchema,
+ TypedProperties props
+ ) throws IOException {
+ Comparable oldOrderingVal = older.getOrderingValue(oldSchema, props);
+ Comparable newOrderingVal = newer.getOrderingValue(newSchema, props);
+
+ // The record with higher ordering value is returned.
+ if (oldOrderingVal == null || newOrderingVal.compareTo(oldOrderingVal) >
0) {
+ return Option.of(Pair.of(newer, newSchema));
+ } else if (newOrderingVal.compareTo(oldOrderingVal) < 0) {
+ return Option.of(Pair.of(older, oldSchema));
+ }
+
+ // When their orderings are the same, we rely on the logic of the payload.
+ return combineAndGetUpdateValue(older, newer, newSchema, props)
+ .map(r -> Pair.of(new HoodieAvroIndexedRecord(r), r.getSchema()));
+ }
+
+ private Option<IndexedRecord> combineAndGetUpdateValue(
+ HoodieRecord older,
+ HoodieRecord newer,
+ Schema schema,
+ Properties props
+ ) throws IOException {
+ Option<IndexedRecord> previousAvroData = older
+ .toIndexedRecord(schema, props)
+ .map(HoodieAvroIndexedRecord::getData);
+
+ if (!previousAvroData.isPresent()) {
+ return newer
+ .toIndexedRecord(schema, props)
+ .map(HoodieAvroIndexedRecord::getData);
+ }
+
+ return ((HoodieAvroRecord) newer)
+ .getData()
+ .combineAndGetUpdateValue(
+ previousAvroData.get(), schema, props);
+ }
+}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestHarness.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestHarness.java
index 793a23bea51..c5dea3c675d 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestHarness.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestHarness.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.read.HoodieFileGroupReader;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestTable;
@@ -37,6 +38,7 @@ import org.junit.jupiter.api.AfterAll;
import java.io.IOException;
import java.util.List;
+import static
org.apache.hudi.common.table.HoodieTableConfig.POPULATE_META_FIELDS;
import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
import static
org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultHadoopConf;
@@ -64,7 +66,8 @@ public class HoodieFileGroupReaderTestHarness extends
HoodieCommonTestHarness {
properties.setProperty(
"hoodie.datasource.write.precombine.field", "timestamp");
hadoopConf = getDefaultHadoopConf();
- readerContext = new HoodieTestReaderContext(Option.empty());
+ readerContext = new HoodieTestReaderContext(
+ Option.empty(), Option.empty());
}
@AfterAll
@@ -102,6 +105,8 @@ public class HoodieFileGroupReaderTestHarness extends
HoodieCommonTestHarness {
FILE_ID
);
+ HoodieTableConfig tableConfig = metaClient.getTableConfig();
+ tableConfig.setValue(POPULATE_META_FIELDS, "false");
HoodieFileGroupReader<IndexedRecord> fileGroupReader =
HoodieFileGroupReaderTestUtils.createFileGroupReader(
fileSliceOpt,
@@ -113,7 +118,7 @@ public class HoodieFileGroupReaderTestHarness extends
HoodieCommonTestHarness {
Long.MAX_VALUE,
properties,
hadoopConf,
- metaClient.getTableConfig(),
+ tableConfig,
readerContext
);
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieRecordTestPayload.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieRecordTestPayload.java
new file mode 100644
index 00000000000..bff49a025fa
--- /dev/null
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieRecordTestPayload.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.testutils.reader;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.HoodiePayloadProps;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.hudi.common.util.ConfigUtils.getOrderingField;
+
+public class HoodieRecordTestPayload extends OverwriteWithLatestAvroPayload {
+ public static final String METADATA_EVENT_TIME_KEY =
"metadata.event_time.key";
+ public static final String DELETE_KEY = "hoodie.payload.delete.field";
+ public static final String DELETE_MARKER = "hoodie.payload.delete.marker";
+ private Option<Object> eventTime = Option.empty();
+ private AtomicBoolean isDeleteComputed = new AtomicBoolean(false);
+ private boolean isDefaultRecordPayloadDeleted = false;
+
+ public HoodieRecordTestPayload(GenericRecord record, Comparable orderingVal)
{
+ super(record, orderingVal);
+ }
+
+ @Override
+ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord
currentValue, Schema schema, Properties properties) throws IOException {
+ // The new record is a delete record.
+ if (isDeleted(schema, properties)) {
+ String orderingField = getOrderingField(properties);
+ // If orderingField cannot be found, we can not do the compare, then use
the natural order.
+ if (orderingField == null) {
+ return Option.empty();
+ }
+
+ // Otherwise, we compare their ordering values.
+ Comparable<?> currentOrderingVal = (Comparable<?>)
currentValue.get(currentValue.getSchema().getField(orderingField).pos());
+ if (orderingVal.compareTo(currentOrderingVal) >= 0) {
+ return Option.empty();
+ }
+ return Option.of(currentValue);
+ }
+
+ // If the new record is not a delete record.
+ GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes,
schema);
+
+ // Null check is needed here to support schema evolution. The record in
storage may be from old schema where
+ // the new ordering column might not be present and hence returns null.
+ if (!needUpdatingPersistedRecord(currentValue, incomingRecord,
properties)) {
+ return Option.of(currentValue);
+ }
+
+ /*
+ * We reached a point where the value on disk is older than the incoming
record.
+ */
+ eventTime = updateEventTime(incomingRecord, properties);
+
+ if (!isDeleteComputed.getAndSet(true)) {
+ isDefaultRecordPayloadDeleted = isDeleteRecord(incomingRecord,
properties);
+ }
+ /*
+ * Now check if the incoming record is a delete record.
+ */
+ return isDefaultRecordPayloadDeleted ? Option.empty() :
Option.of(incomingRecord);
+ }
+
+ @Override
+ public Option<IndexedRecord> getInsertValue(Schema schema, Properties
properties) throws IOException {
+ if (recordBytes.length == 0) {
+ return Option.empty();
+ }
+ GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes,
schema);
+ eventTime = updateEventTime(incomingRecord, properties);
+
+ if (!isDeleteComputed.getAndSet(true)) {
+ isDefaultRecordPayloadDeleted = isDeleteRecord(incomingRecord,
properties);
+ }
+ return isDefaultRecordPayloadDeleted ? Option.empty() :
Option.of(incomingRecord);
+ }
+
+ public boolean isDeleted(Schema schema, Properties props) {
+ if (recordBytes.length == 0) {
+ return true;
+ }
+ try {
+ if (!isDeleteComputed.getAndSet(true)) {
+ GenericRecord incomingRecord =
HoodieAvroUtils.bytesToAvro(recordBytes, schema);
+ isDefaultRecordPayloadDeleted = isDeleteRecord(incomingRecord, props);
+ }
+ return isDefaultRecordPayloadDeleted;
+ } catch (IOException e) {
+ throw new HoodieIOException("Deserializing bytes to avro failed ", e);
+ }
+ }
+
+ /**
+ * @param genericRecord instance of {@link GenericRecord} of interest.
+ * @param properties payload related properties
+ * @returns {@code true} if record represents a delete record. {@code false}
otherwise.
+ */
+ protected boolean isDeleteRecord(GenericRecord genericRecord, Properties
properties) {
+ final String deleteKey = properties.getProperty(DELETE_KEY);
+ if (StringUtils.isNullOrEmpty(deleteKey)) {
+ return isDeleteRecord(genericRecord);
+ }
+
+
ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(properties.getProperty(DELETE_MARKER)),
+ () -> DELETE_MARKER + " should be configured with " + DELETE_KEY);
+ // Modify to be compatible with new version Avro.
+ // The new version Avro throws for GenericRecord.get if the field name
+ // does not exist in the schema.
+ if (genericRecord.getSchema().getField(deleteKey) == null) {
+ return false;
+ }
+ Object deleteMarker = genericRecord.get(deleteKey);
+ return deleteMarker != null &&
properties.getProperty(DELETE_MARKER).equals(deleteMarker.toString());
+ }
+
+ private static Option<Object> updateEventTime(GenericRecord record,
Properties properties) {
+ boolean consistentLogicalTimestampEnabled =
Boolean.parseBoolean(properties.getProperty(
+
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
+
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()));
+ String eventTimeField = properties
+ .getProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY);
+ if (eventTimeField == null) {
+ return Option.empty();
+ }
+ return Option.ofNullable(
+ HoodieAvroUtils.getNestedFieldVal(
+ record,
+ eventTimeField,
+ true,
+ consistentLogicalTimestampEnabled)
+ );
+ }
+
+ @Override
+ public Option<Map<String, String>> getMetadata() {
+ Map<String, String> metadata = new HashMap<>();
+ if (eventTime.isPresent()) {
+ metadata.put(METADATA_EVENT_TIME_KEY, String.valueOf(eventTime.get()));
+ }
+ return metadata.isEmpty() ? Option.empty() : Option.of(metadata);
+ }
+
+ protected boolean needUpdatingPersistedRecord(IndexedRecord currentValue,
+ IndexedRecord incomingRecord,
+ Properties properties) {
+ /*
+ * Combining strategy here returns currentValue on disk if incoming record
is older.
+ * The incoming record can be either a delete (sent as an upsert with
_hoodie_is_deleted set to true)
+ * or an insert/update record. In any case, if it is older than the record
on disk, the currentValue
+ * on disk is returned (to be rewritten with new commit time).
+ *
+ * NOTE: Deletes sent via EmptyHoodieRecordPayload and/or Delete operation
type do not hit this code path
+ * and need to be dealt with separately.
+ */
+ String orderingField = getOrderingField(properties);
+ if (orderingField == null) {
+ return true;
+ }
+    boolean consistentLogicalTimestampEnabled = Boolean.parseBoolean(properties.getProperty(
+        KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
+        KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()));
+    Object persistedOrderingVal = HoodieAvroUtils.getNestedFieldVal((GenericRecord) currentValue, orderingField, true, consistentLogicalTimestampEnabled);
+    Comparable incomingOrderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal((GenericRecord) incomingRecord, orderingField, true, consistentLogicalTimestampEnabled);
+    return persistedOrderingVal == null || ((Comparable) persistedOrderingVal).compareTo(incomingOrderingVal) <= 0;
+ }
+}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
index 0546396a452..ebfda308c07 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
@@ -54,9 +54,13 @@ import static org.apache.hudi.common.testutils.reader.HoodieFileSliceTestUtils.R
 public class HoodieTestReaderContext extends HoodieReaderContext<IndexedRecord> {
private Option<HoodieRecordMerger> customMerger;
+ private Option<String> payloadClass;
- public HoodieTestReaderContext(Option<HoodieRecordMerger> customMerger) {
+ public HoodieTestReaderContext(
+ Option<HoodieRecordMerger> customMerger,
+ Option<String> payloadClass) {
this.customMerger = customMerger;
+ this.payloadClass = payloadClass;
}
@Override
@@ -134,12 +138,16 @@ public class HoodieTestReaderContext extends HoodieReaderContext<IndexedRecord>
Option<IndexedRecord> recordOpt,
Map<String, Object> metadataMap
) {
+ String appliedPayloadClass =
+ payloadClass.isPresent()
+ ? payloadClass.get()
+ : DefaultHoodieRecordPayload.class.getName();
if (!recordOpt.isPresent()) {
return SpillableMapUtils.generateEmptyPayload(
(String) metadataMap.get(INTERNAL_META_RECORD_KEY),
(String) metadataMap.get(INTERNAL_META_PARTITION_PATH),
(Comparable<?>) metadataMap.get(INTERNAL_META_ORDERING_FIELD),
- DefaultHoodieRecordPayload.class.getName());
+ appliedPayloadClass);
}
return new HoodieAvroIndexedRecord(recordOpt.get());
}
@@ -155,7 +163,9 @@ public class HoodieTestReaderContext extends HoodieReaderContext<IndexedRecord>
}
@Override
-  public ClosableIterator<IndexedRecord> mergeBootstrapReaders(ClosableIterator<IndexedRecord> skeletonFileIterator, ClosableIterator<IndexedRecord> dataFileIterator) {
+ public ClosableIterator<IndexedRecord> mergeBootstrapReaders(
+ ClosableIterator<IndexedRecord> skeletonFileIterator,
+ ClosableIterator<IndexedRecord> dataFileIterator) {
return null;
}