This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 47ad41575de [HUDI-7176] Add file group reader test framework (#10263)
47ad41575de is described below
commit 47ad41575defafae3024312e801609b80eb9e900
Author: Lin Liu <[email protected]>
AuthorDate: Tue Dec 12 13:12:33 2023 -0800
[HUDI-7176] Add file group reader test framework (#10263)
Changes:
1. Add a builder class to construct a file group reader.
2. Add an indexedRecord based reader context.
3. Implement the rest of the functions for the file group reader utils.
4. Add a util class for generating FileSlice.
---
.../hudi/common/engine/HoodieReaderContext.java | 3 +-
.../common/table/read/HoodieFileGroupReader.java | 8 +-
.../hudi/common/testutils/FileSystemTestUtils.java | 6 +
.../testutils/reader/DataGenerationPlan.java | 126 ++++++
.../reader/HoodieFileGroupReaderTestUtils.java | 127 ++++++
.../testutils/reader/HoodieFileSliceTestUtils.java | 440 +++++++++++++++++++++
.../testutils/reader/HoodieTestReaderContext.java | 163 ++++++++
7 files changed, 867 insertions(+), 6 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index 7b8b2888983..8daf3f441f7 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;
@@ -78,7 +79,7 @@ public abstract class HoodieReaderContext<T> {
* @return {@link ClosableIterator<T>} that can return all records through
iteration.
*/
public abstract ClosableIterator<T> getFileRecordIterator(
- Path filePath, long start, long length, Schema dataSchema, Schema
requiredSchema, Configuration conf);
+ Path filePath, long start, long length, Schema dataSchema, Schema
requiredSchema, Configuration conf) throws IOException;
/**
* Converts an Avro record, e.g., serialized in the log files, to an
engine-specific record.
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index 8413ef8a5e2..52ee14d969e 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -139,14 +139,12 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
: new HoodieKeyBasedFileGroupRecordBuffer<>(
readerContext, requiredSchema, requiredSchema, Option.empty(),
Option.empty(),
recordMerger, props);
-
-
}
/**
* Initialize internal iterators on the base and log files.
*/
- public void initRecordIterators() {
+ public void initRecordIterators() throws IOException {
ClosableIterator<T> iter = makeBaseFileIterator();
if (logFiles.isEmpty()) {
this.baseFileIterator = CachingIterator.wrap(iter, readerContext);
@@ -157,7 +155,7 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
}
}
- private ClosableIterator<T> makeBaseFileIterator() {
+ private ClosableIterator<T> makeBaseFileIterator() throws IOException {
if (!hoodieBaseFileOption.isPresent()) {
return new EmptyIterator<>();
}
@@ -225,7 +223,7 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
return newSchema;
}
- private ClosableIterator<T> makeBootstrapBaseFileIterator(HoodieBaseFile
baseFile) {
+ private ClosableIterator<T> makeBootstrapBaseFileIterator(HoodieBaseFile
baseFile) throws IOException {
BaseFile dataFile = baseFile.getBootstrapBaseFile().get();
Pair<List<Schema.Field>,List<Schema.Field>> requiredFields =
getDataAndMetaCols(requiredSchema);
Pair<List<Schema.Field>,List<Schema.Field>> allFields =
getDataAndMetaCols(dataSchema);
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileSystemTestUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileSystemTestUtils.java
index e73f2bb0440..a00e215f10f 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileSystemTestUtils.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileSystemTestUtils.java
@@ -61,6 +61,12 @@ public class FileSystemTestUtils {
return new Path(FILE_SCHEME + fileSuffix);
}
+ public static Path getRandomOuterFSPath(String extension) {
+ String randomFileName = UUID.randomUUID().toString();
+ String fileSuffix = COLON + FORWARD_SLASH + TEMP + FORWARD_SLASH +
randomFileName;
+ return new Path(FILE_SCHEME + fileSuffix + "." + extension);
+ }
+
public static Path getPhantomFile(Path outerPath, long startOffset, long
inlineLength) {
// Generate phantom inline file
return InLineFSUtils.getInlineFilePath(outerPath, FILE_SCHEME,
startOffset, inlineLength);
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/DataGenerationPlan.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/DataGenerationPlan.java
new file mode 100644
index 00000000000..29b0090a5db
--- /dev/null
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/DataGenerationPlan.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.testutils.reader;
+
+import java.util.List;
+
+/**
+ * The blueprint of records that will be generated
+ * by the data generator.
+ *
+ * Current limitations:
+ * 1. One plan generates one file, either a base file, or a log file.
+ * 2. One file contains one operation, e.g., insert, delete, or update.
+ */
+public class DataGenerationPlan {
+ // The values for "_row_key" field.
+ private final List<String> recordKeys;
+ // The partition path for all records.
+ private final String partitionPath;
+ // The ordering field.
+ private final long timestamp;
+ // The operation type of the record.
+ private final OperationType operationType;
+ private final String instantTime;
+
+ public enum OperationType {
+ INSERT,
+ UPDATE,
+ DELETE
+ }
+
+ public DataGenerationPlan(List<String> recordKeys,
+ String partitionPath,
+ long timestamp,
+ OperationType operationType,
+ String instantTime) {
+ this.recordKeys = recordKeys;
+ this.partitionPath = partitionPath;
+ this.timestamp = timestamp;
+ this.operationType = operationType;
+ this.instantTime = instantTime;
+ }
+
+ public List<String> getRecordKeys() {
+ return recordKeys;
+ }
+
+ public String getPartitionPath() {
+ return partitionPath;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public OperationType getOperationType() {
+ return operationType;
+ }
+
+ public String getInstantTime() {
+ return instantTime;
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ private List<String> recordKeys;
+ private String partitionPath;
+ private long timestamp;
+ private OperationType operationType;
+ private String instantTime;
+
+ public Builder withRecordKeys(List<String> recordKeys) {
+ this.recordKeys = recordKeys;
+ return this;
+ }
+
+ public Builder withPartitionPath(String partitionPath) {
+ this.partitionPath = partitionPath;
+ return this;
+ }
+
+ public Builder withTimeStamp(long timestamp) {
+ this.timestamp = timestamp;
+ return this;
+ }
+
+ public Builder withOperationType(OperationType operationType) {
+ this.operationType = operationType;
+ return this;
+ }
+
+ public Builder withInstantTime(String instantTime) {
+ this.instantTime = instantTime;
+ return this;
+ }
+
+ public DataGenerationPlan build() {
+ return new DataGenerationPlan(
+ recordKeys,
+ partitionPath,
+ timestamp,
+ operationType,
+ instantTime);
+ }
+ }
+}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestUtils.java
new file mode 100644
index 00000000000..39ddc42bc37
--- /dev/null
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestUtils.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.testutils.reader;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+
+public class HoodieFileGroupReaderTestUtils {
+ public static HoodieFileGroupReader<IndexedRecord> createFileGroupReader(
+ Option<FileSlice> fileSliceOpt,
+ String basePath,
+ String latestCommitTime,
+ Schema schema,
+ boolean shouldUseRecordPosition,
+ long start,
+ long length,
+ TypedProperties properties,
+ Configuration hadoopConf,
+ HoodieTableConfig tableConfig,
+ HoodieReaderContext<IndexedRecord> readerContext
+ ) {
+ assert (fileSliceOpt.isPresent());
+ return new HoodieFileGroupReaderBuilder()
+ .withReaderContext(readerContext)
+ .withHadoopConf(hadoopConf)
+ .withFileSlice(fileSliceOpt.get())
+ .withStart(start)
+ .withLength(length)
+ .withProperties(properties)
+ .withTableConfig(tableConfig)
+ .build(basePath, latestCommitTime, schema, shouldUseRecordPosition);
+ }
+
+ public static class HoodieFileGroupReaderBuilder {
+ private HoodieReaderContext<IndexedRecord> readerContext;
+ private FileSlice fileSlice;
+ private Configuration hadoopConf;
+ private TypedProperties props;
+ private long start;
+ private long length;
+ private HoodieTableConfig tableConfig;
+
+ public HoodieFileGroupReaderBuilder withReaderContext(
+ HoodieReaderContext<IndexedRecord> context) {
+ this.readerContext = context;
+ return this;
+ }
+
+ public HoodieFileGroupReaderBuilder withFileSlice(FileSlice fileSlice) {
+ this.fileSlice = fileSlice;
+ return this;
+ }
+
+ public HoodieFileGroupReaderBuilder withHadoopConf(Configuration
hadoopConf) {
+ this.hadoopConf = hadoopConf;
+ return this;
+ }
+
+ public HoodieFileGroupReaderBuilder withProperties(TypedProperties props) {
+ this.props = props;
+ return this;
+ }
+
+ public HoodieFileGroupReaderBuilder withStart(long start) {
+ this.start = start;
+ return this;
+ }
+
+ public HoodieFileGroupReaderBuilder withLength(long length) {
+ this.length = length;
+ return this;
+ }
+
+ public HoodieFileGroupReaderBuilder withTableConfig(
+ HoodieTableConfig tableConfig
+ ) {
+ this.tableConfig = tableConfig;
+ return this;
+ }
+
+ public HoodieFileGroupReader<IndexedRecord> build(
+ String basePath,
+ String latestCommitTime,
+ Schema schema,
+ boolean shouldUseRecordPosition
+ ) {
+ return new HoodieFileGroupReader<>(
+ readerContext,
+ hadoopConf,
+ basePath,
+ latestCommitTime,
+ fileSlice,
+ schema,
+ schema,
+ props,
+ tableConfig,
+ start,
+ length,
+ shouldUseRecordPosition);
+ }
+ }
+}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
new file mode 100644
index 00000000000..ce8ad3ac8c7
--- /dev/null
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.testutils.reader;
+
+import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.LocalTaskContextSupplier;
+import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieCDCDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
+import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.io.storage.HoodieAvroParquetWriter;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static
org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.DELETE_BLOCK;
+import static
org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.PARQUET_DATA_BLOCK;
+import static org.apache.hudi.common.testutils.FileCreateUtils.baseFileName;
+import static org.apache.hudi.common.testutils.FileCreateUtils.logFileName;
+import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
+import static
org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.DELETE;
+import static
org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.INSERT;
+
+public class HoodieFileSliceTestUtils {
+ public static final String FORWARD_SLASH = "/";
+ public static final String PARQUET = ".parquet";
+
+ public static final String DRIVER = "driver";
+ public static final String PARTITION_PATH = "partition_path";
+ public static final String RIDER = "rider";
+ public static final String ROW_KEY = "_row_key";
+ public static final String TIMESTAMP = "timestamp";
+ public static final HoodieTestDataGenerator DATA_GEN =
+ new HoodieTestDataGenerator(0xDEED);
+ public static final TypedProperties PROPERTIES = new TypedProperties();
+
+ static {
+ PROPERTIES.setProperty(
+ "hoodie.datasource.write.precombine.field", "timestamp");
+ }
+
+ // We use a number to represent a record key, and a (start, end) range
+ // to represent a set of record keys between start <= k <= end.
+ public static class KeyRange {
+ public int start;
+ public int end;
+
+ public KeyRange(int start, int end) {
+ this.start = start;
+ this.end = end;
+ }
+ }
+
+ private static Path generateBaseFilePath(
+ String basePath,
+ String fileId,
+ String instantTime
+ ) {
+ return new Path(
+ basePath + FORWARD_SLASH
+ + baseFileName(instantTime, fileId, PARQUET));
+ }
+
+ private static Path generateLogFilePath(
+ String basePath,
+ String fileId,
+ String instantTime,
+ int version) {
+ return new Path(
+ basePath + FORWARD_SLASH + logFileName(
+ instantTime, fileId, version));
+ }
+
+ // Note:
+ // "start < end" means start <= k <= end.
+ // "start == end" means k = start.
+ // "start > end" means no keys.
+ private static List<String> generateKeys(KeyRange range) {
+ List<String> keys = new ArrayList<>();
+ if (range.start == range.end) {
+ keys.add(String.valueOf(range.start));
+ } else {
+ keys = IntStream
+ .rangeClosed(range.start, range.end)
+ .boxed()
+ .map(String::valueOf).collect(Collectors.toList());
+ }
+ return keys;
+ }
+
+ private static List<IndexedRecord> generateRecords(DataGenerationPlan plan) {
+ List<IndexedRecord> records = new ArrayList<>();
+ List<String> keys = plan.getRecordKeys();
+ for (String key : keys) {
+ records.add(DATA_GEN.generateGenericRecord(
+ key,
+ plan.getPartitionPath(),
+ RIDER + "." + UUID.randomUUID(),
+ DRIVER + "." + UUID.randomUUID(),
+ plan.getTimestamp(),
+ plan.getOperationType() == DELETE,
+ false
+ ));
+ }
+ return records;
+ }
+
+ private static HoodieDataBlock getDataBlock(
+ HoodieLogBlock.HoodieLogBlockType dataBlockType,
+ List<IndexedRecord> records,
+ Map<HoodieLogBlock.HeaderMetadataType, String> header,
+ Path logFilePath
+ ) {
+ return createDataBlock(
+ dataBlockType,
+ records.stream().map(HoodieAvroIndexedRecord::new)
+ .collect(Collectors.toList()),
+ header,
+ logFilePath);
+ }
+
+ private static HoodieDataBlock createDataBlock(
+ HoodieLogBlock.HoodieLogBlockType dataBlockType,
+ List<HoodieRecord> records,
+ Map<HoodieLogBlock.HeaderMetadataType, String> header,
+ Path pathForReader
+ ) {
+ switch (dataBlockType) {
+ case CDC_DATA_BLOCK:
+ return new HoodieCDCDataBlock(
+ records,
+ header,
+ HoodieRecord.RECORD_KEY_METADATA_FIELD);
+ case AVRO_DATA_BLOCK:
+ return new HoodieAvroDataBlock(
+ records,
+ false,
+ header,
+ HoodieRecord.RECORD_KEY_METADATA_FIELD);
+ case HFILE_DATA_BLOCK:
+ return new HoodieHFileDataBlock(
+ records,
+ header,
+ Compression.Algorithm.GZ,
+ pathForReader);
+ case PARQUET_DATA_BLOCK:
+ return new HoodieParquetDataBlock(
+ records,
+ false,
+ header,
+ HoodieRecord.RECORD_KEY_METADATA_FIELD,
+ CompressionCodecName.GZIP,
+ 0.1,
+ true);
+ default:
+ throw new RuntimeException(
+ "Unknown data block type " + dataBlockType);
+ }
+ }
+
+ public static HoodieDeleteBlock getDeleteBlock(
+ List<IndexedRecord> records,
+ Map<HoodieLogBlock.HeaderMetadataType, String> header,
+ Schema schema,
+ Properties props
+ ) {
+ List<HoodieRecord> hoodieRecords = records.stream()
+ .map(r -> {
+ String rowKey = (String)
r.get(r.getSchema().getField(ROW_KEY).pos());
+ String partitionPath = (String)
r.get(r.getSchema().getField(PARTITION_PATH).pos());
+ return new HoodieAvroIndexedRecord(new HoodieKey(rowKey,
partitionPath), r);
+ })
+ .collect(Collectors.toList());
+ return new HoodieDeleteBlock(
+ hoodieRecords.stream().map(
+ r -> Pair.of(DeleteRecord.create(
+ r.getKey(), r.getOrderingValue(schema, props)), -1L))
+ .collect(Collectors.toList()),
+ false,
+ header
+ );
+ }
+
+ public static HoodieBaseFile createBaseFile(
+ String baseFilePath,
+ List<IndexedRecord> records,
+ Schema schema,
+ String baseInstantTime
+ ) throws IOException {
+ Configuration hadoopConf = new Configuration();
+
+ // TODO: Optimize these hard-coded parameters for test purpose. (HUDI-7214)
+ BloomFilter filter = BloomFilterFactory.createBloomFilter(
+ 1000,
+ 0.0001,
+ 10000,
+ BloomFilterTypeCode.DYNAMIC_V0.name());
+ HoodieAvroWriteSupport<IndexedRecord> writeSupport = new
HoodieAvroWriteSupport<>(
+ new AvroSchemaConverter().convert(schema),
+ schema,
+ Option.of(filter),
+ new Properties());
+ HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig = new
HoodieParquetConfig(
+ writeSupport,
+ CompressionCodecName.GZIP,
+ ParquetWriter.DEFAULT_BLOCK_SIZE,
+ ParquetWriter.DEFAULT_PAGE_SIZE,
+ 1024 * 1024 * 1024,
+ hadoopConf,
+ 0.1,
+ true);
+
+ try (HoodieAvroParquetWriter writer = new HoodieAvroParquetWriter(
+ new Path(baseFilePath),
+ parquetConfig,
+ baseInstantTime,
+ new LocalTaskContextSupplier(),
+ true)) {
+ for (IndexedRecord record : records) {
+ writer.writeAvro(
+ (String) record.get(schema.getField(ROW_KEY).pos()), record);
+ }
+ }
+ return new HoodieBaseFile(baseFilePath);
+ }
+
+ public static HoodieLogFile createLogFile(
+ FileSystem fileSystem,
+ String logFilePath,
+ List<IndexedRecord> records,
+ Schema schema,
+ String fileId,
+ String logInstantTime,
+ int version,
+ HoodieLogBlock.HoodieLogBlockType blockType
+ ) throws InterruptedException, IOException {
+ try (HoodieLogFormat.Writer writer =
+ HoodieLogFormat.newWriterBuilder()
+ .onParentPath(new Path(logFilePath).getParent())
+ .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+ .withFileId(fileId)
+ .withDeltaCommit(logInstantTime)
+ .withLogVersion(version)
+ .withFs(fileSystem).build()) {
+ Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+ header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME,
logInstantTime);
+ header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
+
+ if (blockType != DELETE_BLOCK) {
+ HoodieDataBlock dataBlock = getDataBlock(
+ blockType, records, header, new Path(logFilePath));
+ writer.appendBlock(dataBlock);
+ } else {
+ HoodieDeleteBlock deleteBlock = getDeleteBlock(
+ records, header, schema, PROPERTIES);
+ writer.appendBlock(deleteBlock);
+ }
+ }
+ return new HoodieLogFile(logFilePath);
+ }
+
+ /**
+ * Based on provided parameters to generate a {@link FileSlice} object.
+ */
+ public static FileSlice generateFileSlice(
+ FileSystem fileSystem,
+ String basePath,
+ String fileId,
+ String partitionPath,
+ Schema schema,
+ List<DataGenerationPlan> plans
+ ) throws IOException, InterruptedException {
+ assert (!plans.isEmpty());
+
+ HoodieBaseFile baseFile = null;
+ List<HoodieLogFile> logFiles = new ArrayList<>();
+
+ // Generate a base file with records.
+ DataGenerationPlan baseFilePlan = plans.get(0);
+ if (!baseFilePlan.getRecordKeys().isEmpty()) {
+ Path baseFilePath = generateBaseFilePath(
+ basePath, fileId, baseFilePlan.getInstantTime());
+ List<IndexedRecord> records = generateRecords(baseFilePlan);
+ baseFile = createBaseFile(
+ baseFilePath.toString(),
+ records,
+ schema,
+ baseFilePlan.getInstantTime());
+ }
+
+ // Rest of plans are for log files.
+ for (int i = 1; i < plans.size(); i++) {
+ DataGenerationPlan logFilePlan = plans.get(i);
+ if (logFilePlan.getRecordKeys().isEmpty()) {
+ continue;
+ }
+
+ Path logFile = generateLogFilePath(
+ basePath,fileId, logFilePlan.getInstantTime(), i);
+ List<IndexedRecord> records = generateRecords(logFilePlan);
+ HoodieLogBlock.HoodieLogBlockType blockType =
+ logFilePlan.getOperationType() == DELETE ? DELETE_BLOCK :
PARQUET_DATA_BLOCK;
+ logFiles.add(createLogFile(
+ fileSystem,
+ logFile.toString(),
+ records,
+ schema,
+ fileId,
+ logFilePlan.getInstantTime(),
+ i,
+ blockType));
+ }
+
+ // Assemble the FileSlice finally.
+ HoodieFileGroupId fileGroupId = new HoodieFileGroupId(partitionPath,
fileId);
+ String baseInstantTime = baseFile == null ? null :
baseFile.getCommitTime();
+ return new FileSlice(fileGroupId, baseInstantTime, baseFile, logFiles);
+ }
+
+ /**
+ * Generate a {@link FileSlice} object which contains a {@link
HoodieBaseFile} only.
+ */
+ public static Option<FileSlice> getBaseFileOnlyFileSlice(
+ FileSystem fileSystem,
+ KeyRange range,
+ long timestamp,
+ String basePath,
+ String partitionPath,
+ String fileId,
+ String baseInstantTime
+ ) throws IOException, InterruptedException {
+ List<String> keys = generateKeys(range);
+ List<DataGenerationPlan> plans = new ArrayList<>();
+ DataGenerationPlan baseFilePlan = DataGenerationPlan
+ .newBuilder()
+ .withRecordKeys(keys)
+ .withOperationType(INSERT)
+ .withPartitionPath(partitionPath)
+ .withTimeStamp(timestamp)
+ .withInstantTime(baseInstantTime)
+ .build();
+ plans.add(baseFilePlan);
+
+ return Option.of(generateFileSlice(
+ fileSystem,
+ basePath,
+ fileId,
+ partitionPath,
+ AVRO_SCHEMA,
+ plans));
+ }
+
+ /**
+ * Generate a regular {@link FileSlice} containing both a base file and a
number of log files.
+ */
+ public static Option<FileSlice> getFileSlice(
+ FileSystem fileSystem,
+ List<KeyRange> ranges,
+ List<Long> timestamps,
+ List<DataGenerationPlan.OperationType> operationTypes,
+ List<String> instantTimes,
+ String basePath,
+ String partitionPath,
+ String fileId
+ ) throws IOException, InterruptedException {
+ List<DataGenerationPlan> plans = new ArrayList<>();
+ for (int i = 0; i < ranges.size(); i++) {
+ List<String> keys = generateKeys(ranges.get(i));
+ plans.add(DataGenerationPlan.newBuilder()
+ .withOperationType(operationTypes.get(i))
+ .withPartitionPath(partitionPath)
+ .withRecordKeys(keys)
+ .withTimeStamp(timestamps.get(i))
+ .withInstantTime(instantTimes.get(i))
+ .build());
+ }
+
+ return Option.of(generateFileSlice(
+ fileSystem,
+ basePath,
+ fileId,
+ partitionPath,
+ AVRO_SCHEMA,
+ plans));
+ }
+}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
new file mode 100644
index 00000000000..5c7c2bd4c71
--- /dev/null
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.testutils.reader;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieAvroRecordMerger;
+import org.apache.hudi.common.model.HoodieEmptyRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.io.storage.HoodieAvroParquetReader;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.function.UnaryOperator;
+
+import static
org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
+import static
org.apache.hudi.common.testutils.reader.HoodieFileSliceTestUtils.ROW_KEY;
+
+public class HoodieTestReaderContext extends
HoodieReaderContext<IndexedRecord> {
+ @Override
+ public FileSystem getFs(String path, Configuration conf) {
+ return FSUtils.getFs(path, conf);
+ }
+
+ @Override
+ public ClosableIterator<IndexedRecord> getFileRecordIterator(
+ Path filePath,
+ long start,
+ long length,
+ Schema dataSchema,
+ Schema requiredSchema,
+ Configuration conf
+ ) throws IOException {
+ HoodieAvroParquetReader reader = new HoodieAvroParquetReader(conf,
filePath);
+ return reader.getIndexedRecordIterator(dataSchema, requiredSchema);
+ }
+
+ @Override
+ public IndexedRecord convertAvroRecord(IndexedRecord record) {
+ return record;
+ }
+
+ @Override
+ public HoodieRecordMerger getRecordMerger(String mergerStrategy) {
+ switch (mergerStrategy) {
+ case DEFAULT_MERGER_STRATEGY_UUID:
+ return new HoodieAvroRecordMerger();
+ default:
+ throw new HoodieException(
+ "The merger strategy UUID is not supported: " + mergerStrategy);
+ }
+ }
+
+ @Override
+ public Object getValue(IndexedRecord record, Schema schema, String
fieldName) {
+ return getFieldValueFromIndexedRecord(record, schema, fieldName);
+ }
+
+ @Override
+ public String getRecordKey(IndexedRecord record, Schema schema) {
+ return getFieldValueFromIndexedRecord(record, schema, ROW_KEY).toString();
+ }
+
+ @Override
+ public Comparable getOrderingValue(
+ Option<IndexedRecord> recordOpt,
+ Map<String, Object> metadataMap,
+ Schema schema,
+ TypedProperties props
+ ) {
+ if (metadataMap.containsKey(INTERNAL_META_ORDERING_FIELD)) {
+ return (Comparable) metadataMap.get(INTERNAL_META_ORDERING_FIELD);
+ }
+
+ if (!recordOpt.isPresent()) {
+ return 0;
+ }
+
+ String orderingFieldName = ConfigUtils.getOrderingField(props);
+ Object value = getFieldValueFromIndexedRecord(recordOpt.get(), schema,
orderingFieldName);
+ return value != null ? (Comparable) value : 0;
+ }
+
+ @Override
+ public HoodieRecord<IndexedRecord> constructHoodieRecord(
+ Option<IndexedRecord> recordOpt,
+ Map<String, Object> metadataMap
+ ) {
+ if (!recordOpt.isPresent()) {
+ HoodieKey key = new HoodieKey((String)
metadataMap.get(INTERNAL_META_RECORD_KEY),
+ (String) metadataMap.get(INTERNAL_META_PARTITION_PATH));
+ return new HoodieEmptyRecord<>(
+ key,
+ HoodieOperation.DELETE,
+ (Comparable<?>) metadataMap.get(INTERNAL_META_ORDERING_FIELD),
+ HoodieRecord.HoodieRecordType.AVRO);
+ }
+ return new HoodieAvroIndexedRecord(recordOpt.get());
+ }
+
+ @Override
+ public IndexedRecord seal(IndexedRecord record) {
+ Schema schema = record.getSchema();
+ GenericRecordBuilder builder = new GenericRecordBuilder(schema);
+ for (Schema.Field field : schema.getFields()) {
+ builder.set(field, record.get(field.pos()));
+ }
+ return builder.build();
+ }
+
+ @Override
+ public ClosableIterator<IndexedRecord>
mergeBootstrapReaders(ClosableIterator<IndexedRecord> skeletonFileIterator,
ClosableIterator<IndexedRecord> dataFileIterator) {
+ return null;
+ }
+
+ @Override
+ public UnaryOperator<IndexedRecord> projectRecord(Schema from, Schema to) {
+ return null;
+ }
+
+ private Object getFieldValueFromIndexedRecord(
+ IndexedRecord record,
+ Schema recordSchema,
+ String fieldName
+ ) {
+ Schema.Field field = recordSchema.getField(fieldName);
+ int pos = field.pos();
+ return record.get(pos);
+ }
+}