This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new be2e95c9049 [HUDI-7176] Improve fg reader test framework (#10324)
be2e95c9049 is described below

commit be2e95c9049f0c1e902909f6abeaf7fa42946312
Author: Lin Liu <[email protected]>
AuthorDate: Mon Dec 18 18:53:12 2023 -0800

    [HUDI-7176] Improve fg reader test framework (#10324)
    
    Changes:
    
    1. Uses EmptyHoodieRecordPayload instead of HoodieEmptyRecord to represent 
a delete record.
    2. Implements "projectRecord" function in reader context.
    3. Encapsulates more logic into the framework to simplify the user logic.
---
 .../reader/HoodieFileGroupReaderTestHarness.java   | 123 +++++++++++++++++++++
 .../testutils/reader/HoodieTestReaderContext.java  |  69 ++++++++++--
 2 files changed, 181 insertions(+), 11 deletions(-)

diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestHarness.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestHarness.java
new file mode 100644
index 00000000000..793a23bea51
--- /dev/null
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestHarness.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.testutils.reader;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.junit.jupiter.api.AfterAll;
+
+import java.io.IOException;
+import java.util.List;
+
+import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
+import static 
org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultHadoopConf;
+
+public class HoodieFileGroupReaderTestHarness extends HoodieCommonTestHarness {
+  protected static final String PARTITION_PATH = "any-partition-path";
+  protected static final String FILE_ID = "any-file-1";
+  // Set the key range for base file and log files.
+  protected static List<HoodieFileSliceTestUtils.KeyRange> keyRanges;
+  // Set the ordering field for each record set.
+  protected static List<Long> timestamps;
+  // Set the record types for each record set.
+  protected static List<DataGenerationPlan.OperationType> operationTypes;
+  // Set the instantTime for each record set.
+  protected static List<String> instantTimes;
+
+  // Environmental variables.
+  protected static Configuration hadoopConf;
+  protected static HoodieTestTable testTable;
+  protected static HoodieReaderContext<IndexedRecord> readerContext;
+  protected static TypedProperties properties;
+
+  static {
+    // Note: Use `timestamp` as the ordering field.
+    properties = new TypedProperties();
+    properties.setProperty(
+        "hoodie.datasource.write.precombine.field", "timestamp");
+    hadoopConf = getDefaultHadoopConf();
+    readerContext = new HoodieTestReaderContext(Option.empty());
+  }
+
+  @AfterAll
+  public static void tearDown() throws IOException {
+    FileSystem.closeAll();
+  }
+
+  /**
+   * Assume the test is for MOR tables by default.
+   */
+  @Override
+  protected HoodieTableType getTableType() {
+    return HoodieTableType.MERGE_ON_READ;
+  }
+
+  protected void setUpMockCommits() throws Exception {
+    for (String instantTime : instantTimes) {
+      testTable.addDeltaCommit(instantTime);
+    }
+  }
+
+  protected ClosableIterator<IndexedRecord> getFileGroupIterator(int numFiles)
+      throws IOException, InterruptedException {
+    assert (numFiles >= 1 && numFiles <= keyRanges.size());
+
+    Option<FileSlice> fileSliceOpt =
+        HoodieFileSliceTestUtils.getFileSlice(
+            readerContext.getFs(basePath, hadoopConf),
+            keyRanges.subList(0, numFiles),
+            timestamps.subList(0, numFiles),
+            operationTypes.subList(0, numFiles),
+            instantTimes.subList(0, numFiles),
+            basePath,
+            PARTITION_PATH,
+            FILE_ID
+        );
+
+    HoodieFileGroupReader<IndexedRecord> fileGroupReader =
+        HoodieFileGroupReaderTestUtils.createFileGroupReader(
+            fileSliceOpt,
+            basePath,
+            "1000", // Not used internally.
+            AVRO_SCHEMA,
+            false,
+            0L,
+            Long.MAX_VALUE,
+            properties,
+            hadoopConf,
+            metaClient.getTableConfig(),
+            readerContext
+        );
+
+    fileGroupReader.initRecordIterators();
+    return fileGroupReader.getClosableIterator();
+  }
+}
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
index 5c7c2bd4c71..0546396a452 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
@@ -22,20 +22,20 @@ package org.apache.hudi.common.testutils.reader;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieAvroRecordMerger;
-import org.apache.hudi.common.model.HoodieEmptyRecord;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.SpillableMapUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.io.storage.HoodieAvroParquetReader;
 
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
@@ -43,13 +43,22 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static 
org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
 import static 
org.apache.hudi.common.testutils.reader.HoodieFileSliceTestUtils.ROW_KEY;
 
 public class HoodieTestReaderContext extends 
HoodieReaderContext<IndexedRecord> {
+  private Option<HoodieRecordMerger> customMerger;
+
+  public HoodieTestReaderContext(Option<HoodieRecordMerger> customMerger) {
+    this.customMerger = customMerger;
+  }
+
   @Override
   public FileSystem getFs(String path, Configuration conf) {
     return FSUtils.getFs(path, conf);
@@ -75,6 +84,12 @@ public class HoodieTestReaderContext extends 
HoodieReaderContext<IndexedRecord>
 
   @Override
   public HoodieRecordMerger getRecordMerger(String mergerStrategy) {
+    // Utilize the custom merger if provided.
+    if (customMerger.isPresent()) {
+      return customMerger.get();
+    }
+
+    // Otherwise.
     switch (mergerStrategy) {
       case DEFAULT_MERGER_STRATEGY_UUID:
         return new HoodieAvroRecordMerger();
@@ -115,18 +130,16 @@ public class HoodieTestReaderContext extends 
HoodieReaderContext<IndexedRecord>
   }
 
   @Override
-  public HoodieRecord<IndexedRecord> constructHoodieRecord(
+  public HoodieRecord constructHoodieRecord(
       Option<IndexedRecord> recordOpt,
       Map<String, Object> metadataMap
   ) {
     if (!recordOpt.isPresent()) {
-      HoodieKey key = new HoodieKey((String) 
metadataMap.get(INTERNAL_META_RECORD_KEY),
-          (String) metadataMap.get(INTERNAL_META_PARTITION_PATH));
-      return new HoodieEmptyRecord<>(
-          key,
-          HoodieOperation.DELETE,
+      return SpillableMapUtils.generateEmptyPayload(
+          (String) metadataMap.get(INTERNAL_META_RECORD_KEY),
+          (String) metadataMap.get(INTERNAL_META_PARTITION_PATH),
           (Comparable<?>) metadataMap.get(INTERNAL_META_ORDERING_FIELD),
-          HoodieRecord.HoodieRecordType.AVRO);
+          DefaultHoodieRecordPayload.class.getName());
     }
     return new HoodieAvroIndexedRecord(recordOpt.get());
   }
@@ -148,7 +161,41 @@ public class HoodieTestReaderContext extends 
HoodieReaderContext<IndexedRecord>
 
   @Override
   public UnaryOperator<IndexedRecord> projectRecord(Schema from, Schema to) {
-    return null;
+    Map<String, Integer> fromFields = IntStream.range(0, 
from.getFields().size())
+        .boxed()
+        .collect(Collectors.toMap(
+            i -> from.getFields().get(i).name(), i -> i));
+    Map<String, Integer> toFields = IntStream.range(0, to.getFields().size())
+        .boxed()
+        .collect(Collectors.toMap(
+            i -> to.getFields().get(i).name(), i -> i));
+
+    // Check if source schema contains all fields from target schema.
+    List<Schema.Field> missingFields = to.getFields().stream()
+        .filter(f -> 
!fromFields.containsKey(f.name())).collect(Collectors.toList());
+    if (!missingFields.isEmpty()) {
+      throw new HoodieException("There are some fields missing in source 
schema: "
+          + missingFields);
+    }
+
+    // Build the mapping from source schema to target schema.
+    Map<Integer, Integer> fieldMap = toFields.entrySet().stream()
+        .filter(e -> fromFields.containsKey(e.getKey()))
+        .collect(Collectors.toMap(
+            e -> fromFields.get(e.getKey()), Map.Entry::getValue));
+
+    // Do the transformation.
+    return record -> {
+      IndexedRecord outputRecord = new GenericData.Record(to);
+      for (int i = 0; i < from.getFields().size(); i++) {
+        if (!fieldMap.containsKey(i)) {
+          continue;
+        }
+        int j = fieldMap.get(i);
+        outputRecord.put(j, record.get(i));
+      }
+      return outputRecord;
+    };
   }
 
   private Object getFieldValueFromIndexedRecord(

Reply via email to