This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 90fe9b2240a6 [HUDI-9682] Add support to FileGroupRecordBuffer to
assist w/ cow merge handle migration (#13670)
90fe9b2240a6 is described below
commit 90fe9b2240a6a0aa94cfb14cfa98fe109155d24e
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Mon Aug 4 14:10:09 2025 -0700
[HUDI-9682] Add support to FileGroupRecordBuffer to assist w/ cow merge
handle migration (#13670)
Co-authored-by: danny0405 <[email protected]>
---
.../common/table/read/HoodieFileGroupReader.java | 15 +-
.../apache/hudi/common/table/read/InputSplit.java | 36 ++-
.../buffer/DefaultFileGroupRecordBufferLoader.java | 5 +-
.../read/buffer/FileGroupRecordBufferLoader.java | 4 +
...a => StreamingFileGroupRecordBufferLoader.java} | 56 ++--
.../read/buffer/BaseTestFileGroupRecordBuffer.java | 228 +++++++++++++++
.../buffer/TestFileGroupRecordBufferLoader.java | 118 ++++++++
.../buffer/TestKeyBasedFileGroupRecordBuffer.java | 309 ++++++++++++---------
.../TestSortedKeyBasedFileGroupRecordBuffer.java | 92 +++++-
...TestStreamingKeyBasedFileGroupRecordBuffer.java | 221 +++++++++++++++
10 files changed, 891 insertions(+), 193 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index 86a91a340db3..2a57d8b87376 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -34,6 +34,7 @@ import org.apache.hudi.common.table.PartitionPathParser;
import org.apache.hudi.common.table.read.buffer.FileGroupRecordBufferLoader;
import org.apache.hudi.common.table.read.buffer.HoodieFileGroupRecordBuffer;
import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.Either;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
@@ -53,6 +54,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.function.UnaryOperator;
import java.util.stream.Stream;
@@ -128,7 +130,7 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
*/
private void initRecordIterators() throws IOException {
ClosableIterator<T> iter = makeBaseFileIterator();
- if (inputSplit.getLogFiles().isEmpty()) {
+ if (inputSplit.hasNoRecordsToMerge()) {
this.baseFileIterator = new CloseableMappingIterator<>(iter,
readerContext::seal);
} else {
this.baseFileIterator = iter;
@@ -361,6 +363,7 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
private String partitionPath;
private long start = 0;
private long length = Long.MAX_VALUE;
+ private Iterator<BufferedRecord> recordIterator;
private boolean shouldUseRecordPosition = false;
private boolean allowInflightInstants = false;
private boolean emitDelete;
@@ -396,6 +399,12 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
return this;
}
+ public Builder<T> withRecordIterator(Iterator<BufferedRecord>
recordIterator) {
+ this.recordIterator = recordIterator;
+ this.recordBufferLoader =
FileGroupRecordBufferLoader.createStreamingRecordsBufferLoader();
+ return this;
+ }
+
public Builder<T> withPartitionPath(String partitionPath) {
this.partitionPath = partitionPath;
return this;
@@ -491,7 +500,6 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
ValidationUtils.checkArgument(requestedSchema != null, "Requested schema
is required");
ValidationUtils.checkArgument(props != null, "Props is required");
ValidationUtils.checkArgument(baseFileOption != null, "Base file option
is required");
- ValidationUtils.checkArgument(logFiles != null, "Log files stream is
required");
ValidationUtils.checkArgument(partitionPath != null, "Partition path is
required");
if (enableOptimizedLogBlockScan == null) {
// check to see if props contains this key if not explicitly set
@@ -510,7 +518,8 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
.allowInflightInstants(allowInflightInstants)
.enableOptimizedLogBlockScan(enableOptimizedLogBlockScan)
.build();
- InputSplit inputSplit = new InputSplit(baseFileOption, logFiles,
partitionPath, start, length);
+ InputSplit inputSplit = new InputSplit(baseFileOption, recordIterator !=
null ? Either.right(recordIterator) : Either.left(logFiles == null ?
Stream.empty() : logFiles),
+ partitionPath, start, length);
return new HoodieFileGroupReader<>(
readerContext, storage, tablePath, latestCommitTime, dataSchema,
requestedSchema, internalSchemaOpt, hoodieTableMetaClient,
props, readerParameters, inputSplit, fileGroupUpdateCallback,
recordBufferLoader);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/InputSplit.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/InputSplit.java
index 9ee6208354d8..39542d42b2e0 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/InputSplit.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/InputSplit.java
@@ -19,13 +19,16 @@
package org.apache.hudi.common.table.read;
-import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
+import org.apache.hudi.common.util.Either;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -37,32 +40,37 @@ import java.util.stream.Stream;
public class InputSplit {
private final Option<HoodieBaseFile> baseFileOption;
private final List<HoodieLogFile> logFiles;
+ private final Option<Iterator<BufferedRecord>> recordIterator;
private final String partitionPath;
// Byte offset to start reading from the base file
private final long start;
// Length of bytes to read from the base file
private final long length;
- InputSplit(Option<HoodieBaseFile> baseFileOption, Stream<HoodieLogFile>
logFiles, String partitionPath, long start, long length) {
+ InputSplit(Option<HoodieBaseFile> baseFileOption,
+ Either<Stream<HoodieLogFile>, Iterator<BufferedRecord>>
recordsToMerge,
+ String partitionPath, long start, long length) {
this.baseFileOption = baseFileOption;
- this.logFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator())
- .filter(logFile ->
!logFile.getFileName().endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
- .collect(Collectors.toList());
+ if (recordsToMerge.isLeft()) {
+ this.logFiles =
recordsToMerge.asLeft().sorted(HoodieLogFile.getLogFileComparator())
+ .filter(logFile ->
!logFile.getFileName().endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
+ .collect(Collectors.toList());
+ this.recordIterator = Option.empty();
+ } else {
+ this.logFiles = Collections.emptyList();
+ this.recordIterator = Option.of(recordsToMerge.asRight());
+ }
this.partitionPath = partitionPath;
this.start = start;
this.length = length;
}
- static InputSplit fromFileSlice(FileSlice fileSlice, long start, long
length) {
- return new InputSplit(fileSlice.getBaseFile(), fileSlice.getLogFiles(),
fileSlice.getPartitionPath(),
- start, length);
- }
-
public Option<HoodieBaseFile> getBaseFileOption() {
return baseFileOption;
}
public List<HoodieLogFile> getLogFiles() {
+ ValidationUtils.checkArgument(recordIterator.isEmpty(), "Log files are not
initialized");
return logFiles;
}
@@ -81,4 +89,12 @@ public class InputSplit {
public boolean isParquetBaseFile() {
return baseFileOption.map(baseFile ->
HoodieFileFormat.fromFileExtension(baseFile.getStoragePath().getFileExtension())
== HoodieFileFormat.PARQUET).orElse(false);
}
+
+ public boolean hasNoRecordsToMerge() {
+ return this.logFiles.isEmpty() && recordIterator.isEmpty();
+ }
+
+ public Iterator<BufferedRecord> getRecordIterator() {
+ return this.recordIterator.orElseThrow(() -> new
IllegalStateException("The record iterator has not been setup"));
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/DefaultFileGroupRecordBufferLoader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/DefaultFileGroupRecordBufferLoader.java
index b30dd12d976b..66a5d751529b 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/DefaultFileGroupRecordBufferLoader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/DefaultFileGroupRecordBufferLoader.java
@@ -37,6 +37,7 @@ import java.util.List;
/**
* Default implementation of {@link FileGroupRecordBufferLoader} that
initializes a buffer based on the reader parameters.
+ *
* @param <T> the engine specific record type
*/
class DefaultFileGroupRecordBufferLoader<T> extends
LogScanningRecordBufferLoader implements FileGroupRecordBufferLoader<T> {
@@ -59,7 +60,6 @@ class DefaultFileGroupRecordBufferLoader<T> extends
LogScanningRecordBufferLoade
ReaderParameters readerParameters,
HoodieReadStats readStats,
Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback) {
-
boolean isSkipMerge = ConfigUtils.getStringWithAltKeys(props,
HoodieReaderConfig.MERGE_TYPE,
true).equalsIgnoreCase(HoodieReaderConfig.REALTIME_SKIP_MERGE);
PartialUpdateMode partialUpdateMode =
hoodieTableMetaClient.getTableConfig().getPartialUpdateMode();
UpdateProcessor<T> updateProcessor = UpdateProcessor.create(readStats,
readerContext, readerParameters.emitDeletes(), fileGroupUpdateCallback);
@@ -78,6 +78,7 @@ class DefaultFileGroupRecordBufferLoader<T> extends
LogScanningRecordBufferLoade
recordBuffer = new KeyBasedFileGroupRecordBuffer<>(
readerContext, hoodieTableMetaClient, readerContext.getMergeMode(),
partialUpdateMode, props, orderingFieldNames, updateProcessor);
}
- return Pair.of(recordBuffer, scanLogFiles(readerContext, storage,
inputSplit, hoodieTableMetaClient, props, readerParameters, readStats,
recordBuffer));
+ return Pair.of(recordBuffer, scanLogFiles(readerContext, storage,
inputSplit, hoodieTableMetaClient, props,
+ readerParameters, readStats, recordBuffer));
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBufferLoader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBufferLoader.java
index c2d4834d9eff..983329dd6288 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBufferLoader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBufferLoader.java
@@ -52,6 +52,10 @@ public interface FileGroupRecordBufferLoader<T> {
return DefaultFileGroupRecordBufferLoader.getInstance();
}
+ static <T> FileGroupRecordBufferLoader<T>
createStreamingRecordsBufferLoader() {
+ return StreamingFileGroupRecordBufferLoader.getInstance();
+ }
+
static <T> ReusableFileGroupRecordBufferLoader<T>
createReusable(HoodieReaderContext<T> readerContextWithoutFilters) {
return new
ReusableFileGroupRecordBufferLoader<>(readerContextWithoutFilters);
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/DefaultFileGroupRecordBufferLoader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/StreamingFileGroupRecordBufferLoader.java
similarity index 54%
copy from
hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/DefaultFileGroupRecordBufferLoader.java
copy to
hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/StreamingFileGroupRecordBufferLoader.java
index b30dd12d976b..8af9d3e3e5d5 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/DefaultFileGroupRecordBufferLoader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/StreamingFileGroupRecordBufferLoader.java
@@ -18,66 +18,64 @@
package org.apache.hudi.common.table.read.buffer;
-import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.PartialUpdateMode;
import org.apache.hudi.common.table.read.BaseFileUpdateCallback;
+import org.apache.hudi.common.table.read.BufferedRecord;
import org.apache.hudi.common.table.read.HoodieReadStats;
import org.apache.hudi.common.table.read.InputSplit;
import org.apache.hudi.common.table.read.ReaderParameters;
import org.apache.hudi.common.table.read.UpdateProcessor;
-import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.storage.HoodieStorage;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
/**
- * Default implementation of {@link FileGroupRecordBufferLoader} that
initializes a buffer based on the reader parameters.
- * @param <T> the engine specific record type
+ * Loader that fills a {@link FileGroupRecordBuffer} from an iterator of records
to be merged with a base file of interest.
+ * This will be used in write paths for COW merge cases.
+ * @param <T> Engine native representation of the record.
*/
-class DefaultFileGroupRecordBufferLoader<T> extends
LogScanningRecordBufferLoader implements FileGroupRecordBufferLoader<T> {
- private static final DefaultFileGroupRecordBufferLoader INSTANCE = new
DefaultFileGroupRecordBufferLoader<>();
+public class StreamingFileGroupRecordBufferLoader<T> implements
FileGroupRecordBufferLoader<T> {
+ private static final StreamingFileGroupRecordBufferLoader INSTANCE = new
StreamingFileGroupRecordBufferLoader<>();
- static <T> DefaultFileGroupRecordBufferLoader<T> getInstance() {
+ static <T> StreamingFileGroupRecordBufferLoader<T> getInstance() {
return INSTANCE;
}
- private DefaultFileGroupRecordBufferLoader() {
- }
-
@Override
- public Pair<HoodieFileGroupRecordBuffer<T>, List<String>>
getRecordBuffer(HoodieReaderContext<T> readerContext,
-
HoodieStorage storage,
-
InputSplit inputSplit,
-
List<String> orderingFieldNames,
-
HoodieTableMetaClient hoodieTableMetaClient,
-
TypedProperties props,
-
ReaderParameters readerParameters,
-
HoodieReadStats readStats,
+ public Pair<HoodieFileGroupRecordBuffer<T>, List<String>>
getRecordBuffer(HoodieReaderContext<T> readerContext, HoodieStorage storage,
InputSplit inputSplit,
+
List<String> orderingFieldNames, HoodieTableMetaClient hoodieTableMetaClient,
+
TypedProperties props, ReaderParameters readerParameters, HoodieReadStats
readStats,
Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback) {
-
- boolean isSkipMerge = ConfigUtils.getStringWithAltKeys(props,
HoodieReaderConfig.MERGE_TYPE,
true).equalsIgnoreCase(HoodieReaderConfig.REALTIME_SKIP_MERGE);
PartialUpdateMode partialUpdateMode =
hoodieTableMetaClient.getTableConfig().getPartialUpdateMode();
UpdateProcessor<T> updateProcessor = UpdateProcessor.create(readStats,
readerContext, readerParameters.emitDeletes(), fileGroupUpdateCallback);
FileGroupRecordBuffer<T> recordBuffer;
- if (isSkipMerge) {
- recordBuffer = new UnmergedFileGroupRecordBuffer<>(
- readerContext, hoodieTableMetaClient, readerContext.getMergeMode(),
partialUpdateMode, props, readStats);
- } else if (readerParameters.sortOutputs()) {
+ if (readerParameters.sortOutputs()) {
recordBuffer = new SortedKeyBasedFileGroupRecordBuffer<>(
readerContext, hoodieTableMetaClient, readerContext.getMergeMode(),
partialUpdateMode, props, orderingFieldNames, updateProcessor);
- } else if (readerParameters.useRecordPosition() &&
inputSplit.getBaseFileOption().isPresent()) {
- recordBuffer = new PositionBasedFileGroupRecordBuffer<>(
- readerContext, hoodieTableMetaClient, readerContext.getMergeMode(),
partialUpdateMode, inputSplit.getBaseFileOption().get().getCommitTime(), props,
- orderingFieldNames, updateProcessor);
} else {
recordBuffer = new KeyBasedFileGroupRecordBuffer<>(
readerContext, hoodieTableMetaClient, readerContext.getMergeMode(),
partialUpdateMode, props, orderingFieldNames, updateProcessor);
}
- return Pair.of(recordBuffer, scanLogFiles(readerContext, storage,
inputSplit, hoodieTableMetaClient, props, readerParameters, readStats,
recordBuffer));
+
+ Iterator<BufferedRecord> recordIterator = inputSplit.getRecordIterator();
+
+ while (recordIterator.hasNext()) {
+ BufferedRecord bufferedRecord = recordIterator.next();
+ try {
+ recordBuffer.processNextDataRecord(bufferedRecord,
bufferedRecord.getRecordKey());
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to process next buffered record",
e);
+ }
+ }
+ return Pair.of(recordBuffer, Collections.emptyList());
}
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/BaseTestFileGroupRecordBuffer.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/BaseTestFileGroupRecordBuffer.java
new file mode 100644
index 000000000000..a7ba45b29c37
--- /dev/null
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/BaseTestFileGroupRecordBuffer.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read.buffer;
+
+import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.BaseAvroPayload;
+import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.PartialUpdateMode;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.DeleteContext;
+import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
+import org.apache.hudi.common.table.read.HoodieReadStats;
+import org.apache.hudi.common.table.read.InputSplit;
+import org.apache.hudi.common.table.read.ReaderParameters;
+import org.apache.hudi.common.table.read.UpdateProcessor;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.internal.schema.InternalSchema;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class BaseTestFileGroupRecordBuffer {
+
+ protected static final Schema SCHEMA = Schema.createRecord("test_record",
null, "namespace", false,
+ Arrays.asList(
+ new Schema.Field("record_key", Schema.create(Schema.Type.STRING)),
+ new Schema.Field("counter", Schema.create(Schema.Type.INT)),
+ new Schema.Field("ts", Schema.create(Schema.Type.LONG))));
+
+ protected static GenericRecord createTestRecord(String recordKey, int
counter, long ts) {
+ GenericRecord record = new GenericData.Record(SCHEMA);
+ record.put("record_key", recordKey);
+ record.put("counter", counter);
+ record.put("ts", ts);
+ return record;
+ }
+
+ protected static List<BufferedRecord>
convertToBufferedRecordsList(List<IndexedRecord> indexedRecords,
+
HoodieReaderContext<IndexedRecord> readerContext,
+
TypedProperties props, String[] orderingFieldNames) {
+ return indexedRecords.stream().map(rec -> {
+ HoodieAvroIndexedRecord indexedRecord = new HoodieAvroIndexedRecord(new
HoodieKey(rec.get(0).toString(), ""), rec, null);
+ return (BufferedRecord)
BufferedRecord.forRecordWithContext(indexedRecord,
readerContext.getSchemaHandler().getRequestedSchema(),
+ readerContext.getRecordContext(), props, orderingFieldNames);
+ }).collect(Collectors.toList());
+ }
+
+ protected static List<BufferedRecord>
convertToBufferedRecordsListForDeletes(List<IndexedRecord> indexedRecords,
boolean defaultOrderingValue) {
+ return indexedRecords.stream().map(rec -> {
+ return
+ (BufferedRecord)
BufferedRecord.forDeleteRecord(DeleteRecord.create(new
HoodieKey(rec.get(0).toString(), ""), defaultOrderingValue ? 0 : (Comparable)
rec.get(2)),
+ defaultOrderingValue ? 0 : (Comparable) rec.get(2));
+ }).collect(Collectors.toList());
+ }
+
+ protected static KeyBasedFileGroupRecordBuffer<IndexedRecord>
buildKeyBasedFileGroupRecordBuffer(HoodieReaderContext<IndexedRecord>
readerContext,
+
HoodieTableConfig tableConfig,
+
HoodieReadStats readStats,
+
HoodieRecordMerger recordMerger,
+
RecordMergeMode recordMergeMode,
+
List<String> orderingFieldNames,
+
Option<Pair<String, String>> deleteMarkerKeyValue) {
+ TypedProperties props = new TypedProperties();
+ deleteMarkerKeyValue.ifPresent(markerKeyValue -> {
+ props.setProperty(DELETE_KEY, markerKeyValue.getLeft());
+ props.setProperty(DELETE_MARKER, markerKeyValue.getRight());
+ });
+ FileGroupReaderSchemaHandler<IndexedRecord> fileGroupReaderSchemaHandler =
mock(FileGroupReaderSchemaHandler.class);
+ when(fileGroupReaderSchemaHandler.getRequiredSchema()).thenReturn(SCHEMA);
+
when(fileGroupReaderSchemaHandler.getInternalSchema()).thenReturn(InternalSchema.getEmptyInternalSchema());
+ when(fileGroupReaderSchemaHandler.getDeleteContext()).thenReturn(new
DeleteContext(props, SCHEMA));
+ readerContext.setSchemaHandler(fileGroupReaderSchemaHandler);
+ return buildKeyBasedFileGroupRecordBuffer(readerContext, tableConfig,
readStats, recordMerger, recordMergeMode, orderingFieldNames, props,
+ Option.empty());
+ }
+
+ protected static KeyBasedFileGroupRecordBuffer<IndexedRecord>
buildKeyBasedFileGroupRecordBuffer(HoodieReaderContext<IndexedRecord>
readerContext,
+
HoodieTableConfig tableConfig,
+
HoodieReadStats readStats,
+
HoodieRecordMerger recordMerger,
+
RecordMergeMode recordMergeMode,
+
List<String> orderingFieldNames,
+
TypedProperties props,
+
Option<Iterator<BufferedRecord>> fileGroupRecordBufferItrOpt)
{
+
+ readerContext.setRecordMerger(Option.ofNullable(recordMerger));
+ HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class,
RETURNS_DEEP_STUBS);
+ when(mockMetaClient.getTableConfig()).thenReturn(tableConfig);
+ UpdateProcessor<IndexedRecord> updateProcessor =
UpdateProcessor.create(readStats, readerContext, false, Option.empty());
+
+ if (fileGroupRecordBufferItrOpt.isEmpty()) {
+ return new KeyBasedFileGroupRecordBuffer<>(
+ readerContext, mockMetaClient, recordMergeMode,
PartialUpdateMode.NONE, props, orderingFieldNames, updateProcessor);
+ } else {
+ FileGroupRecordBufferLoader recordBufferLoader =
FileGroupRecordBufferLoader.createStreamingRecordsBufferLoader();
+ InputSplit inputSplit = mock(InputSplit.class);
+ when(inputSplit.hasNoRecordsToMerge()).thenReturn(false);
+
when(inputSplit.getRecordIterator()).thenReturn(fileGroupRecordBufferItrOpt.get());
+ ReaderParameters readerParameters = mock(ReaderParameters.class);
+ when(readerParameters.sortOutputs()).thenReturn(false);
+ return (KeyBasedFileGroupRecordBuffer<IndexedRecord>)
recordBufferLoader.getRecordBuffer(readerContext, mockMetaClient.getStorage(),
inputSplit,
+ orderingFieldNames, mockMetaClient, props, readerParameters,
readStats, Option.empty()).getKey();
+ }
+ }
+
+ protected static List<IndexedRecord>
getActualRecords(FileGroupRecordBuffer<IndexedRecord> fileGroupRecordBuffer)
throws IOException {
+ List<IndexedRecord> actualRecords = new ArrayList<>();
+ while (fileGroupRecordBuffer.hasNext()) {
+ actualRecords.add(fileGroupRecordBuffer.next());
+ }
+ return actualRecords;
+ }
+
+ /**
+ * A custom payload implementation for testing purposes that marks records
as deleted once the counter exceeds 2.
+ * During the merge, it will combine the counter values.
+ */
+ public static class CustomPayload extends BaseAvroPayload
+ implements HoodieRecordPayload<CustomPayload> {
+ private final GenericRecord payloadRecord;
+
+ public CustomPayload(GenericRecord record, Comparable orderingVal) {
+ super(record, orderingVal);
+ this.payloadRecord = record;
+ }
+
+ @Override
+ public TestKeyBasedFileGroupRecordBuffer.CustomPayload
preCombine(TestKeyBasedFileGroupRecordBuffer.CustomPayload oldValue) {
+ return this;
+ }
+
+ @Override
+ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord
currentValue, Schema schema) throws IOException {
+ if (currentValue.get(2).equals(payloadRecord.get(2))) {
+ // If the timestamps are the same, we do not update
+ return Option.of(currentValue);
+ }
+ int result = (int) currentValue.get(1) + (int) payloadRecord.get(1);
+ if (result > 2) {
+ return Option.empty();
+ }
+ return Option.of(createTestRecord(currentValue.get(0).toString(),
result, (long) payloadRecord.get(2)));
+ }
+
+ @Override
+ public Option<IndexedRecord> getInsertValue(Schema schema) throws
IOException {
+ return Option.of(payloadRecord);
+ }
+
+ @Override
+ public Comparable<?> getOrderingValue() {
+ return null;
+ }
+ }
+
+ public static class CustomMerger implements HoodieRecordMerger {
+ private final String strategy = UUID.randomUUID().toString();
+
+ @Override
+ public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema
oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws
IOException {
+ GenericRecord olderData = (GenericRecord) older.getData();
+ GenericRecord newerData = (GenericRecord) newer.getData();
+ if (olderData.get(2).equals(newerData.get(2))) {
+ // If the timestamps are the same, we do not update
+ return Option.of(Pair.of(older, oldSchema));
+ }
+ int result = (int) olderData.get(1) + (int) newerData.get(1);
+ if (result > 2) {
+ return Option.empty();
+ }
+ HoodieKey hoodieKey = older.getKey();
+ return Option.of(Pair.of(new
HoodieAvroIndexedRecord(createTestRecord(hoodieKey.getRecordKey(), result,
(long) newerData.get(2))), SCHEMA));
+ }
+
+ @Override
+ public HoodieRecord.HoodieRecordType getRecordType() {
+ return HoodieRecord.HoodieRecordType.AVRO;
+ }
+
+ @Override
+ public String getMergingStrategy() {
+ return strategy;
+ }
+ }
+}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestFileGroupRecordBufferLoader.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestFileGroupRecordBufferLoader.java
new file mode 100644
index 000000000000..3289614165af
--- /dev/null
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestFileGroupRecordBufferLoader.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read.buffer;
+
+import org.apache.hudi.avro.HoodieAvroReaderContext;
+import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.read.BaseFileUpdateCallback;
+import org.apache.hudi.common.table.read.DeleteContext;
+import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
+import org.apache.hudi.common.table.read.HoodieReadStats;
+import org.apache.hudi.common.table.read.InputSplit;
+import org.apache.hudi.common.table.read.ReaderParameters;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StorageConfiguration;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestFileGroupRecordBufferLoader extends
BaseTestFileGroupRecordBuffer {
+
+ @ParameterizedTest
+ @CsvSource({"KeyBasedFileGroupRecordBuffer,true",
"KeyBasedFileGroupRecordBuffer,false",
"SortedKeyBasedFileGroupRecordBuffer,true",
+ "SortedKeyBasedFileGroupRecordBuffer,false",
"PositionBasedFileGroupRecordBuffer,false"})
+ public void testDefaultFileGroupBufferRecordLoader(String
fileGroupRecordBufferType, boolean testRecordsBased) {
+ FileGroupRecordBufferLoader fileGroupRecordBufferLoader = !testRecordsBased
+ ? FileGroupRecordBufferLoader.createDefault()
+ : FileGroupRecordBufferLoader.createStreamingRecordsBufferLoader();
+ HoodieReadStats readStats = new HoodieReadStats();
+ HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+
when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.COMMIT_TIME_ORDERING);
+ when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.NINE);
+ when(tableConfig.getPreCombineFieldsStr()).thenReturn(Option.empty());
+ when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]
{"record_key"}));
+ StorageConfiguration<?> storageConfiguration =
mock(StorageConfiguration.class);
+ HoodieReaderContext<IndexedRecord> readerContext = new
HoodieAvroReaderContext(storageConfiguration, tableConfig, Option.empty(),
Option.empty());
+ readerContext.initRecordMerger(new TypedProperties());
+ FileGroupReaderSchemaHandler<IndexedRecord> fileGroupReaderSchemaHandler =
mock(FileGroupReaderSchemaHandler.class);
+ when(fileGroupReaderSchemaHandler.getRequiredSchema()).thenReturn(SCHEMA);
+
when(fileGroupReaderSchemaHandler.getInternalSchema()).thenReturn(InternalSchema.getEmptyInternalSchema());
+ DeleteContext deleteContext = mock(DeleteContext.class);
+
when(deleteContext.getCustomDeleteMarkerKeyValue()).thenReturn(Option.empty());
+ when(deleteContext.getHoodieOperationPos()).thenReturn(-1);
+
when(fileGroupReaderSchemaHandler.getDeleteContext()).thenReturn(deleteContext);
+ readerContext.setSchemaHandler(fileGroupReaderSchemaHandler);
+ readerContext.setRecordMerger(Option.ofNullable(null));
+ HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class,
RETURNS_DEEP_STUBS);
+ when(mockMetaClient.getTableConfig()).thenReturn(tableConfig);
+ HoodieStorage storage = mock(HoodieStorage.class);
+ when(mockMetaClient.getStorage()).thenReturn(storage);
+ InputSplit inputSplit = mock(InputSplit.class);
+ if (testRecordsBased) {
+
when(inputSplit.getRecordIterator()).thenReturn(Collections.emptyIterator());
+ }
+ ReaderParameters readerParameters = mock(ReaderParameters.class);
+ if (fileGroupRecordBufferType.contains("Sorted")) {
+ when(readerParameters.sortOutputs()).thenReturn(true);
+ }
+ if (fileGroupRecordBufferType.contains("Position")) {
+ HoodieBaseFile baseFile = mock(HoodieBaseFile.class);
+ when(inputSplit.getBaseFileOption()).thenReturn(Option.of(baseFile));
+ when(readerParameters.useRecordPosition()).thenReturn(true);
+ }
+
+ Option<BaseFileUpdateCallback> fileGroupUpdateCallback = Option.empty();
+
+ HoodieFileGroupRecordBuffer fileGroupRecordBuffer =
(HoodieFileGroupRecordBuffer) fileGroupRecordBufferLoader
+ .getRecordBuffer(readerContext, storage, inputSplit,
Collections.singletonList("ts"),
+ mockMetaClient, new TypedProperties(), readerParameters,
readStats, fileGroupUpdateCallback).getLeft();
+
+ switch (fileGroupRecordBufferType) {
+ case "KeyBasedFileGroupRecordBuffer":
+ assertTrue(fileGroupRecordBuffer instanceof
KeyBasedFileGroupRecordBuffer);
+ break;
+ case "SortedKeyBasedFileGroupRecordBuffer":
+ assertTrue(fileGroupRecordBuffer instanceof
SortedKeyBasedFileGroupRecordBuffer);
+ break;
+ case "PositionBasedFileGroupRecordBuffer":
+ assertTrue(fileGroupRecordBuffer instanceof
PositionBasedFileGroupRecordBuffer);
+ break;
+ default:
+ throw new HoodieIOException("Undefined type");
+ }
+ }
+}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestKeyBasedFileGroupRecordBuffer.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestKeyBasedFileGroupRecordBuffer.java
index 1cae85aca33c..d93543c691f6 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestKeyBasedFileGroupRecordBuffer.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestKeyBasedFileGroupRecordBuffer.java
@@ -23,72 +23,62 @@ import org.apache.hudi.avro.HoodieAvroReaderContext;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieReaderContext;
-import org.apache.hudi.common.model.BaseAvroPayload;
import org.apache.hudi.common.model.DeleteRecord;
-import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroRecordMerger;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.PartialUpdateMode;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
-import org.apache.hudi.common.table.read.DeleteContext;
+import org.apache.hudi.common.table.read.BufferedRecord;
import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
import org.apache.hudi.common.table.read.HoodieReadStats;
-import org.apache.hudi.common.table.read.UpdateProcessor;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.storage.StorageConfiguration;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.junit.jupiter.api.Test;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.UUID;
import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-class TestKeyBasedFileGroupRecordBuffer {
- private static final Schema SCHEMA = Schema.createRecord("test_record",
null, "namespace", false,
- Arrays.asList(
- new Schema.Field("record_key", Schema.create(Schema.Type.STRING)),
- new Schema.Field("counter", Schema.create(Schema.Type.INT)),
- new Schema.Field("ts", Schema.create(Schema.Type.LONG))));
+class TestKeyBasedFileGroupRecordBuffer extends BaseTestFileGroupRecordBuffer {
private final IndexedRecord testRecord1 = createTestRecord("1", 1, 1L);
private final IndexedRecord testRecord1UpdateWithSameTime =
createTestRecord("1", 2, 1L);
private final IndexedRecord testRecord2 = createTestRecord("2", 1, 1L);
private final IndexedRecord testRecord2Update = createTestRecord("2", 1, 2L);
private final IndexedRecord testRecord2EarlierUpdate = createTestRecord("2",
1, 0L);
private final IndexedRecord testRecord2Delete = createTestRecord("2", 2, 3L);
+ private final IndexedRecord testRecord2CustomPayloadExpected =
createTestRecord("2", 2, 2L);
private final IndexedRecord testRecord3 = createTestRecord("3", 1, 1L);
private final IndexedRecord testRecord3Update = createTestRecord("3", 1, 2L);
+ private final IndexedRecord testRecord3UpdateCustomPayloadExpected =
createTestRecord("3", 2, 2L);
private final IndexedRecord testRecord3DeleteByFieldValue =
createTestRecord("3", 3, 1L);
private final IndexedRecord testRecord4 = createTestRecord("4", 2, 1L);
private final IndexedRecord testRecord4Update = createTestRecord("4", 1, 2L);
+ private final IndexedRecord testRecord4EarlierUpdate = createTestRecord("4",
1, 0L);
+ private final IndexedRecord testRecord5 = createTestRecord("5", 1, 1L);
+ private final IndexedRecord testRecord5DeleteByCustomMarker =
createTestRecord("5", 3, 2L);
+ private final IndexedRecord testRecord6 = createTestRecord("6", 1, 5L);
+ private final IndexedRecord testRecord6DeleteByCustomMarker =
createTestRecord("6", 3, 2L);
+ private final IndexedRecord testRecord7 = createTestRecord("7", 1, 5L);
@Test
void readWithEventTimeOrdering() throws IOException {
HoodieReadStats readStats = new HoodieReadStats();
HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
- when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new
String[]{"record_key"}));
+ when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]
{"record_key"}));
StorageConfiguration<?> storageConfiguration =
mock(StorageConfiguration.class);
HoodieReaderContext<IndexedRecord> readerContext = new
HoodieAvroReaderContext(storageConfiguration, tableConfig, Option.empty(),
Option.empty());
KeyBasedFileGroupRecordBuffer<IndexedRecord> fileGroupRecordBuffer =
buildKeyBasedFileGroupRecordBuffer(readerContext, tableConfig, readStats, null,
@@ -115,7 +105,7 @@ class TestKeyBasedFileGroupRecordBuffer {
void readWithEventTimeOrderingAndDeleteBlock() throws IOException {
HoodieReadStats readStats = new HoodieReadStats();
HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
- when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new
String[]{"record_key"}));
+ when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]
{"record_key"}));
StorageConfiguration<?> storageConfiguration =
mock(StorageConfiguration.class);
HoodieReaderContext<IndexedRecord> readerContext = new
HoodieAvroReaderContext(storageConfiguration, tableConfig, Option.empty(),
Option.empty());
KeyBasedFileGroupRecordBuffer<IndexedRecord> fileGroupRecordBuffer =
buildKeyBasedFileGroupRecordBuffer(readerContext, tableConfig, readStats, null,
@@ -132,7 +122,7 @@ class TestKeyBasedFileGroupRecordBuffer {
when(dataBlock2.getEngineRecordIterator(readerContext)).thenReturn(ClosableIterator.wrap(Arrays.asList(testRecord2EarlierUpdate,
testRecord3Update).iterator()));
HoodieDeleteBlock deleteBlock = mock(HoodieDeleteBlock.class);
- when(deleteBlock.getRecordsToDelete()).thenReturn(new
DeleteRecord[]{DeleteRecord.create("3", ""), DeleteRecord.create("2", "", -1L),
+ when(deleteBlock.getRecordsToDelete()).thenReturn(new DeleteRecord[]
{DeleteRecord.create("3", ""), DeleteRecord.create("2", "", -1L),
DeleteRecord.create("1", "", 2L)});
// process data block, then delete block, then another data block
fileGroupRecordBuffer.processDataBlock(dataBlock, Option.empty());
@@ -146,11 +136,50 @@ class TestKeyBasedFileGroupRecordBuffer {
assertEquals(2, readStats.getNumUpdates());
}
+ @Test
+ void readWithEventTimeOrderingWithRecords() throws IOException { // merges pre-buffered incoming records with base-file records under EVENT_TIME_ORDERING, ordering on "ts"
+ HoodieReadStats readStats = new HoodieReadStats();
+ TypedProperties properties = new TypedProperties();
+ properties.setProperty(HoodieTableConfig.PRECOMBINE_FIELDS.key(), "ts");
+ properties.setProperty(DELETE_KEY, "counter");
+ properties.setProperty(DELETE_MARKER, "3");
+ HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+
when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.EVENT_TIME_ORDERING);
+
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
+
when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.current());
+ when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]
{"record_key"}));
+ StorageConfiguration<?> storageConfiguration =
mock(StorageConfiguration.class);
+ HoodieReaderContext<IndexedRecord> readerContext = new
HoodieAvroReaderContext(storageConfiguration, tableConfig, Option.empty(),
Option.empty());
+ readerContext.setHasLogFiles(false); // no data/delete blocks are processed in this test; incoming records are passed to the buffer builder as an iterator below
+ readerContext.setHasBootstrapBaseFile(false);
+ readerContext.initRecordMerger(properties);
+ FileGroupReaderSchemaHandler schemaHandler = new
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(),
tableConfig,
+ properties);
+ readerContext.setSchemaHandler(schemaHandler);
+ List<BufferedRecord> inputRecords =
convertToBufferedRecordsList(Arrays.asList(testRecord1UpdateWithSameTime,
testRecord2Update, testRecord3Update,
+ testRecord4EarlierUpdate, testRecord7), readerContext, properties, new
String[] {"ts"});
+
inputRecords.addAll(convertToBufferedRecordsListForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker,
testRecord6DeleteByCustomMarker), false));
+ KeyBasedFileGroupRecordBuffer<IndexedRecord> fileGroupRecordBuffer =
buildKeyBasedFileGroupRecordBuffer(readerContext, tableConfig, readStats, null,
+ RecordMergeMode.EVENT_TIME_ORDERING, Collections.singletonList("ts"),
properties, Option.of(inputRecords.iterator()));
+
+
fileGroupRecordBuffer.setBaseFileIterator(ClosableIterator.wrap(Arrays.asList(testRecord1,
testRecord2, testRecord3, testRecord4,
+ testRecord5, testRecord6).iterator()));
+
+ List<IndexedRecord> actualRecords =
getActualRecords(fileGroupRecordBuffer);
+ // update for 4 is ignored due to lower ordering value (ts 0 vs. ts 1 in the base file).
+ // record5 is deleted (the delete carries ts 2, the base record ts 1).
+ // delete for 6 is ignored due to lower ordering value (ts 2 vs. ts 5 in the base file).
+ assertEquals(Arrays.asList(testRecord1UpdateWithSameTime,
testRecord2Update, testRecord3Update, testRecord4, testRecord6, testRecord7),
actualRecords);
+ assertEquals(1, readStats.getNumInserts()); // key "7" is the only incoming record without a base-file counterpart
+ assertEquals(1, readStats.getNumDeletes()); // key "5"
+ assertEquals(3, readStats.getNumUpdates()); // keys "1" (equal ts), "2" and "3" (higher ts)
+ }
+
@Test
void readWithCommitTimeOrdering() throws IOException {
HoodieReadStats readStats = new HoodieReadStats();
HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
- when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new
String[]{"record_key"}));
+ when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]
{"record_key"}));
StorageConfiguration<?> storageConfiguration =
mock(StorageConfiguration.class);
HoodieReaderContext<IndexedRecord> readerContext = new
HoodieAvroReaderContext(storageConfiguration, tableConfig, Option.empty(),
Option.empty());
KeyBasedFileGroupRecordBuffer<IndexedRecord> fileGroupRecordBuffer =
buildKeyBasedFileGroupRecordBuffer(readerContext, tableConfig, readStats, null,
@@ -176,12 +205,49 @@ class TestKeyBasedFileGroupRecordBuffer {
assertEquals(2, readStats.getNumUpdates());
}
+ @Test
+ void readWithCommitTimeOrderingWithRecords() throws IOException { // merges pre-buffered incoming records with base-file records under COMMIT_TIME_ORDERING
+ HoodieReadStats readStats = new HoodieReadStats();
+ TypedProperties properties = new TypedProperties();
+ properties.setProperty(DELETE_KEY, "counter");
+ properties.setProperty(DELETE_MARKER, "3");
+ HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+
when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.COMMIT_TIME_ORDERING);
+
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
+
when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.current());
+ when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]
{"record_key"}));
+ StorageConfiguration<?> storageConfiguration =
mock(StorageConfiguration.class);
+ HoodieReaderContext<IndexedRecord> readerContext = new
HoodieAvroReaderContext(storageConfiguration, tableConfig, Option.empty(),
Option.empty());
+ readerContext.setHasLogFiles(false); // no data/delete blocks are processed in this test; incoming records are passed to the buffer builder as an iterator below
+ readerContext.setHasBootstrapBaseFile(false);
+ readerContext.initRecordMerger(properties);
+ FileGroupReaderSchemaHandler schemaHandler = new
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(),
tableConfig,
+ properties);
+ readerContext.setSchemaHandler(schemaHandler);
+ List<BufferedRecord> inputRecords =
convertToBufferedRecordsList(Arrays.asList(testRecord1UpdateWithSameTime,
testRecord2Update, testRecord3Update,
+ testRecord4EarlierUpdate, testRecord7), readerContext, properties, new
String[] {});
+
inputRecords.addAll(convertToBufferedRecordsListForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker,
testRecord6DeleteByCustomMarker), true));
+ KeyBasedFileGroupRecordBuffer<IndexedRecord> fileGroupRecordBuffer =
buildKeyBasedFileGroupRecordBuffer(readerContext, tableConfig, readStats, null,
+ RecordMergeMode.COMMIT_TIME_ORDERING, Collections.singletonList("ts"),
properties, Option.of(inputRecords.iterator()));
+
+
fileGroupRecordBuffer.setBaseFileIterator(ClosableIterator.wrap(Arrays.asList(testRecord1,
testRecord2, testRecord3, testRecord4,
+ testRecord5, testRecord6).iterator()));
+
+ List<IndexedRecord> actualRecords =
getActualRecords(fileGroupRecordBuffer);
+ assertEquals(Arrays.asList(testRecord1UpdateWithSameTime,
testRecord2Update, testRecord3Update, testRecord4EarlierUpdate, testRecord7),
actualRecords);
+ assertEquals(1, readStats.getNumInserts()); // key "7" is the only incoming record without a base-file counterpart
+ assertEquals(2, readStats.getNumDeletes()); // keys "5" and "6"; the delete for "6" applies even though its ts (2) is below the base record's ts (5)
+ assertEquals(4, readStats.getNumUpdates()); // keys "1" through "4"; the update for "4" applies even though its ts (0) is below the base record's ts (1)
+ }
+
@Test
void readWithCustomPayload() throws IOException {
HoodieReadStats readStats = new HoodieReadStats();
HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
when(tableConfig.getPayloadClass()).thenReturn(CustomPayload.class.getName());
- when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new
String[]{"record_key"}));
+ when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]
{"record_key"}));
+ when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.CUSTOM);
+
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
StorageConfiguration<?> storageConfiguration =
mock(StorageConfiguration.class);
HoodieReaderContext<IndexedRecord> readerContext = new
HoodieAvroReaderContext(storageConfiguration, tableConfig, Option.empty(),
Option.empty());
KeyBasedFileGroupRecordBuffer<IndexedRecord> fileGroupRecordBuffer =
buildKeyBasedFileGroupRecordBuffer(readerContext, tableConfig, readStats, new
HoodieAvroRecordMerger(),
@@ -200,7 +266,7 @@ class TestKeyBasedFileGroupRecordBuffer {
.thenReturn(ClosableIterator.wrap(Arrays.asList(testRecord1UpdateWithSameTime,
testRecord2Delete, testRecord4Update).iterator()));
HoodieDeleteBlock deleteBlock = mock(HoodieDeleteBlock.class);
- when(deleteBlock.getRecordsToDelete()).thenReturn(new
DeleteRecord[]{DeleteRecord.create("3", "")});
+ when(deleteBlock.getRecordsToDelete()).thenReturn(new DeleteRecord[]
{DeleteRecord.create("3", "")});
fileGroupRecordBuffer.processDataBlock(dataBlock1, Option.empty());
fileGroupRecordBuffer.processDataBlock(dataBlock2, Option.empty());
fileGroupRecordBuffer.processDeleteBlock(deleteBlock);
@@ -213,12 +279,52 @@ class TestKeyBasedFileGroupRecordBuffer {
assertEquals(0, readStats.getNumUpdates());
}
+ @Test
+ void readWithCustomPayloadWithRecords() throws IOException { // merges pre-buffered incoming records with base-file records in CUSTOM merge mode, using CustomPayload via HoodieAvroRecordMerger
+ HoodieReadStats readStats = new HoodieReadStats();
+ TypedProperties properties = new TypedProperties();
+ properties.setProperty(DELETE_KEY, "counter");
+ properties.setProperty(DELETE_MARKER, "3");
+ properties.setProperty(HoodieTableConfig.RECORD_MERGE_MODE.key(),
"CUSTOM");
+ properties.setProperty(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(),
TestKeyBasedFileGroupRecordBuffer.CustomPayload.class.getName());
+ properties.setProperty(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key(),
HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID);
+ HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+
when(tableConfig.getPayloadClass()).thenReturn(TestKeyBasedFileGroupRecordBuffer.CustomPayload.class.getName());
+ when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]
{"record_key"}));
+ when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.CUSTOM);
+
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
+
when(tableConfig.getRecordMergeStrategyId()).thenReturn(HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID);
+
when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.current());
+ StorageConfiguration<?> storageConfiguration =
mock(StorageConfiguration.class);
+ HoodieReaderContext<IndexedRecord> readerContext = new
HoodieAvroReaderContext(storageConfiguration, tableConfig, Option.empty(),
Option.empty());
+ readerContext.setHasLogFiles(false); // no data/delete blocks are processed in this test; incoming records are passed to the buffer builder as an iterator below
+ readerContext.setHasBootstrapBaseFile(false);
+ readerContext.initRecordMerger(properties);
+ FileGroupReaderSchemaHandler schemaHandler = new
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(),
tableConfig,
+ properties);
+ readerContext.setSchemaHandler(schemaHandler);
+ List<BufferedRecord> inputRecords =
convertToBufferedRecordsList(Arrays.asList(testRecord1UpdateWithSameTime,
testRecord2Update, testRecord3Update,
+ testRecord4EarlierUpdate), readerContext, properties, new String[]
{"ts"});
+
inputRecords.addAll(convertToBufferedRecordsListForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker,
testRecord6DeleteByCustomMarker), true));
+ KeyBasedFileGroupRecordBuffer<IndexedRecord> fileGroupRecordBuffer =
buildKeyBasedFileGroupRecordBuffer(readerContext, tableConfig, readStats, new
HoodieAvroRecordMerger(),
+ RecordMergeMode.CUSTOM, Collections.singletonList("ts"), properties,
Option.of(inputRecords.iterator()));
+
+
fileGroupRecordBuffer.setBaseFileIterator(ClosableIterator.wrap(Arrays.asList(testRecord1,
testRecord2, testRecord3, testRecord4,
+ testRecord5, testRecord6).iterator()));
+
+ List<IndexedRecord> actualRecords =
getActualRecords(fileGroupRecordBuffer);
+ assertEquals(Arrays.asList(testRecord1, testRecord2CustomPayloadExpected,
testRecord3UpdateCustomPayloadExpected), actualRecords); // key "1" keeps its base-file record; keys "2" and "3" come back with counter 2 and ts 2
+ assertEquals(0, readStats.getNumInserts()); // every incoming key also exists in the base file
+ assertEquals(3, readStats.getNumDeletes()); // keys "4", "5" and "6" are absent from the expected output
+ assertEquals(2, readStats.getNumUpdates()); // keys "2" and "3"
+ }
+
@Test
void readWithCustomMerger() throws IOException {
HoodieReadStats readStats = new HoodieReadStats();
HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
when(tableConfig.getPayloadClass()).thenReturn(CustomPayload.class.getName());
- when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new
String[]{"record_key"}));
+ when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]
{"record_key"}));
StorageConfiguration<?> storageConfiguration =
mock(StorageConfiguration.class);
HoodieReaderContext<IndexedRecord> readerContext = new
HoodieAvroReaderContext(storageConfiguration, tableConfig, Option.empty(),
Option.empty());
KeyBasedFileGroupRecordBuffer<IndexedRecord> fileGroupRecordBuffer =
buildKeyBasedFileGroupRecordBuffer(readerContext, tableConfig, readStats, new
CustomMerger(),
@@ -237,7 +343,7 @@ class TestKeyBasedFileGroupRecordBuffer {
.thenReturn(ClosableIterator.wrap(Arrays.asList(testRecord2Delete,
testRecord4Update).iterator()));
HoodieDeleteBlock deleteBlock = mock(HoodieDeleteBlock.class);
- when(deleteBlock.getRecordsToDelete()).thenReturn(new
DeleteRecord[]{DeleteRecord.create("3", "")});
+ when(deleteBlock.getRecordsToDelete()).thenReturn(new DeleteRecord[]
{DeleteRecord.create("3", "")});
fileGroupRecordBuffer.processDataBlock(dataBlock1, Option.empty());
fileGroupRecordBuffer.processDataBlock(dataBlock2, Option.empty());
fileGroupRecordBuffer.processDeleteBlock(deleteBlock);
@@ -250,117 +356,42 @@ class TestKeyBasedFileGroupRecordBuffer {
assertEquals(0, readStats.getNumUpdates());
}
- private static GenericRecord createTestRecord(String recordKey, int counter,
long ts) {
- GenericRecord record = new GenericData.Record(SCHEMA);
- record.put("record_key", recordKey);
- record.put("counter", counter);
- record.put("ts", ts);
- return record;
- }
-
- private static KeyBasedFileGroupRecordBuffer<IndexedRecord>
buildKeyBasedFileGroupRecordBuffer(HoodieReaderContext<IndexedRecord>
readerContext,
-
HoodieTableConfig tableConfig,
-
HoodieReadStats readStats,
-
HoodieRecordMerger recordMerger,
-
RecordMergeMode recordMergeMode,
-
List<String> orderingFieldNames,
-
Option<Pair<String, String>> deleteMarkerKeyValue) {
-
- readerContext.setRecordMerger(Option.ofNullable(recordMerger));
- TypedProperties props = new TypedProperties();
- deleteMarkerKeyValue.ifPresent(markerKeyValue -> {
- props.setProperty(DELETE_KEY, markerKeyValue.getLeft());
- props.setProperty(DELETE_MARKER, markerKeyValue.getRight());
- });
- FileGroupReaderSchemaHandler<IndexedRecord> fileGroupReaderSchemaHandler =
mock(FileGroupReaderSchemaHandler.class);
- when(fileGroupReaderSchemaHandler.getRequiredSchema()).thenReturn(SCHEMA);
-
when(fileGroupReaderSchemaHandler.getInternalSchema()).thenReturn(InternalSchema.getEmptyInternalSchema());
- when(fileGroupReaderSchemaHandler.getDeleteContext()).thenReturn(new
DeleteContext(props, SCHEMA));
- readerContext.setSchemaHandler(fileGroupReaderSchemaHandler);
- HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class,
RETURNS_DEEP_STUBS);
- when(mockMetaClient.getTableConfig()).thenReturn(tableConfig);
- UpdateProcessor<IndexedRecord> updateProcessor =
UpdateProcessor.create(readStats, readerContext, false, Option.empty());
- return new KeyBasedFileGroupRecordBuffer<>(
- readerContext, mockMetaClient, recordMergeMode,
PartialUpdateMode.NONE, props, orderingFieldNames, updateProcessor);
- }
-
- private static List<IndexedRecord>
getActualRecords(FileGroupRecordBuffer<IndexedRecord> fileGroupRecordBuffer)
throws IOException {
- List<IndexedRecord> actualRecords = new ArrayList<>();
- while (fileGroupRecordBuffer.hasNext()) {
- actualRecords.add(fileGroupRecordBuffer.next());
- }
- return actualRecords;
- }
+ @Test
+ void readWithCustomMergerWithRecords() throws IOException { // merges pre-buffered incoming records with base-file records in CUSTOM merge mode, using the CustomMerger record merger
+ HoodieReadStats readStats = new HoodieReadStats();
+ TypedProperties properties = new TypedProperties();
+ properties.setProperty(DELETE_KEY, "counter");
+ properties.setProperty(DELETE_MARKER, "3");
+ properties.setProperty(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(),
CustomPayload.class.getName());
+ HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+
when(tableConfig.getPayloadClass()).thenReturn(CustomPayload.class.getName());
+ when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]
{"record_key"}));
+ when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.CUSTOM);
+
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
+
when(tableConfig.getRecordMergeStrategyId()).thenReturn(HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID);
+
when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.current());
- /**
- * A custom payload implementation for testing purposes that marks records
as deleted once the counter exceeds 2.
- * During the merge, it will combine the counter values.
- */
- public static class CustomPayload extends BaseAvroPayload
- implements HoodieRecordPayload<CustomPayload> {
- private final GenericRecord payloadRecord;
-
- public CustomPayload(GenericRecord record, Comparable orderingVal) {
- super(record, orderingVal);
- this.payloadRecord = record;
- }
-
- @Override
- public CustomPayload preCombine(CustomPayload oldValue) {
- return this;
- }
-
- @Override
- public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord
currentValue, Schema schema) throws IOException {
- if (currentValue.get(2).equals(payloadRecord.get(2))) {
- // If the timestamps are the same, we do not update
- return Option.of(currentValue);
- }
- int result = (int) currentValue.get(1) + (int) payloadRecord.get(1);
- if (result > 2) {
- return Option.empty();
- }
- return Option.of(createTestRecord(currentValue.get(0).toString(),
result, (long) payloadRecord.get(2)));
- }
-
- @Override
- public Option<IndexedRecord> getInsertValue(Schema schema) throws
IOException {
- return Option.of(payloadRecord);
- }
-
- @Override
- public Comparable<?> getOrderingValue() {
- return null;
- }
- }
+ StorageConfiguration<?> storageConfiguration =
mock(StorageConfiguration.class);
+ HoodieReaderContext<IndexedRecord> readerContext = new
HoodieAvroReaderContext(storageConfiguration, tableConfig, Option.empty(),
Option.empty());
+ readerContext.setHasLogFiles(false); // no data/delete blocks are processed in this test; incoming records are passed to the buffer builder as an iterator below
+ readerContext.setHasBootstrapBaseFile(false);
+ readerContext.initRecordMerger(properties);
+ FileGroupReaderSchemaHandler schemaHandler = new
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(),
tableConfig,
+ properties);
+ readerContext.setSchemaHandler(schemaHandler);
+ List<BufferedRecord> inputRecords =
convertToBufferedRecordsList(Arrays.asList(testRecord1UpdateWithSameTime,
testRecord2Update, testRecord3Update,
+ testRecord4EarlierUpdate), readerContext, properties, new String[]
{"ts"});
+
inputRecords.addAll(convertToBufferedRecordsListForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker,
testRecord6DeleteByCustomMarker), true));
+ KeyBasedFileGroupRecordBuffer<IndexedRecord> fileGroupRecordBuffer =
buildKeyBasedFileGroupRecordBuffer(readerContext, tableConfig, readStats, new
TestKeyBasedFileGroupRecordBuffer.CustomMerger(),
+ RecordMergeMode.CUSTOM, Collections.singletonList("ts"), properties,
Option.of(inputRecords.iterator()));
+
+
fileGroupRecordBuffer.setBaseFileIterator(ClosableIterator.wrap(Arrays.asList(testRecord1,
testRecord2, testRecord3, testRecord4,
+ testRecord5, testRecord6).iterator()));
- public static class CustomMerger implements HoodieRecordMerger {
- private final String strategy = UUID.randomUUID().toString();
-
- @Override
- public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema
oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws
IOException {
- GenericRecord olderData = (GenericRecord) older.getData();
- GenericRecord newerData = (GenericRecord) newer.getData();
- if (olderData.get(2).equals(newerData.get(2))) {
- // If the timestamps are the same, we do not update
- return Option.of(Pair.of(older, oldSchema));
- }
- int result = (int) olderData.get(1) + (int) newerData.get(1);
- if (result > 2) {
- return Option.empty();
- }
- HoodieKey hoodieKey = older.getKey();
- return Option.of(Pair.of(new
HoodieAvroIndexedRecord(createTestRecord(hoodieKey.getRecordKey(), result,
(long) newerData.get(2))), SCHEMA));
- }
-
- @Override
- public HoodieRecord.HoodieRecordType getRecordType() {
- return HoodieRecord.HoodieRecordType.AVRO;
- }
-
- @Override
- public String getMergingStrategy() {
- return strategy;
- }
+ List<IndexedRecord> actualRecords =
getActualRecords(fileGroupRecordBuffer);
+ assertEquals(Arrays.asList(testRecord1, testRecord2CustomPayloadExpected,
testRecord3UpdateCustomPayloadExpected), actualRecords); // key "1" keeps its base-file record; keys "2" and "3" come back with counter 2 and ts 2
+ assertEquals(0, readStats.getNumInserts()); // every incoming key also exists in the base file
+ assertEquals(3, readStats.getNumDeletes()); // keys "4", "5" and "6" are absent from the expected output
+ assertEquals(2, readStats.getNumUpdates()); // keys "2" and "3"
}
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java
index 43da3f7521f0..0ef3da31cc1e 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java
@@ -19,21 +19,31 @@
package org.apache.hudi.common.table.read.buffer;
+import org.apache.hudi.avro.HoodieAvroReaderContext;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.PartialUpdateMode;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
import org.apache.hudi.common.table.read.HoodieReadStats;
+import org.apache.hudi.common.table.read.InputSplit;
+import org.apache.hudi.common.table.read.ReaderParameters;
import org.apache.hudi.common.table.read.UpdateProcessor;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.avro.generic.IndexedRecord;
import org.junit.jupiter.api.Test;
import java.io.IOException;
@@ -42,13 +52,15 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-class TestSortedKeyBasedFileGroupRecordBuffer {
+class TestSortedKeyBasedFileGroupRecordBuffer extends
BaseTestFileGroupRecordBuffer {
private final TestRecord testRecord1 = new TestRecord("1", 0);
private final TestRecord testRecord2 = new TestRecord("2", 0);
private final TestRecord testRecord2Update = new TestRecord("2", 1);
@@ -58,31 +70,92 @@ class TestSortedKeyBasedFileGroupRecordBuffer {
private final TestRecord testRecord6 = new TestRecord("6", 0);
private final TestRecord testRecord6Update = new TestRecord("6", 1);
+ private final IndexedRecord testIndexedRecord1 = createTestRecord("1", 1,
1L);
+ private final IndexedRecord testIndexedRecord2 = createTestRecord("2", 1,
1L);
+ private final IndexedRecord testIndexedRecord2Update = createTestRecord("2",
1, 2L);
+ private final IndexedRecord testIndexedRecord3 = createTestRecord("3", 1,
1L);
+ private final IndexedRecord testIndexedRecord4 = createTestRecord("4", 2,
2L);
+ private final IndexedRecord testIndexedRecord4LowerOrdering =
createTestRecord("4", 2, 1L);
+ private final IndexedRecord testIndexedRecord5 = createTestRecord("5", 1,
1L);
+ private final IndexedRecord testRecord5DeleteByCustomMarker =
createTestRecord("5", 3, 2L);
+ private final IndexedRecord testIndexedRecord6 = createTestRecord("6", 1,
5L);
+ private final IndexedRecord testIndexedRecord6Update = createTestRecord("6",
2, 10L);
+
@Test
void readBaseFileAndLogFile() throws IOException {
HoodieReadStats readStats = new HoodieReadStats();
HoodieReaderContext<TestRecord> mockReaderContext =
mock(HoodieReaderContext.class, RETURNS_DEEP_STUBS);
+
SortedKeyBasedFileGroupRecordBuffer<TestRecord> fileGroupRecordBuffer =
buildSortedKeyBasedFileGroupRecordBuffer(mockReaderContext, readStats);
fileGroupRecordBuffer.setBaseFileIterator(ClosableIterator.wrap(Arrays.asList(testRecord2,
testRecord3, testRecord5).iterator()));
HoodieDataBlock dataBlock = mock(HoodieDataBlock.class);
when(dataBlock.getSchema()).thenReturn(HoodieTestDataGenerator.AVRO_SCHEMA);
-
when(dataBlock.getEngineRecordIterator(mockReaderContext)).thenReturn(ClosableIterator.wrap(Arrays.asList(testRecord6,
testRecord4, testRecord1, testRecord6Update, testRecord2Update).iterator()));
+ when(dataBlock.getEngineRecordIterator(mockReaderContext)).thenReturn(
+ ClosableIterator.wrap(Arrays.asList(testRecord6, testRecord4,
testRecord1, testRecord6Update, testRecord2Update).iterator())); // log records are supplied out of key order (6, 4, 1, 6, 2)
HoodieDeleteBlock deleteBlock = mock(HoodieDeleteBlock.class);
- when(deleteBlock.getRecordsToDelete()).thenReturn(new
DeleteRecord[]{DeleteRecord.create("3", "")});
+ when(deleteBlock.getRecordsToDelete()).thenReturn(new DeleteRecord[]
{DeleteRecord.create("3", "")}); // key "3" is also in the base file and is absent from the expected output below
fileGroupRecordBuffer.processDataBlock(dataBlock, Option.empty());
fileGroupRecordBuffer.processDeleteBlock(deleteBlock);
-
- List<TestRecord> actualRecords = getActualRecords(fileGroupRecordBuffer);
+ List<TestRecord> actualRecords =
getActualRecordsForSortedKeyBased(fileGroupRecordBuffer); // the assertion below expects ascending key order: 1, 2, 4, 5, 6
assertEquals(Arrays.asList(testRecord1, testRecord2Update, testRecord4,
testRecord5, testRecord6Update), actualRecords);
assertEquals(3, readStats.getNumInserts());
assertEquals(1, readStats.getNumUpdates());
assertEquals(1, readStats.getNumDeletes());
}
+ @Test
+ void readWithStreamingRecordBufferLoaderAndEventTimeOrdering() throws
IOException { // Streaming buffer loader with sortOutputs() stubbed to true: streamed records are merged with a base file under EVENT_TIME_ORDERING
+ HoodieReadStats readStats = new HoodieReadStats();
+ TypedProperties properties = new TypedProperties();
+ properties.setProperty(HoodieTableConfig.PRECOMBINE_FIELDS.key(), "ts");
+ properties.setProperty(DELETE_KEY, "counter");
+ properties.setProperty(DELETE_MARKER, "3");
+ HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+ when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]
{"record_key"}));
+
when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.current());
+
when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.EVENT_TIME_ORDERING);
+ StorageConfiguration<?> storageConfiguration =
mock(StorageConfiguration.class);
+ HoodieReaderContext<IndexedRecord> readerContext = new
HoodieAvroReaderContext(storageConfiguration, tableConfig, Option.empty(),
Option.empty());
+ readerContext.setHasLogFiles(false);
+ readerContext.setHasBootstrapBaseFile(false);
+ FileGroupReaderSchemaHandler schemaHandler = new
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(),
tableConfig,
+ properties);
+ readerContext.setSchemaHandler(schemaHandler);
+ readerContext.initRecordMerger(properties);
+ List<BufferedRecord> inputRecords =
+ convertToBufferedRecordsList(Arrays.asList(testIndexedRecord6Update,
testIndexedRecord4LowerOrdering, testIndexedRecord1, testIndexedRecord2Update),
readerContext, properties,
+ new String[]{"ts"});
+
inputRecords.addAll(convertToBufferedRecordsListForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker),
false));
+ HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class,
RETURNS_DEEP_STUBS);
+ when(mockMetaClient.getTableConfig()).thenReturn(tableConfig);
+
when(tableConfig.getPayloadClass()).thenReturn(DefaultHoodieRecordPayload.class.getName());
+
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
+ // The mocked InputSplit hands the streamed input records to the loader through getRecordIterator().
+ FileGroupRecordBufferLoader recordBufferLoader =
FileGroupRecordBufferLoader.createStreamingRecordsBufferLoader();
+ InputSplit inputSplit = mock(InputSplit.class);
+ when(inputSplit.hasNoRecordsToMerge()).thenReturn(false);
+ when(inputSplit.getRecordIterator()).thenReturn(inputRecords.iterator());
+ ReaderParameters readerParameters = mock(ReaderParameters.class);
+ when(readerParameters.sortOutputs()).thenReturn(true);
+ SortedKeyBasedFileGroupRecordBuffer fileGroupRecordBuffer = // NOTE(review): raw-typed local although the cast below targets <IndexedRecord>
(SortedKeyBasedFileGroupRecordBuffer<IndexedRecord>) recordBufferLoader
+ .getRecordBuffer(readerContext, mockMetaClient.getStorage(),
inputSplit, Collections.singletonList("ts"), mockMetaClient, properties,
+ readerParameters, readStats, Option.empty()).getKey();
+
when(tableConfig.getPayloadClass()).thenReturn(DefaultHoodieRecordPayload.class.getName()); // NOTE(review): identical to the stub set before the loader call — looks redundant
+ // Base file holds keys 2-6; key 1 appears only in the streamed input.
+
fileGroupRecordBuffer.setBaseFileIterator(ClosableIterator.wrap(Arrays.asList(testIndexedRecord2,
testIndexedRecord3, testIndexedRecord4,
+ testIndexedRecord5, testIndexedRecord6).iterator()));
+ // Expected output is in ascending key order; key 5 is absent and key 4 keeps the base-file record.
+ List<IndexedRecord> actualRecords =
getActualRecords(fileGroupRecordBuffer);
+ assertEquals(Arrays.asList(testIndexedRecord1, testIndexedRecord2Update,
testIndexedRecord3, testIndexedRecord4, testIndexedRecord6Update),
actualRecords);
+ assertEquals(1, readStats.getNumInserts()); // key 1 is not in the base file
+ assertEquals(1, readStats.getNumDeletes()); // key 5 is missing from the expected output
+ assertEquals(2, readStats.getNumUpdates()); // keys 2 and 6 carry the streamed versions in the expected output
+ }
+
@Test
void readLogFiles() throws IOException {
HoodieReadStats readStats = new HoodieReadStats();
@@ -100,13 +173,13 @@ class TestSortedKeyBasedFileGroupRecordBuffer {
when(dataBlock2.getEngineRecordIterator(mockReaderContext)).thenReturn(ClosableIterator.wrap(Arrays.asList(testRecord2Update,
testRecord5, testRecord3, testRecord1).iterator()));
HoodieDeleteBlock deleteBlock = mock(HoodieDeleteBlock.class);
- when(deleteBlock.getRecordsToDelete()).thenReturn(new
DeleteRecord[]{DeleteRecord.create("3", "")});
+ when(deleteBlock.getRecordsToDelete()).thenReturn(new DeleteRecord[]
{DeleteRecord.create("3", "")});
fileGroupRecordBuffer.processDataBlock(dataBlock1, Option.empty());
fileGroupRecordBuffer.processDataBlock(dataBlock2, Option.empty());
fileGroupRecordBuffer.processDeleteBlock(deleteBlock);
- List<TestRecord> actualRecords = getActualRecords(fileGroupRecordBuffer);
+ List<TestRecord> actualRecords =
getActualRecordsForSortedKeyBased(fileGroupRecordBuffer);
assertEquals(Arrays.asList(testRecord1, testRecord2Update, testRecord4,
testRecord5, testRecord6Update), actualRecords);
assertEquals(5, readStats.getNumInserts());
assertEquals(0, readStats.getNumUpdates());
@@ -133,12 +206,11 @@ class TestSortedKeyBasedFileGroupRecordBuffer {
mockReaderContext, mockMetaClient, recordMergeMode, partialUpdateMode,
props, Collections.emptyList(), updateProcessor);
}
- private static List<TestRecord>
getActualRecords(SortedKeyBasedFileGroupRecordBuffer<TestRecord>
fileGroupRecordBuffer) throws IOException {
- List<TestRecord> actualRecords = new ArrayList<>();
+ private static <T> List<T>
getActualRecordsForSortedKeyBased(SortedKeyBasedFileGroupRecordBuffer<T>
fileGroupRecordBuffer) throws IOException { // Drains the buffer with hasNext()/next() and returns the records in iteration order
+ List<T> actualRecords = new ArrayList<>(); // generic in T (the removed version was fixed to TestRecord)
while (fileGroupRecordBuffer.hasNext()) {
actualRecords.add(fileGroupRecordBuffer.next());
}
return actualRecords;
}
-
}
\ No newline at end of file
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestStreamingKeyBasedFileGroupRecordBuffer.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestStreamingKeyBasedFileGroupRecordBuffer.java
new file mode 100644
index 000000000000..4aa7856c57cc
--- /dev/null
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestStreamingKeyBasedFileGroupRecordBuffer.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read.buffer;
+
+import org.apache.hudi.avro.HoodieAvroReaderContext;
+import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.HoodieAvroRecordMerger;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.PartialUpdateMode;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
+import org.apache.hudi.common.table.read.HoodieReadStats;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.storage.StorageConfiguration;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class TestStreamingKeyBasedFileGroupRecordBuffer extends
BaseTestFileGroupRecordBuffer {
+ private final IndexedRecord testRecord1 = createTestRecord("1", 1, 1L); // NOTE(review): arguments look like (record key, counter, ts) — confirm against createTestRecord
+ private final IndexedRecord testRecord1UpdateWithSameTime =
createTestRecord("1", 2, 1L); // same key and last argument as testRecord1; only the second argument differs
+ private final IndexedRecord testRecord2 = createTestRecord("2", 1, 1L);
+ private final IndexedRecord testRecord2Update = createTestRecord("2", 1, 2L);
+ private final IndexedRecord testRecord2CustomPayloadExpected =
createTestRecord("2", 2, 2L); // expected value in the two CUSTOM merge tests; never fed in as input
+ private final IndexedRecord testRecord3 = createTestRecord("3", 1, 1L);
+ private final IndexedRecord testRecord3Update = createTestRecord("3", 1, 2L);
+ private final IndexedRecord testRecord3UpdateCustomPayloadExpected =
createTestRecord("3", 2, 2L); // expected value in the two CUSTOM merge tests; never fed in as input
+ private final IndexedRecord testRecord4 = createTestRecord("4", 2, 1L);
+ private final IndexedRecord testRecord4EarlierUpdate = createTestRecord("4",
1, 0L); // last argument (0L) is lower than testRecord4's (1L)
+ private final IndexedRecord testRecord5 = createTestRecord("5", 1, 1L);
+ private final IndexedRecord testRecord5DeleteByCustomMarker =
createTestRecord("5", 3, 2L); // second argument 3 equals the DELETE_MARKER value "3" set in the tests
+ private final IndexedRecord testRecord6 = createTestRecord("6", 1, 5L);
+ private final IndexedRecord testRecord6DeleteByCustomMarker =
createTestRecord("6", 3, 2L); // last argument (2L) is lower than testRecord6's (5L)
+ private final IndexedRecord testRecord7 = createTestRecord("7", 1, 5L); // key 7 is never placed in a base file by these tests
+
+ @Test
+ void readWithEventTimeOrdering() throws IOException { // EVENT_TIME_ORDERING on "ts": streamed updates, one new key and two marker-deletes are merged against a base file of keys 1-6
+ HoodieReadStats readStats = new HoodieReadStats();
+ TypedProperties properties = new TypedProperties();
+ properties.setProperty(HoodieTableConfig.PRECOMBINE_FIELDS.key(), "ts");
+ properties.setProperty(DELETE_KEY, "counter");
+ properties.setProperty(DELETE_MARKER, "3");
+ HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+
when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.EVENT_TIME_ORDERING);
+
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
+
when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.current());
+ when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]
{"record_key"}));
+ StorageConfiguration<?> storageConfiguration =
mock(StorageConfiguration.class);
+ HoodieReaderContext<IndexedRecord> readerContext = new
HoodieAvroReaderContext(storageConfiguration, tableConfig, Option.empty(),
Option.empty());
+ readerContext.setHasLogFiles(false);
+ readerContext.setHasBootstrapBaseFile(false);
+ readerContext.initRecordMerger(properties);
+ FileGroupReaderSchemaHandler schemaHandler = new
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(),
tableConfig,
+ properties);
+ readerContext.setSchemaHandler(schemaHandler);
+ List<BufferedRecord> inputRecords =
convertToBufferedRecordsList(Arrays.asList(testRecord1UpdateWithSameTime,
testRecord2Update, testRecord3Update,
+ testRecord4EarlierUpdate, testRecord7), readerContext, properties, new
String[] {"ts"});
+
inputRecords.addAll(convertToBufferedRecordsListForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker,
testRecord6DeleteByCustomMarker), false));
+ KeyBasedFileGroupRecordBuffer<IndexedRecord> fileGroupRecordBuffer =
buildKeyBasedFileGroupRecordBuffer(readerContext, tableConfig, readStats, null,
+ RecordMergeMode.EVENT_TIME_ORDERING, Collections.singletonList("ts"),
properties, Option.of(inputRecords.iterator()));
+ // Base file holds keys 1-6; key 7 exists only in the streamed input.
+
fileGroupRecordBuffer.setBaseFileIterator(ClosableIterator.wrap(Arrays.asList(testRecord1,
testRecord2, testRecord3, testRecord4,
+ testRecord5, testRecord6).iterator()));
+ // Drain the buffer and compare the merged records and the read stats.
+ List<IndexedRecord> actualRecords =
getActualRecords(fileGroupRecordBuffer);
+ // update for 4 is ignored due to lower ordering value.
+ // record5 is deleted.
+ // delete for 6 is ignored due to lower ordering value.
+ assertEquals(Arrays.asList(testRecord1UpdateWithSameTime,
testRecord2Update, testRecord3Update, testRecord4, testRecord6, testRecord7),
actualRecords);
+ assertEquals(1, readStats.getNumInserts()); // key 7 is not in the base file
+ assertEquals(1, readStats.getNumDeletes()); // key 5
+ assertEquals(3, readStats.getNumUpdates()); // keys 1, 2 and 3 carry the streamed versions in the expected output
+ }
+
+ @Test
+ void readWithCommitTimeOrdering() throws IOException { // COMMIT_TIME_ORDERING: no precombine property is set and the ordering-field array passed to the converter is empty
+ HoodieReadStats readStats = new HoodieReadStats();
+ TypedProperties properties = new TypedProperties();
+ properties.setProperty(DELETE_KEY, "counter");
+ properties.setProperty(DELETE_MARKER, "3");
+ HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+
when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.COMMIT_TIME_ORDERING);
+
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
+
when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.current());
+ when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]
{"record_key"}));
+ StorageConfiguration<?> storageConfiguration =
mock(StorageConfiguration.class);
+ HoodieReaderContext<IndexedRecord> readerContext = new
HoodieAvroReaderContext(storageConfiguration, tableConfig, Option.empty(),
Option.empty());
+ readerContext.setHasLogFiles(false);
+ readerContext.setHasBootstrapBaseFile(false);
+ readerContext.initRecordMerger(properties);
+ FileGroupReaderSchemaHandler schemaHandler = new
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(),
tableConfig,
+ properties);
+ readerContext.setSchemaHandler(schemaHandler);
+ List<BufferedRecord> inputRecords =
convertToBufferedRecordsList(Arrays.asList(testRecord1UpdateWithSameTime,
testRecord2Update, testRecord3Update,
+ testRecord4EarlierUpdate, testRecord7), readerContext, properties, new
String[] {});
+
inputRecords.addAll(convertToBufferedRecordsListForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker,
testRecord6DeleteByCustomMarker), true)); // NOTE(review): flag is true here but false in readWithEventTimeOrdering — meaning is defined in the base test class, confirm
+ KeyBasedFileGroupRecordBuffer<IndexedRecord> fileGroupRecordBuffer =
buildKeyBasedFileGroupRecordBuffer(readerContext, tableConfig, readStats, null,
+ RecordMergeMode.COMMIT_TIME_ORDERING, Collections.singletonList("ts"),
properties, Option.of(inputRecords.iterator()));
+ // Base file holds keys 1-6; key 7 exists only in the streamed input.
+
fileGroupRecordBuffer.setBaseFileIterator(ClosableIterator.wrap(Arrays.asList(testRecord1,
testRecord2, testRecord3, testRecord4,
+ testRecord5, testRecord6).iterator()));
+ // Every streamed record appears in the expected output, including the key-4 update with the lower last argument; keys 5 and 6 are absent.
+ List<IndexedRecord> actualRecords =
getActualRecords(fileGroupRecordBuffer);
+ assertEquals(Arrays.asList(testRecord1UpdateWithSameTime,
testRecord2Update, testRecord3Update, testRecord4EarlierUpdate, testRecord7),
actualRecords);
+ assertEquals(1, readStats.getNumInserts()); // key 7 is not in the base file
+ assertEquals(2, readStats.getNumDeletes()); // keys 5 and 6
+ assertEquals(4, readStats.getNumUpdates()); // keys 1-4 carry the streamed versions in the expected output
+ }
+
+ @Test
+ void readWithCustomPayload() throws IOException { // CUSTOM merge mode with the payload-based strategy id: CustomPayload is the payload class and a HoodieAvroRecordMerger is passed to the buffer builder
+ HoodieReadStats readStats = new HoodieReadStats();
+ TypedProperties properties = new TypedProperties();
+ properties.setProperty(DELETE_KEY, "counter");
+ properties.setProperty(DELETE_MARKER, "3");
+ properties.setProperty(HoodieTableConfig.RECORD_MERGE_MODE.key(),
"CUSTOM");
+ properties.setProperty(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(),
CustomPayload.class.getName());
+ properties.setProperty(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key(),
HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID);
+ HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+
when(tableConfig.getPayloadClass()).thenReturn(CustomPayload.class.getName());
+ when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]
{"record_key"}));
+ when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.CUSTOM);
+
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
+
when(tableConfig.getRecordMergeStrategyId()).thenReturn(HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID);
+
when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.current());
+ StorageConfiguration<?> storageConfiguration =
mock(StorageConfiguration.class);
+ HoodieReaderContext<IndexedRecord> readerContext = new
HoodieAvroReaderContext(storageConfiguration, tableConfig, Option.empty(),
Option.empty());
+ readerContext.setHasLogFiles(false);
+ readerContext.setHasBootstrapBaseFile(false);
+ readerContext.initRecordMerger(properties);
+ FileGroupReaderSchemaHandler schemaHandler = new
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(),
tableConfig,
+ properties);
+ readerContext.setSchemaHandler(schemaHandler);
+ List<BufferedRecord> inputRecords =
convertToBufferedRecordsList(Arrays.asList(testRecord1UpdateWithSameTime,
testRecord2Update, testRecord3Update,
+ testRecord4EarlierUpdate), readerContext, properties, new String[]
{"ts"});
+
inputRecords.addAll(convertToBufferedRecordsListForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker,
testRecord6DeleteByCustomMarker), true));
+ KeyBasedFileGroupRecordBuffer<IndexedRecord> fileGroupRecordBuffer =
buildKeyBasedFileGroupRecordBuffer(readerContext, tableConfig, readStats, new
HoodieAvroRecordMerger(),
+ RecordMergeMode.CUSTOM, Collections.singletonList("ts"), properties,
Option.of(inputRecords.iterator()));
+ // Base file holds keys 1-6; the streamed input has no key outside that set.
+
fileGroupRecordBuffer.setBaseFileIterator(ClosableIterator.wrap(Arrays.asList(testRecord1,
testRecord2, testRecord3, testRecord4,
+ testRecord5, testRecord6).iterator()));
+ // NOTE(review): CustomPayload's merge rules are defined outside this view; the expectations below keep base record 1 and omit keys 4, 5 and 6.
+ List<IndexedRecord> actualRecords =
getActualRecords(fileGroupRecordBuffer);
+ assertEquals(Arrays.asList(testRecord1, testRecord2CustomPayloadExpected,
testRecord3UpdateCustomPayloadExpected), actualRecords);
+ assertEquals(0, readStats.getNumInserts());
+ assertEquals(3, readStats.getNumDeletes()); // keys 4, 5 and 6 are missing from the expected output
+ assertEquals(2, readStats.getNumUpdates()); // keys 2 and 3 differ from their base-file records in the expected output
+ }
+
+ @Test
+ void readWithCustomMergerWithRecords() throws IOException { // CUSTOM merge mode driven by TestKeyBasedFileGroupRecordBuffer.CustomMerger; same inputs and expectations as readWithCustomPayload
+ HoodieReadStats readStats = new HoodieReadStats();
+ TypedProperties properties = new TypedProperties();
+ properties.setProperty(DELETE_KEY, "counter");
+ properties.setProperty(DELETE_MARKER, "3");
+ properties.setProperty(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(),
CustomPayload.class.getName()); // unlike readWithCustomPayload, no RECORD_MERGE_MODE or RECORD_MERGE_STRATEGY_ID property is set
+ HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+
when(tableConfig.getPayloadClass()).thenReturn(CustomPayload.class.getName());
+ when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]
{"record_key"}));
+ when(tableConfig.getRecordMergeMode()).thenReturn(RecordMergeMode.CUSTOM);
+
when(tableConfig.getPartialUpdateMode()).thenReturn(PartialUpdateMode.NONE);
+
when(tableConfig.getRecordMergeStrategyId()).thenReturn(HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID);
+
when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.current());
+ // Reader context and schema handler are wired exactly as in the other tests of this class.
+ StorageConfiguration<?> storageConfiguration =
mock(StorageConfiguration.class);
+ HoodieReaderContext<IndexedRecord> readerContext = new
HoodieAvroReaderContext(storageConfiguration, tableConfig, Option.empty(),
Option.empty());
+ readerContext.setHasLogFiles(false);
+ readerContext.setHasBootstrapBaseFile(false);
+ readerContext.initRecordMerger(properties);
+ FileGroupReaderSchemaHandler schemaHandler = new
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(),
tableConfig,
+ properties);
+ readerContext.setSchemaHandler(schemaHandler);
+ List<BufferedRecord> inputRecords =
convertToBufferedRecordsList(Arrays.asList(testRecord1UpdateWithSameTime,
testRecord2Update, testRecord3Update,
+ testRecord4EarlierUpdate), readerContext, properties, new String[]
{"ts"});
+
inputRecords.addAll(convertToBufferedRecordsListForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker,
testRecord6DeleteByCustomMarker), true));
+ KeyBasedFileGroupRecordBuffer<IndexedRecord> fileGroupRecordBuffer =
buildKeyBasedFileGroupRecordBuffer(readerContext, tableConfig, readStats, new
TestKeyBasedFileGroupRecordBuffer.CustomMerger(),
+ RecordMergeMode.CUSTOM, Collections.singletonList("ts"), properties,
Option.of(inputRecords.iterator()));
+ // Base file holds keys 1-6; the streamed input has no key outside that set.
+
fileGroupRecordBuffer.setBaseFileIterator(ClosableIterator.wrap(Arrays.asList(testRecord1,
testRecord2, testRecord3, testRecord4,
+ testRecord5, testRecord6).iterator()));
+ // NOTE(review): CustomMerger is declared in TestKeyBasedFileGroupRecordBuffer (not visible here); the expectations keep base record 1 and omit keys 4, 5 and 6.
+ List<IndexedRecord> actualRecords =
getActualRecords(fileGroupRecordBuffer);
+ assertEquals(Arrays.asList(testRecord1, testRecord2CustomPayloadExpected,
testRecord3UpdateCustomPayloadExpected), actualRecords);
+ assertEquals(0, readStats.getNumInserts());
+ assertEquals(3, readStats.getNumDeletes()); // keys 4, 5 and 6 are missing from the expected output
+ assertEquals(2, readStats.getNumUpdates()); // keys 2 and 3 differ from their base-file records in the expected output
+ }
+}