This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new be2e95c9049 [HUDI-7176] Improve fg reader test framework (#10324)
be2e95c9049 is described below
commit be2e95c9049f0c1e902909f6abeaf7fa42946312
Author: Lin Liu <[email protected]>
AuthorDate: Mon Dec 18 18:53:12 2023 -0800
[HUDI-7176] Improve fg reader test framework (#10324)
Changes:
1. Uses EmptyHoodieRecordPayload instead of HoodieEmptyRecord to represent
a delete record.
2. Implements "projectRecord" function in reader context.
3. Encapsulates more logic into the framework to simplify the user logic.
---
.../reader/HoodieFileGroupReaderTestHarness.java | 123 +++++++++++++++++++++
.../testutils/reader/HoodieTestReaderContext.java | 69 ++++++++++--
2 files changed, 181 insertions(+), 11 deletions(-)
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestHarness.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestHarness.java
new file mode 100644
index 00000000000..793a23bea51
--- /dev/null
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestHarness.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.testutils.reader;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.junit.jupiter.api.AfterAll;
+
+import java.io.IOException;
+import java.util.List;
+
+import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
+import static
org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultHadoopConf;
+
+public class HoodieFileGroupReaderTestHarness extends HoodieCommonTestHarness {
+ protected static final String PARTITION_PATH = "any-partition-path";
+ protected static final String FILE_ID = "any-file-1";
+ // Set the key range for base file and log files.
+ protected static List<HoodieFileSliceTestUtils.KeyRange> keyRanges;
+ // Set the ordering field for each record set.
+ protected static List<Long> timestamps;
+ // Set the record types for each record set.
+ protected static List<DataGenerationPlan.OperationType> operationTypes;
+ // Set the instantTime for each record set.
+ protected static List<String> instantTimes;
+
+ // Environmental variables.
+ protected static Configuration hadoopConf;
+ protected static HoodieTestTable testTable;
+ protected static HoodieReaderContext<IndexedRecord> readerContext;
+ protected static TypedProperties properties;
+
+ static {
+ // Note: Make `timestamp` as ordering field.
+ properties = new TypedProperties();
+ properties.setProperty(
+ "hoodie.datasource.write.precombine.field", "timestamp");
+ hadoopConf = getDefaultHadoopConf();
+ readerContext = new HoodieTestReaderContext(Option.empty());
+ }
+
+ @AfterAll
+ public static void tearDown() throws IOException {
+ FileSystem.closeAll();
+ }
+
+ /**
+ * Assume the test is for MOR tables by default.
+ */
+ @Override
+ protected HoodieTableType getTableType() {
+ return HoodieTableType.MERGE_ON_READ;
+ }
+
+ protected void setUpMockCommits() throws Exception {
+ for (String instantTime : instantTimes) {
+ testTable.addDeltaCommit(instantTime);
+ }
+ }
+
+ protected ClosableIterator<IndexedRecord> getFileGroupIterator(int numFiles)
+ throws IOException, InterruptedException {
+ assert (numFiles >= 1 && numFiles <= keyRanges.size());
+
+ Option<FileSlice> fileSliceOpt =
+ HoodieFileSliceTestUtils.getFileSlice(
+ readerContext.getFs(basePath, hadoopConf),
+ keyRanges.subList(0, numFiles),
+ timestamps.subList(0, numFiles),
+ operationTypes.subList(0, numFiles),
+ instantTimes.subList(0, numFiles),
+ basePath,
+ PARTITION_PATH,
+ FILE_ID
+ );
+
+ HoodieFileGroupReader<IndexedRecord> fileGroupReader =
+ HoodieFileGroupReaderTestUtils.createFileGroupReader(
+ fileSliceOpt,
+ basePath,
+ "1000", // Not used internally.
+ AVRO_SCHEMA,
+ false,
+ 0L,
+ Long.MAX_VALUE,
+ properties,
+ hadoopConf,
+ metaClient.getTableConfig(),
+ readerContext
+ );
+
+ fileGroupReader.initRecordIterators();
+ return fileGroupReader.getClosableIterator();
+ }
+}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
index 5c7c2bd4c71..0546396a452 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
@@ -22,20 +22,20 @@ package org.apache.hudi.common.testutils.reader;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroRecordMerger;
-import org.apache.hudi.common.model.HoodieEmptyRecord;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.storage.HoodieAvroParquetReader;
import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
@@ -43,13 +43,22 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import static
org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
import static
org.apache.hudi.common.testutils.reader.HoodieFileSliceTestUtils.ROW_KEY;
public class HoodieTestReaderContext extends
HoodieReaderContext<IndexedRecord> {
+ private Option<HoodieRecordMerger> customMerger;
+
+ public HoodieTestReaderContext(Option<HoodieRecordMerger> customMerger) {
+ this.customMerger = customMerger;
+ }
+
@Override
public FileSystem getFs(String path, Configuration conf) {
return FSUtils.getFs(path, conf);
@@ -75,6 +84,12 @@ public class HoodieTestReaderContext extends
HoodieReaderContext<IndexedRecord>
@Override
public HoodieRecordMerger getRecordMerger(String mergerStrategy) {
+ // Utilize the custom merger if provided.
+ if (customMerger.isPresent()) {
+ return customMerger.get();
+ }
+
+ // Otherwise.
switch (mergerStrategy) {
case DEFAULT_MERGER_STRATEGY_UUID:
return new HoodieAvroRecordMerger();
@@ -115,18 +130,16 @@ public class HoodieTestReaderContext extends
HoodieReaderContext<IndexedRecord>
}
@Override
- public HoodieRecord<IndexedRecord> constructHoodieRecord(
+ public HoodieRecord constructHoodieRecord(
Option<IndexedRecord> recordOpt,
Map<String, Object> metadataMap
) {
if (!recordOpt.isPresent()) {
- HoodieKey key = new HoodieKey((String)
metadataMap.get(INTERNAL_META_RECORD_KEY),
- (String) metadataMap.get(INTERNAL_META_PARTITION_PATH));
- return new HoodieEmptyRecord<>(
- key,
- HoodieOperation.DELETE,
+ return SpillableMapUtils.generateEmptyPayload(
+ (String) metadataMap.get(INTERNAL_META_RECORD_KEY),
+ (String) metadataMap.get(INTERNAL_META_PARTITION_PATH),
(Comparable<?>) metadataMap.get(INTERNAL_META_ORDERING_FIELD),
- HoodieRecord.HoodieRecordType.AVRO);
+ DefaultHoodieRecordPayload.class.getName());
}
return new HoodieAvroIndexedRecord(recordOpt.get());
}
@@ -148,7 +161,41 @@ public class HoodieTestReaderContext extends
HoodieReaderContext<IndexedRecord>
@Override
public UnaryOperator<IndexedRecord> projectRecord(Schema from, Schema to) {
- return null;
+ Map<String, Integer> fromFields = IntStream.range(0,
from.getFields().size())
+ .boxed()
+ .collect(Collectors.toMap(
+ i -> from.getFields().get(i).name(), i -> i));
+ Map<String, Integer> toFields = IntStream.range(0, to.getFields().size())
+ .boxed()
+ .collect(Collectors.toMap(
+ i -> to.getFields().get(i).name(), i -> i));
+
+ // Check if source schema contains all fields from target schema.
+ List<Schema.Field> missingFields = to.getFields().stream()
+ .filter(f ->
!fromFields.containsKey(f.name())).collect(Collectors.toList());
+ if (!missingFields.isEmpty()) {
+ throw new HoodieException("There are some fields missing in source
schema: "
+ + missingFields);
+ }
+
+ // Build the mapping from source schema to target schema.
+ Map<Integer, Integer> fieldMap = toFields.entrySet().stream()
+ .filter(e -> fromFields.containsKey(e.getKey()))
+ .collect(Collectors.toMap(
+ e -> fromFields.get(e.getKey()), Map.Entry::getValue));
+
+ // Do the transformation.
+ return record -> {
+ IndexedRecord outputRecord = new GenericData.Record(to);
+ for (int i = 0; i < from.getFields().size(); i++) {
+ if (!fieldMap.containsKey(i)) {
+ continue;
+ }
+ int j = fieldMap.get(i);
+ outputRecord.put(j, record.get(i));
+ }
+ return outputRecord;
+ };
}
private Object getFieldValueFromIndexedRecord(