This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 47ad41575de [HUDI-7176] Add file group reader test framework (#10263)
47ad41575de is described below

commit 47ad41575defafae3024312e801609b80eb9e900
Author: Lin Liu <[email protected]>
AuthorDate: Tue Dec 12 13:12:33 2023 -0800

    [HUDI-7176] Add file group reader test framework (#10263)
    
    Changes:
    1. Add a builder class to construct file group reader.
    2. Add an indexedRecord based reader context.
    3. Implement rest of functions for the file group reader utils.
    4. Add a util class for generating FileSlice.
---
 .../hudi/common/engine/HoodieReaderContext.java    |   3 +-
 .../common/table/read/HoodieFileGroupReader.java   |   8 +-
 .../hudi/common/testutils/FileSystemTestUtils.java |   6 +
 .../testutils/reader/DataGenerationPlan.java       | 126 ++++++
 .../reader/HoodieFileGroupReaderTestUtils.java     | 127 ++++++
 .../testutils/reader/HoodieFileSliceTestUtils.java | 440 +++++++++++++++++++++
 .../testutils/reader/HoodieTestReaderContext.java  | 163 ++++++++
 7 files changed, 867 insertions(+), 6 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index 7b8b2888983..8daf3f441f7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.function.UnaryOperator;
@@ -78,7 +79,7 @@ public abstract class HoodieReaderContext<T> {
    * @return {@link ClosableIterator<T>} that can return all records through iteration.
    */
   public abstract ClosableIterator<T> getFileRecordIterator(
-      Path filePath, long start, long length, Schema dataSchema, Schema requiredSchema, Configuration conf);
+      Path filePath, long start, long length, Schema dataSchema, Schema requiredSchema, Configuration conf) throws IOException;
 
   /**
    * Converts an Avro record, e.g., serialized in the log files, to an engine-specific record.
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index 8413ef8a5e2..52ee14d969e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -139,14 +139,12 @@ public final class HoodieFileGroupReader<T> implements Closeable {
         : new HoodieKeyBasedFileGroupRecordBuffer<>(
         readerContext, requiredSchema, requiredSchema, Option.empty(), Option.empty(),
         recordMerger, props);
-
-
   }
 
   /**
    * Initialize internal iterators on the base and log files.
    */
-  public void initRecordIterators() {
+  public void initRecordIterators() throws IOException {
     ClosableIterator<T> iter = makeBaseFileIterator();
     if (logFiles.isEmpty()) {
       this.baseFileIterator = CachingIterator.wrap(iter, readerContext);
@@ -157,7 +155,7 @@ public final class HoodieFileGroupReader<T> implements Closeable {
     }
   }
 
-  private ClosableIterator<T> makeBaseFileIterator() {
+  private ClosableIterator<T> makeBaseFileIterator() throws IOException {
     if (!hoodieBaseFileOption.isPresent()) {
       return new EmptyIterator<>();
     }
@@ -225,7 +223,7 @@ public final class HoodieFileGroupReader<T> implements Closeable {
     return newSchema;
   }
 
-  private ClosableIterator<T> makeBootstrapBaseFileIterator(HoodieBaseFile baseFile) {
+  private ClosableIterator<T> makeBootstrapBaseFileIterator(HoodieBaseFile baseFile) throws IOException {
     BaseFile dataFile = baseFile.getBootstrapBaseFile().get();
     Pair<List<Schema.Field>,List<Schema.Field>> requiredFields = getDataAndMetaCols(requiredSchema);
     Pair<List<Schema.Field>,List<Schema.Field>> allFields = getDataAndMetaCols(dataSchema);
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileSystemTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileSystemTestUtils.java
index e73f2bb0440..a00e215f10f 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileSystemTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileSystemTestUtils.java
@@ -61,6 +61,12 @@ public class FileSystemTestUtils {
     return new Path(FILE_SCHEME + fileSuffix);
   }
 
+  public static Path getRandomOuterFSPath(String extension) {
+    String randomFileName = UUID.randomUUID().toString();
+    String fileSuffix = COLON + FORWARD_SLASH + TEMP + FORWARD_SLASH + randomFileName;
+    return new Path(FILE_SCHEME + fileSuffix + "." + extension);
+  }
+
   public static Path getPhantomFile(Path outerPath, long startOffset, long inlineLength) {
     // Generate phantom inline file
     return InLineFSUtils.getInlineFilePath(outerPath, FILE_SCHEME, startOffset, inlineLength);
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/DataGenerationPlan.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/DataGenerationPlan.java
new file mode 100644
index 00000000000..29b0090a5db
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/DataGenerationPlan.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.testutils.reader;
+
+import java.util.List;
+
+/**
+ * The blueprint of records that will be generated
+ * by the data generator.
+ *
+ * Current limitations:
+ * 1. One plan generates one file, either a base file, or a log file.
+ * 2. One file contains one operation, e.g., insert, delete, or update.
+ */
+public class DataGenerationPlan {
+  // The values for "_row_key" field.
+  private final List<String> recordKeys;
+  // The partition path for all records.
+  private final String partitionPath;
+  // The ordering field.
+  private final long timestamp;
+  // The operation type of the record.
+  private final OperationType operationType;
+  private final String instantTime;
+
+  public enum OperationType {
+    INSERT,
+    UPDATE,
+    DELETE
+  }
+
+  public DataGenerationPlan(List<String> recordKeys,
+                            String partitionPath,
+                            long timestamp,
+                            OperationType operationType,
+                            String instantTime) {
+    this.recordKeys = recordKeys;
+    this.partitionPath = partitionPath;
+    this.timestamp = timestamp;
+    this.operationType = operationType;
+    this.instantTime = instantTime;
+  }
+
+  public List<String> getRecordKeys() {
+    return recordKeys;
+  }
+
+  public String getPartitionPath() {
+    return partitionPath;
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  public OperationType getOperationType() {
+    return operationType;
+  }
+
+  public String getInstantTime() {
+    return instantTime;
+  }
+
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+    private List<String> recordKeys;
+    private String partitionPath;
+    private long timestamp;
+    private OperationType operationType;
+    private String instantTime;
+
+    public Builder withRecordKeys(List<String> recordKeys) {
+      this.recordKeys = recordKeys;
+      return this;
+    }
+
+    public Builder withPartitionPath(String partitionPath) {
+      this.partitionPath = partitionPath;
+      return this;
+    }
+
+    public Builder withTimeStamp(long timestamp) {
+      this.timestamp = timestamp;
+      return this;
+    }
+
+    public Builder withOperationType(OperationType operationType) {
+      this.operationType = operationType;
+      return this;
+    }
+
+    public Builder withInstantTime(String instantTime) {
+      this.instantTime = instantTime;
+      return this;
+    }
+
+    public DataGenerationPlan build() {
+      return new DataGenerationPlan(
+          recordKeys,
+          partitionPath,
+          timestamp,
+          operationType,
+          instantTime);
+    }
+  }
+}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestUtils.java
new file mode 100644
index 00000000000..39ddc42bc37
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestUtils.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.testutils.reader;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+
+public class HoodieFileGroupReaderTestUtils {
+  public static HoodieFileGroupReader<IndexedRecord> createFileGroupReader(
+      Option<FileSlice> fileSliceOpt,
+      String basePath,
+      String latestCommitTime,
+      Schema schema,
+      boolean shouldUseRecordPosition,
+      long start,
+      long length,
+      TypedProperties properties,
+      Configuration hadoopConf,
+      HoodieTableConfig tableConfig,
+      HoodieReaderContext<IndexedRecord> readerContext
+  ) {
+    assert (fileSliceOpt.isPresent());
+    return new HoodieFileGroupReaderBuilder()
+        .withReaderContext(readerContext)
+        .withHadoopConf(hadoopConf)
+        .withFileSlice(fileSliceOpt.get())
+        .withStart(start)
+        .withLength(length)
+        .withProperties(properties)
+        .withTableConfig(tableConfig)
+        .build(basePath, latestCommitTime, schema, shouldUseRecordPosition);
+  }
+
+  public static class HoodieFileGroupReaderBuilder {
+    private HoodieReaderContext<IndexedRecord> readerContext;
+    private FileSlice fileSlice;
+    private Configuration hadoopConf;
+    private TypedProperties props;
+    private long start;
+    private long length;
+    private HoodieTableConfig tableConfig;
+
+    public HoodieFileGroupReaderBuilder withReaderContext(
+        HoodieReaderContext<IndexedRecord> context) {
+      this.readerContext = context;
+      return this;
+    }
+
+    public HoodieFileGroupReaderBuilder withFileSlice(FileSlice fileSlice) {
+      this.fileSlice = fileSlice;
+      return this;
+    }
+
+    public HoodieFileGroupReaderBuilder withHadoopConf(Configuration hadoopConf) {
+      this.hadoopConf = hadoopConf;
+      return this;
+    }
+
+    public HoodieFileGroupReaderBuilder withProperties(TypedProperties props) {
+      this.props = props;
+      return this;
+    }
+
+    public HoodieFileGroupReaderBuilder withStart(long start) {
+      this.start = start;
+      return this;
+    }
+
+    public HoodieFileGroupReaderBuilder withLength(long length) {
+      this.length = length;
+      return this;
+    }
+
+    public HoodieFileGroupReaderBuilder withTableConfig(
+        HoodieTableConfig tableConfig
+    ) {
+      this.tableConfig = tableConfig;
+      return this;
+    }
+
+    public HoodieFileGroupReader<IndexedRecord> build(
+        String basePath,
+        String latestCommitTime,
+        Schema schema,
+        boolean shouldUseRecordPosition
+    ) {
+      return new HoodieFileGroupReader<>(
+          readerContext,
+          hadoopConf,
+          basePath,
+          latestCommitTime,
+          fileSlice,
+          schema,
+          schema,
+          props,
+          tableConfig,
+          start,
+          length,
+          shouldUseRecordPosition);
+    }
+  }
+}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
new file mode 100644
index 00000000000..ce8ad3ac8c7
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.testutils.reader;
+
+import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.LocalTaskContextSupplier;
+import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieCDCDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
+import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.io.storage.HoodieAvroParquetWriter;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.DELETE_BLOCK;
+import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.PARQUET_DATA_BLOCK;
+import static org.apache.hudi.common.testutils.FileCreateUtils.baseFileName;
+import static org.apache.hudi.common.testutils.FileCreateUtils.logFileName;
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
+import static org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.DELETE;
+import static org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.INSERT;
+
+public class HoodieFileSliceTestUtils {
+  public static final String FORWARD_SLASH = "/";
+  public static final String PARQUET = ".parquet";
+
+  public static final String DRIVER = "driver";
+  public static final String PARTITION_PATH = "partition_path";
+  public static final String RIDER = "rider";
+  public static final String ROW_KEY = "_row_key";
+  public static final String TIMESTAMP = "timestamp";
+  public static final HoodieTestDataGenerator DATA_GEN =
+      new HoodieTestDataGenerator(0xDEED);
+  public static final TypedProperties PROPERTIES = new TypedProperties();
+
+  static {
+    PROPERTIES.setProperty(
+        "hoodie.datasource.write.precombine.field", "timestamp");
+  }
+
+  // We use a number to represent a record key, and a (start, end) range
+  // to represent a set of record keys between start <= k <= end.
+  public static class KeyRange {
+    public int start;
+    public int end;
+
+    public KeyRange(int start, int end) {
+      this.start = start;
+      this.end = end;
+    }
+  }
+
+  private static Path generateBaseFilePath(
+      String basePath,
+      String fileId,
+      String instantTime
+  ) {
+    return new Path(
+        basePath + FORWARD_SLASH
+        + baseFileName(instantTime, fileId, PARQUET));
+  }
+
+  private static Path generateLogFilePath(
+      String basePath,
+      String fileId,
+      String instantTime,
+      int version) {
+    return new Path(
+        basePath + FORWARD_SLASH + logFileName(
+        instantTime, fileId, version));
+  }
+
+  // Note:
+  // "start < end" means start <= k <= end.
+  // "start == end" means k = start.
+  // "start > end" means no keys.
+  private static List<String> generateKeys(KeyRange range) {
+    List<String> keys = new ArrayList<>();
+    if (range.start == range.end) {
+      keys.add(String.valueOf(range.start));
+    } else {
+      keys = IntStream
+          .rangeClosed(range.start, range.end)
+          .boxed()
+          .map(String::valueOf).collect(Collectors.toList());
+    }
+    return keys;
+  }
+
+  private static List<IndexedRecord> generateRecords(DataGenerationPlan plan) {
+    List<IndexedRecord> records = new ArrayList<>();
+    List<String> keys = plan.getRecordKeys();
+    for (String key : keys) {
+      records.add(DATA_GEN.generateGenericRecord(
+          key,
+          plan.getPartitionPath(),
+          RIDER + "." + UUID.randomUUID(),
+          DRIVER + "." + UUID.randomUUID(),
+          plan.getTimestamp(),
+          plan.getOperationType() == DELETE,
+          false
+      ));
+    }
+    return records;
+  }
+
+  private static HoodieDataBlock getDataBlock(
+      HoodieLogBlock.HoodieLogBlockType dataBlockType,
+      List<IndexedRecord> records,
+      Map<HoodieLogBlock.HeaderMetadataType, String> header,
+      Path logFilePath
+  ) {
+    return createDataBlock(
+        dataBlockType,
+        records.stream().map(HoodieAvroIndexedRecord::new)
+            .collect(Collectors.toList()),
+        header,
+        logFilePath);
+  }
+
+  private static HoodieDataBlock createDataBlock(
+      HoodieLogBlock.HoodieLogBlockType dataBlockType,
+      List<HoodieRecord> records,
+      Map<HoodieLogBlock.HeaderMetadataType, String> header,
+      Path pathForReader
+  ) {
+    switch (dataBlockType) {
+      case CDC_DATA_BLOCK:
+        return new HoodieCDCDataBlock(
+            records,
+            header,
+            HoodieRecord.RECORD_KEY_METADATA_FIELD);
+      case AVRO_DATA_BLOCK:
+        return new HoodieAvroDataBlock(
+            records,
+            false,
+            header,
+            HoodieRecord.RECORD_KEY_METADATA_FIELD);
+      case HFILE_DATA_BLOCK:
+        return new HoodieHFileDataBlock(
+            records,
+            header,
+            Compression.Algorithm.GZ,
+            pathForReader);
+      case PARQUET_DATA_BLOCK:
+        return new HoodieParquetDataBlock(
+            records,
+            false,
+            header,
+            HoodieRecord.RECORD_KEY_METADATA_FIELD,
+            CompressionCodecName.GZIP,
+            0.1,
+            true);
+      default:
+        throw new RuntimeException(
+            "Unknown data block type " + dataBlockType);
+    }
+  }
+
+  public static HoodieDeleteBlock getDeleteBlock(
+      List<IndexedRecord> records,
+      Map<HoodieLogBlock.HeaderMetadataType, String> header,
+      Schema schema,
+      Properties props
+  ) {
+    List<HoodieRecord> hoodieRecords = records.stream()
+        .map(r -> {
+          String rowKey = (String) r.get(r.getSchema().getField(ROW_KEY).pos());
+          String partitionPath = (String) r.get(r.getSchema().getField(PARTITION_PATH).pos());
+          return new HoodieAvroIndexedRecord(new HoodieKey(rowKey, partitionPath), r);
+        })
+        .collect(Collectors.toList());
+    return new HoodieDeleteBlock(
+        hoodieRecords.stream().map(
+            r -> Pair.of(DeleteRecord.create(
+                r.getKey(), r.getOrderingValue(schema, props)), -1L))
+            .collect(Collectors.toList()),
+        false,
+        header
+    );
+  }
+
+  public static HoodieBaseFile createBaseFile(
+      String baseFilePath,
+      List<IndexedRecord> records,
+      Schema schema,
+      String baseInstantTime
+  ) throws IOException {
+    Configuration hadoopConf = new Configuration();
+
+    // TODO: Optimize these hard-coded parameters for test purpose. (HUDI-7214)
+    BloomFilter filter = BloomFilterFactory.createBloomFilter(
+        1000,
+        0.0001,
+        10000,
+        BloomFilterTypeCode.DYNAMIC_V0.name());
+    HoodieAvroWriteSupport<IndexedRecord> writeSupport = new HoodieAvroWriteSupport<>(
+        new AvroSchemaConverter().convert(schema),
+        schema,
+        Option.of(filter),
+        new Properties());
+    HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig = new HoodieParquetConfig(
+        writeSupport,
+        CompressionCodecName.GZIP,
+        ParquetWriter.DEFAULT_BLOCK_SIZE,
+        ParquetWriter.DEFAULT_PAGE_SIZE,
+        1024 * 1024 * 1024,
+        hadoopConf,
+        0.1,
+        true);
+
+    try (HoodieAvroParquetWriter writer = new HoodieAvroParquetWriter(
+        new Path(baseFilePath),
+        parquetConfig,
+        baseInstantTime,
+        new LocalTaskContextSupplier(),
+        true)) {
+      for (IndexedRecord record : records) {
+        writer.writeAvro(
+            (String) record.get(schema.getField(ROW_KEY).pos()), record);
+      }
+    }
+    return new HoodieBaseFile(baseFilePath);
+  }
+
+  public static HoodieLogFile createLogFile(
+      FileSystem fileSystem,
+      String logFilePath,
+      List<IndexedRecord> records,
+      Schema schema,
+      String fileId,
+      String logInstantTime,
+      int version,
+      HoodieLogBlock.HoodieLogBlockType blockType
+  ) throws InterruptedException, IOException {
+    try (HoodieLogFormat.Writer writer =
+             HoodieLogFormat.newWriterBuilder()
+                 .onParentPath(new Path(logFilePath).getParent())
+                 .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+                 .withFileId(fileId)
+                 .withDeltaCommit(logInstantTime)
+                 .withLogVersion(version)
+                 .withFs(fileSystem).build()) {
+      Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+      header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, logInstantTime);
+      header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
+
+      if (blockType != DELETE_BLOCK) {
+        HoodieDataBlock dataBlock = getDataBlock(
+            blockType, records, header, new Path(logFilePath));
+        writer.appendBlock(dataBlock);
+      } else {
+        HoodieDeleteBlock deleteBlock = getDeleteBlock(
+            records, header, schema, PROPERTIES);
+        writer.appendBlock(deleteBlock);
+      }
+    }
+    return new HoodieLogFile(logFilePath);
+  }
+
+  /**
+   * Based on provided parameters to generate a {@link FileSlice} object.
+   */
+  public static FileSlice generateFileSlice(
+      FileSystem fileSystem,
+      String basePath,
+      String fileId,
+      String partitionPath,
+      Schema schema,
+      List<DataGenerationPlan> plans
+  ) throws IOException, InterruptedException {
+    assert (!plans.isEmpty());
+
+    HoodieBaseFile baseFile = null;
+    List<HoodieLogFile> logFiles = new ArrayList<>();
+
+    // Generate a base file with records.
+    DataGenerationPlan baseFilePlan = plans.get(0);
+    if (!baseFilePlan.getRecordKeys().isEmpty()) {
+      Path baseFilePath = generateBaseFilePath(
+          basePath, fileId, baseFilePlan.getInstantTime());
+      List<IndexedRecord> records = generateRecords(baseFilePlan);
+      baseFile = createBaseFile(
+          baseFilePath.toString(),
+          records,
+          schema,
+          baseFilePlan.getInstantTime());
+    }
+
+    // Rest of plans are for log files.
+    for (int i = 1; i < plans.size(); i++) {
+      DataGenerationPlan logFilePlan = plans.get(i);
+      if (logFilePlan.getRecordKeys().isEmpty()) {
+        continue;
+      }
+
+      Path logFile = generateLogFilePath(
+          basePath,fileId, logFilePlan.getInstantTime(), i);
+      List<IndexedRecord> records = generateRecords(logFilePlan);
+      HoodieLogBlock.HoodieLogBlockType blockType =
+          logFilePlan.getOperationType() == DELETE ? DELETE_BLOCK : PARQUET_DATA_BLOCK;
+      logFiles.add(createLogFile(
+          fileSystem,
+          logFile.toString(),
+          records,
+          schema,
+          fileId,
+          logFilePlan.getInstantTime(),
+          i,
+          blockType));
+    }
+
+    // Assemble the FileSlice finally.
+    HoodieFileGroupId fileGroupId = new HoodieFileGroupId(partitionPath, fileId);
+    String baseInstantTime = baseFile == null ? null : baseFile.getCommitTime();
+    return new FileSlice(fileGroupId, baseInstantTime, baseFile, logFiles);
+  }
+
+  /**
+   * Generate a {@link FileSlice} object which contains a {@link HoodieBaseFile} only.
+   */
+  public static Option<FileSlice> getBaseFileOnlyFileSlice(
+      FileSystem fileSystem,
+      KeyRange range,
+      long timestamp,
+      String basePath,
+      String partitionPath,
+      String fileId,
+      String baseInstantTime
+  ) throws IOException, InterruptedException {
+    List<String> keys = generateKeys(range);
+    List<DataGenerationPlan> plans = new ArrayList<>();
+    DataGenerationPlan baseFilePlan = DataGenerationPlan
+        .newBuilder()
+        .withRecordKeys(keys)
+        .withOperationType(INSERT)
+        .withPartitionPath(partitionPath)
+        .withTimeStamp(timestamp)
+        .withInstantTime(baseInstantTime)
+        .build();
+    plans.add(baseFilePlan);
+
+    return Option.of(generateFileSlice(
+        fileSystem,
+        basePath,
+        fileId,
+        partitionPath,
+        AVRO_SCHEMA,
+        plans));
+  }
+
+  /**
+   * Generate a regular {@link FileSlice} containing both a base file and a number of log files.
+   */
+  public static Option<FileSlice> getFileSlice(
+      FileSystem fileSystem,
+      List<KeyRange> ranges,
+      List<Long> timestamps,
+      List<DataGenerationPlan.OperationType> operationTypes,
+      List<String> instantTimes,
+      String basePath,
+      String partitionPath,
+      String fileId
+  ) throws IOException, InterruptedException {
+    List<DataGenerationPlan> plans = new ArrayList<>();
+    for (int i = 0; i < ranges.size(); i++) {
+      List<String> keys = generateKeys(ranges.get(i));
+      plans.add(DataGenerationPlan.newBuilder()
+          .withOperationType(operationTypes.get(i))
+          .withPartitionPath(partitionPath)
+          .withRecordKeys(keys)
+          .withTimeStamp(timestamps.get(i))
+          .withInstantTime(instantTimes.get(i))
+          .build());
+    }
+
+    return Option.of(generateFileSlice(
+        fileSystem,
+        basePath,
+        fileId,
+        partitionPath,
+        AVRO_SCHEMA,
+        plans));
+  }
+}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
new file mode 100644
index 00000000000..5c7c2bd4c71
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.testutils.reader;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieAvroRecordMerger;
+import org.apache.hudi.common.model.HoodieEmptyRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.io.storage.HoodieAvroParquetReader;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.function.UnaryOperator;
+
+import static 
org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
+import static 
org.apache.hudi.common.testutils.reader.HoodieFileSliceTestUtils.ROW_KEY;
+
+/**
+ * A {@link HoodieReaderContext} for tests that works directly on Avro
+ * {@link IndexedRecord}s and reads files through {@link HoodieAvroParquetReader}.
+ *
+ * <p>{@link #mergeBootstrapReaders} and {@link #projectRecord} are not implemented
+ * and return {@code null}.
+ */
+public class HoodieTestReaderContext extends HoodieReaderContext<IndexedRecord> {
+  /**
+   * Resolves the file system for {@code path} via {@code FSUtils.getFs(path, conf)}.
+   */
+  @Override
+  public FileSystem getFs(String path, Configuration conf) {
+    return FSUtils.getFs(path, conf);
+  }
+
+  /**
+   * Opens {@code filePath} with a {@link HoodieAvroParquetReader} and returns its
+   * indexed-record iterator for the given schemas.
+   *
+   * <p>{@code start} and {@code length} are accepted for interface compatibility
+   * but are not used by this implementation.
+   *
+   * @throws IOException declared by the overridden method and propagated from the reader.
+   */
+  @Override
+  public ClosableIterator<IndexedRecord> getFileRecordIterator(
+      Path filePath,
+      long start,
+      long length,
+      Schema dataSchema,
+      Schema requiredSchema,
+      Configuration conf
+  ) throws IOException {
+    HoodieAvroParquetReader reader = new HoodieAvroParquetReader(conf, filePath);
+    return reader.getIndexedRecordIterator(dataSchema, requiredSchema);
+  }
+
+  /**
+   * Returns {@code record} unchanged; the engine-specific type already is Avro.
+   */
+  @Override
+  public IndexedRecord convertAvroRecord(IndexedRecord record) {
+    return record;
+  }
+
+  /**
+   * Returns a {@link HoodieAvroRecordMerger} for the default merger strategy.
+   *
+   * @throws HoodieException if {@code mergerStrategy} is any other strategy UUID.
+   */
+  @Override
+  public HoodieRecordMerger getRecordMerger(String mergerStrategy) {
+    switch (mergerStrategy) {
+      case DEFAULT_MERGER_STRATEGY_UUID:
+        return new HoodieAvroRecordMerger();
+      default:
+        throw new HoodieException(
+            "The merger strategy UUID is not supported: " + mergerStrategy);
+    }
+  }
+
+  /**
+   * Returns the value of {@code fieldName} in {@code record}, located through
+   * {@code schema}; {@code null} if the schema has no such field.
+   */
+  @Override
+  public Object getValue(IndexedRecord record, Schema schema, String fieldName) {
+    return getFieldValueFromIndexedRecord(record, schema, fieldName);
+  }
+
+  /**
+   * Returns the string form of the {@code ROW_KEY} field of {@code record}.
+   * The value is dereferenced without a null check, so the key field must be
+   * present in {@code schema} and non-null in the record.
+   */
+  @Override
+  public String getRecordKey(IndexedRecord record, Schema schema) {
+    return getFieldValueFromIndexedRecord(record, schema, ROW_KEY).toString();
+  }
+
+  /**
+   * Resolves the ordering value with the following precedence:
+   * <ol>
+   *   <li>the {@code INTERNAL_META_ORDERING_FIELD} entry of {@code metadataMap}, if the key exists;</li>
+   *   <li>{@code 0} if no record is present;</li>
+   *   <li>the record's value for the ordering field configured in {@code props},
+   *       or {@code 0} when that field is not configured, not in the schema, or null.</li>
+   * </ol>
+   */
+  @Override
+  public Comparable getOrderingValue(
+      Option<IndexedRecord> recordOpt,
+      Map<String, Object> metadataMap,
+      Schema schema,
+      TypedProperties props
+  ) {
+    if (metadataMap.containsKey(INTERNAL_META_ORDERING_FIELD)) {
+      return (Comparable) metadataMap.get(INTERNAL_META_ORDERING_FIELD);
+    }
+
+    if (!recordOpt.isPresent()) {
+      return 0;
+    }
+
+    String orderingFieldName = ConfigUtils.getOrderingField(props);
+    // The helper returns null for an unset or unknown ordering field, so the
+    // default of 0 below is actually reachable instead of failing with an NPE.
+    Object value = getFieldValueFromIndexedRecord(recordOpt.get(), schema, orderingFieldName);
+    return value != null ? (Comparable) value : 0;
+  }
+
+  /**
+   * Wraps the record into a {@link HoodieRecord}.
+   *
+   * <p>When no record is present, a {@link HoodieEmptyRecord} with operation
+   * {@link HoodieOperation#DELETE} is built from the record key, partition path
+   * and ordering value stored in {@code metadataMap}; otherwise the record is
+   * wrapped in a {@link HoodieAvroIndexedRecord}.
+   */
+  @Override
+  public HoodieRecord<IndexedRecord> constructHoodieRecord(
+      Option<IndexedRecord> recordOpt,
+      Map<String, Object> metadataMap
+  ) {
+    if (!recordOpt.isPresent()) {
+      HoodieKey key = new HoodieKey((String) metadataMap.get(INTERNAL_META_RECORD_KEY),
+          (String) metadataMap.get(INTERNAL_META_PARTITION_PATH));
+      return new HoodieEmptyRecord<>(
+          key,
+          HoodieOperation.DELETE,
+          (Comparable<?>) metadataMap.get(INTERNAL_META_ORDERING_FIELD),
+          HoodieRecord.HoodieRecordType.AVRO);
+    }
+    return new HoodieAvroIndexedRecord(recordOpt.get());
+  }
+
+  /**
+   * Returns a new record built with a {@link GenericRecordBuilder} against the
+   * record's own schema, with every field value taken from {@code record}.
+   */
+  @Override
+  public IndexedRecord seal(IndexedRecord record) {
+    Schema schema = record.getSchema();
+    GenericRecordBuilder builder = new GenericRecordBuilder(schema);
+    for (Schema.Field field : schema.getFields()) {
+      builder.set(field, record.get(field.pos()));
+    }
+    return builder.build();
+  }
+
+  /**
+   * Not implemented in this test context; always returns {@code null}.
+   */
+  @Override
+  public ClosableIterator<IndexedRecord> mergeBootstrapReaders(
+      ClosableIterator<IndexedRecord> skeletonFileIterator,
+      ClosableIterator<IndexedRecord> dataFileIterator) {
+    return null;
+  }
+
+  /**
+   * Not implemented in this test context; always returns {@code null}.
+   */
+  @Override
+  public UnaryOperator<IndexedRecord> projectRecord(Schema from, Schema to) {
+    return null;
+  }
+
+  /**
+   * Reads the value of {@code fieldName} from {@code record}, using
+   * {@code recordSchema} to find the field position.
+   *
+   * @return the field value, or {@code null} if {@code recordSchema} has no
+   *         field with that name (including a {@code null} name).
+   */
+  private Object getFieldValueFromIndexedRecord(
+      IndexedRecord record,
+      Schema recordSchema,
+      String fieldName
+  ) {
+    Schema.Field field = recordSchema.getField(fieldName);
+    if (field == null) {
+      // Schema#getField returns null for an unknown name; report "no value"
+      // rather than failing on field.pos().
+      return null;
+    }
+    return record.get(field.pos());
+  }
+}


Reply via email to