This is an automated email from the ASF dual-hosted git repository. vhs pushed a commit to branch release-1.0.2 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 56bfb8502a5640709ed11b8e38162c19d020bd05 Author: Lokesh Jain <[email protected]> AuthorDate: Wed Apr 2 19:47:13 2025 +0530 [HUDI-9206] Support reading inflight instants with HoodieLogRecordReader (#13010) (cherry picked from commit 1f52b4e8b0e4894a4ff7a3ceace6b0723f461f00) --- .../MultipleSparkJobExecutionStrategy.java | 17 +--- ...HoodieSparkFileGroupReaderBasedMergeHandle.java | 17 +--- .../table/log/BaseHoodieLogRecordReader.java | 24 ++--- .../table/log/HoodieMergedLogRecordReader.java | 23 +++-- .../common/table/read/HoodieFileGroupReader.java | 25 +++-- .../hudi/metadata/HoodieTableMetadataUtil.java | 1 + .../table/read/TestHoodieFileGroupReaderBase.java | 2 + .../reader/HoodieFileGroupReaderTestUtils.java | 27 ++--- .../TestHoodieFileGroupReaderInflightCommit.java | 112 +++++++++++++++++++++ .../reader/HoodieFileGroupReaderTestHarness.java | 9 +- .../HoodieFileGroupReaderBasedRecordReader.java | 2 +- ...odieFileGroupReaderBasedParquetFileFormat.scala | 16 +-- 12 files changed, 181 insertions(+), 94 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java index 747c47b987b..0b922571283 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java @@ -473,20 +473,11 @@ public abstract class MultipleSparkJobExecutionStrategy<T> Configuration conf = broadcastManager.retrieveStorageConfig().get(); // instantiate FG reader - HoodieFileGroupReader<T> fileGroupReader = new HoodieFileGroupReader<>( - readerContextOpt.get(), + HoodieFileGroupReader<T> fileGroupReader = new HoodieFileGroupReader<>(readerContextOpt.get(), 
getHoodieTable().getMetaClient().getStorage().newInstance(new StoragePath(basePath), new HadoopStorageConfiguration(conf)), - basePath, - instantTime, - fileSlice, - readerSchema, - readerSchema, - internalSchemaOption, - getHoodieTable().getMetaClient(), - getHoodieTable().getMetaClient().getTableConfig().getProps(), - 0, - Long.MAX_VALUE, - usePosition); + basePath, instantTime, fileSlice, readerSchema, readerSchema, internalSchemaOption, + getHoodieTable().getMetaClient(), getHoodieTable().getMetaClient().getTableConfig().getProps(), + 0, Long.MAX_VALUE, usePosition, false); fileGroupReader.initRecordIterators(); // read records from the FG reader HoodieFileGroupReader.HoodieFileGroupReaderIterator<InternalRow> recordIterator diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java index ecc95e4eb08..547517a0c4a 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java @@ -185,20 +185,11 @@ public class HoodieSparkFileGroupReaderBasedMergeHandle<T, I, K, O> extends Hood hoodieTable.getMetaClient().getTableConfig().getProps().forEach(props::putIfAbsent); config.getProps().forEach(props::putIfAbsent); // Initializes file group reader - try (HoodieFileGroupReader<T> fileGroupReader = new HoodieFileGroupReader<>( - readerContext, + try (HoodieFileGroupReader<T> fileGroupReader = new HoodieFileGroupReader<>(readerContext, storage.newInstance(hoodieTable.getMetaClient().getBasePath(), new HadoopStorageConfiguration(conf)), - hoodieTable.getMetaClient().getBasePath().toString(), - instantTime, - fileSlice, - writeSchemaWithMetaFields, - writeSchemaWithMetaFields, - internalSchemaOption, - 
hoodieTable.getMetaClient(), - props, - 0, - Long.MAX_VALUE, - usePosition)) { + hoodieTable.getMetaClient().getBasePath().toString(), instantTime, fileSlice, + writeSchemaWithMetaFields, writeSchemaWithMetaFields, internalSchemaOption, + hoodieTable.getMetaClient(), props, 0, Long.MAX_VALUE, usePosition, false)) { fileGroupReader.initRecordIterators(); // Reads the records from the file slice try (HoodieFileGroupReaderIterator<InternalRow> recordIterator diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java index 1586e3f5ccc..71d4cd2842c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java @@ -136,16 +136,14 @@ public abstract class BaseHoodieLogRecordReader<T> { // Use scanV2 method. private final boolean enableOptimizedLogBlocksScan; protected FileGroupRecordBuffer<T> recordBuffer; + // Allows to consider inflight instants while merging log records + protected boolean allowInflightInstants; - protected BaseHoodieLogRecordReader(HoodieReaderContext readerContext, - HoodieStorage storage, - List<String> logFilePaths, + protected BaseHoodieLogRecordReader(HoodieReaderContext readerContext, HoodieStorage storage, List<String> logFilePaths, boolean reverseReader, int bufferSize, Option<InstantRange> instantRange, - boolean withOperationField, boolean forceFullScan, - Option<String> partitionNameOverride, - Option<String> keyFieldOverride, - boolean enableOptimizedLogBlocksScan, - FileGroupRecordBuffer<T> recordBuffer) { + boolean withOperationField, boolean forceFullScan, Option<String> partitionNameOverride, + Option<String> keyFieldOverride, boolean enableOptimizedLogBlocksScan, FileGroupRecordBuffer<T> recordBuffer, + boolean allowInflightInstants) { this.readerContext = 
readerContext; this.readerSchema = readerContext.getSchemaHandler().getRequiredSchema(); this.latestInstantTime = readerContext.getLatestCommitTime(); @@ -196,6 +194,8 @@ public abstract class BaseHoodieLogRecordReader<T> { this.partitionNameOverrideOpt = partitionNameOverride; this.recordBuffer = recordBuffer; + // When the allowInflightInstants flag is enabled, records written by inflight instants are also read + this.allowInflightInstants = allowInflightInstants; } /** @@ -256,8 +256,8 @@ public abstract class BaseHoodieLogRecordReader<T> { // Skip processing a data or delete block with the instant time greater than the latest instant time used by this log record reader continue; } - if (!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime) - || inflightInstantsTimeline.containsInstant(instantTime)) { + if (!allowInflightInstants + && (inflightInstantsTimeline.containsInstant(instantTime) || !completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime))) { // hit an uncommitted block possibly from a failed write, move to the next one and skip processing this one continue; } @@ -589,8 +589,8 @@ public abstract class BaseHoodieLogRecordReader<T> { continue; } if (logBlock.getBlockType() != COMMAND_BLOCK) { - if (!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime) - || inflightInstantsTimeline.containsInstant(instantTime)) { + if (!allowInflightInstants + && (inflightInstantsTimeline.containsInstant(instantTime) || !completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime))) { // hit an uncommitted block possibly from a failed write, move to the next one and skip processing this one continue; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java index cffe48b6a24..ab45b8c7fa0 100644 ---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java @@ -63,16 +63,12 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T> private long totalTimeTakenToReadAndMergeBlocks; @SuppressWarnings("unchecked") - private HoodieMergedLogRecordReader(HoodieReaderContext<T> readerContext, - HoodieStorage storage, List<String> logFilePaths, boolean reverseReader, - int bufferSize, Option<InstantRange> instantRange, - boolean withOperationField, boolean forceFullScan, - Option<String> partitionName, - Option<String> keyFieldOverride, - boolean enableOptimizedLogBlocksScan, - FileGroupRecordBuffer<T> recordBuffer) { + private HoodieMergedLogRecordReader(HoodieReaderContext<T> readerContext, HoodieStorage storage, List<String> logFilePaths, boolean reverseReader, + int bufferSize, Option<InstantRange> instantRange, boolean withOperationField, boolean forceFullScan, + Option<String> partitionName, Option<String> keyFieldOverride, boolean enableOptimizedLogBlocksScan, + FileGroupRecordBuffer<T> recordBuffer, boolean allowInflightInstants) { super(readerContext, storage, logFilePaths, reverseReader, bufferSize, instantRange, withOperationField, - forceFullScan, partitionName, keyFieldOverride, enableOptimizedLogBlocksScan, recordBuffer); + forceFullScan, partitionName, keyFieldOverride, enableOptimizedLogBlocksScan, recordBuffer, allowInflightInstants); this.scannedPrefixes = new HashSet<>(); if (forceFullScan) { @@ -220,6 +216,7 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T> private boolean enableOptimizedLogBlocksScan = false; private FileGroupRecordBuffer<T> recordBuffer; + private boolean allowInflightInstants = false; @Override public Builder<T> withHoodieReaderContext(HoodieReaderContext<T> readerContext) { @@ -292,6 +289,11 @@ public class HoodieMergedLogRecordReader<T> 
extends BaseHoodieLogRecordReader<T> return this; } + public Builder<T> withAllowInflightInstants(boolean allowInflightInstants) { + this.allowInflightInstants = allowInflightInstants; + return this; + } + @Override public HoodieMergedLogRecordReader<T> build() { ValidationUtils.checkArgument(recordBuffer != null, "Record Buffer is null in Merged Log Record Reader"); @@ -307,7 +309,8 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T> withOperationField, forceFullScan, Option.ofNullable(partitionName), Option.ofNullable(keyFieldOverride), - enableOptimizedLogBlocksScan, recordBuffer); + enableOptimizedLogBlocksScan, recordBuffer, + allowInflightInstants); } } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java index 94d63aac81b..0d0673d6c06 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java @@ -82,20 +82,17 @@ public final class HoodieFileGroupReader<T> implements Closeable { private ClosableIterator<T> baseFileIterator; private final Option<UnaryOperator<T>> outputConverter; private final HoodieReadStats readStats; + // Allows to consider inflight instants while merging log records using HoodieMergedLogRecordReader + // The inflight instants need to be considered while updating RLI records. RLI needs to fetch the revived + // and deleted keys from the log files written as part of active data commit. During the RLI update, + // the allowInflightInstants flag would need to be set to true. This would ensure the HoodieMergedLogRecordReader + // considers the log records which are inflight. 
+ private boolean allowInflightInstants; - public HoodieFileGroupReader(HoodieReaderContext<T> readerContext, - HoodieStorage storage, - String tablePath, - String latestCommitTime, - FileSlice fileSlice, - Schema dataSchema, - Schema requestedSchema, - Option<InternalSchema> internalSchemaOpt, - HoodieTableMetaClient hoodieTableMetaClient, - TypedProperties props, - long start, - long length, - boolean shouldUseRecordPosition) { + public HoodieFileGroupReader(HoodieReaderContext<T> readerContext, HoodieStorage storage, String tablePath, + String latestCommitTime, FileSlice fileSlice, Schema dataSchema, Schema requestedSchema, + Option<InternalSchema> internalSchemaOpt, HoodieTableMetaClient hoodieTableMetaClient, TypedProperties props, + long start, long length, boolean shouldUseRecordPosition, boolean allowInflightInstants) { this.readerContext = readerContext; this.storage = storage; this.hoodieBaseFileOption = fileSlice.getBaseFile(); @@ -134,6 +131,7 @@ public final class HoodieFileGroupReader<T> implements Closeable { this.recordBuffer = getRecordBuffer(readerContext, hoodieTableMetaClient, recordMergeMode, props, hoodieBaseFileOption, this.logFiles.isEmpty(), isSkipMerge, shouldUseRecordPosition, readStats); + this.allowInflightInstants = allowInflightInstants; } /** @@ -290,6 +288,7 @@ public final class HoodieFileGroupReader<T> implements Closeable { .withPartition(getRelativePartitionPath( new StoragePath(path), logFiles.get(0).getPath().getParent())) .withRecordBuffer(recordBuffer) + .withAllowInflightInstants(allowInflightInstants) .build()) { readStats.setTotalLogReadTimeMs(logRecordReader.getTotalTimeTakenToReadAndMergeBlocks()); readStats.setTotalUpdatedRecordsCompacted(logRecordReader.getNumMergedRecordsInLog()); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 3b47a81d2cd..887fe687306 100644 --- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -997,6 +997,7 @@ public class HoodieTableMetadataUtil { Collections.emptyList(), datasetMetaClient.getTableConfig().getRecordMergeStrategyId()); + // CRITICAL: Ensure allowInflightInstants is set to true while replacing the scanner with *LogRecordReader or HoodieFileGroupReader HoodieMergedLogRecordScanner mergedLogRecordScanner = HoodieMergedLogRecordScanner.newBuilder() .withStorage(datasetMetaClient.getStorage()) .withBasePath(datasetMetaClient.getBasePath()) diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java index 3f1720734a2..1796feb609c 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java @@ -329,6 +329,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> { props, 1, fileSlice.getTotalFileSize(), + false, false)); } HoodieFileGroupReader<T> fileGroupReader = new HoodieFileGroupReader<>( @@ -344,6 +345,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> { props, 0, fileSlice.getTotalFileSize(), + false, false); fileGroupReader.initRecordIterators(); while (fileGroupReader.hasNext()) { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestUtils.java index e3faae07fa0..3fd798d3f2f 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestUtils.java @@ -45,8 +45,8 @@ public 
class HoodieFileGroupReaderTestUtils { TypedProperties properties, HoodieStorage storage, HoodieReaderContext<IndexedRecord> readerContext, - HoodieTableMetaClient metaClient - ) { + HoodieTableMetaClient metaClient, + boolean allowInflightCommits) { assert (fileSliceOpt.isPresent()); return new HoodieFileGroupReaderBuilder() .withReaderContext(readerContext) @@ -55,6 +55,7 @@ public class HoodieFileGroupReaderTestUtils { .withStart(start) .withLength(length) .withProperties(properties) + .withAllowInflightCommits(allowInflightCommits) .build(basePath, latestCommitTime, schema, shouldUseRecordPosition, metaClient); } @@ -65,6 +66,7 @@ public class HoodieFileGroupReaderTestUtils { private TypedProperties props; private long start; private long length; + private boolean allowInflightCommits = false; public HoodieFileGroupReaderBuilder withReaderContext( HoodieReaderContext<IndexedRecord> context) { @@ -97,6 +99,11 @@ public class HoodieFileGroupReaderTestUtils { return this; } + public HoodieFileGroupReaderBuilder withAllowInflightCommits(boolean allowInflightCommits) { + this.allowInflightCommits = allowInflightCommits; + return this; + } + public HoodieFileGroupReader<IndexedRecord> build( String basePath, String latestCommitTime, @@ -108,20 +115,8 @@ public class HoodieFileGroupReaderTestUtils { props.setProperty(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(), basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME); props.setProperty(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), ExternalSpillableMap.DiskMapType.ROCKS_DB.name()); props.setProperty(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), "false"); - return new HoodieFileGroupReader<>( - readerContext, - storage, - basePath, - latestCommitTime, - fileSlice, - schema, - schema, - Option.empty(), - metaClient, - props, - start, - length, - shouldUseRecordPosition); + return new HoodieFileGroupReader<>(readerContext, storage, basePath, latestCommitTime, fileSlice, + schema, schema, 
Option.empty(), metaClient, props, start, length, shouldUseRecordPosition, allowInflightCommits); } } } diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderInflightCommit.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderInflightCommit.java new file mode 100644 index 00000000000..f006f852541 --- /dev/null +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderInflightCommit.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.common.table.read; + +import org.apache.hudi.common.model.HoodieAvroRecordMerger; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.testutils.HoodieTestTable; +import org.apache.hudi.common.testutils.reader.HoodieAvroRecordTestMerger; +import org.apache.hudi.common.testutils.reader.HoodieFileGroupReaderTestHarness; +import org.apache.hudi.common.testutils.reader.HoodieFileSliceTestUtils; +import org.apache.hudi.common.testutils.reader.HoodieRecordTestPayload; +import org.apache.hudi.common.testutils.reader.HoodieTestReaderContext; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.ClosableIterator; + +import org.apache.avro.generic.IndexedRecord; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA; +import static org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.INSERT; +import static org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.UPDATE; +import static org.apache.hudi.common.testutils.reader.HoodieFileSliceTestUtils.ROW_KEY; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TestHoodieFileGroupReaderInflightCommit extends HoodieFileGroupReaderTestHarness { + + @BeforeAll + public static void setUp() throws IOException { + HoodieAvroRecordMerger merger = new HoodieAvroRecordTestMerger(); + readerContext = new HoodieTestReaderContext( + Option.of(merger), + Option.of(HoodieRecordTestPayload.class.getName())); + + // ------------------------------------------------------------- + // The test logic is as follows: + // 1. 
Base file contains 5 records, + // whose key values are from 1 to 5, + // whose instant time is "001" and ordering value is 2. + // 2. After adding the first base file, + // we update the records with keys from 1 to 3 + // with ordering value 3. + + // Specify the key column values for each file. + keyRanges = Arrays.asList( + new HoodieFileSliceTestUtils.KeyRange(1, 5), + new HoodieFileSliceTestUtils.KeyRange(1, 3)); + // Specify the value of `timestamp` column for each file. + timestamps = Arrays.asList( + 2L, 3L); + // Specify the operation type for each file. + operationTypes = Arrays.asList( + INSERT, UPDATE); + // Specify the instant time for each file. + instantTimes = Arrays.asList( + "001", "002"); + shouldWritePositions = Arrays.asList(false, false); + } + + @BeforeEach + public void initialize() throws Exception { + setTableName(TestEventTimeMerging.class.getName()); + initPath(tableName); + initMetaClient(); + initTestDataGenerator(new String[]{PARTITION_PATH}); + testTable = HoodieTestTable.of(metaClient); + setUpMockCommits(); + } + + @Test + public void testInflightDataRead() throws IOException, InterruptedException { + // delete the completed instant to convert last update commit to inflight commit + testTable.moveCompleteCommitToInflight(instantTimes.get(1)); + metaClient = HoodieTableMetaClient.reload(metaClient); + // The FileSlice contains a base file and a log file.
+ ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(2, false, true); + List<String> leftKeysExpected = Arrays.asList("1", "2", "3", "4", "5"); + List<Long> leftTimestampsExpected = Arrays.asList(3L, 3L, 3L, 2L, 2L); + List<String> leftKeysActual = new ArrayList<>(); + List<Long> leftTimestampsActual = new ArrayList<>(); + while (iterator.hasNext()) { + IndexedRecord record = iterator.next(); + leftKeysActual.add(record.get(AVRO_SCHEMA.getField(ROW_KEY).pos()).toString()); + leftTimestampsActual.add((Long) record.get(AVRO_SCHEMA.getField("timestamp").pos())); + } + assertEquals(leftKeysExpected, leftKeysActual); + assertEquals(leftTimestampsExpected, leftTimestampsActual); + } +} diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestHarness.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestHarness.java index a071a7926e2..327fa0e7d00 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestHarness.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestHarness.java @@ -114,6 +114,11 @@ public class HoodieFileGroupReaderTestHarness extends HoodieCommonTestHarness { protected ClosableIterator<IndexedRecord> getFileGroupIterator(int numFiles, boolean shouldReadPositions) throws IOException, InterruptedException { + return getFileGroupIterator(numFiles, shouldReadPositions, false); + } + + protected ClosableIterator<IndexedRecord> getFileGroupIterator(int numFiles, boolean shouldReadPositions, boolean allowInflightCommits) + throws IOException, InterruptedException { assert (numFiles >= 1 && numFiles <= keyRanges.size()); HoodieStorage hoodieStorage = new HoodieHadoopStorage(basePath, storageConf); @@ -143,8 +148,8 @@ public class HoodieFileGroupReaderTestHarness extends HoodieCommonTestHarness { properties, hoodieStorage, readerContext, - 
metaClient - ); + metaClient, + allowInflightCommits); fileGroupReader.initRecordIterators(); return fileGroupReader.getClosableIterator(); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java index e74b48efd65..9100ffad49d 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java @@ -144,7 +144,7 @@ public class HoodieFileGroupReaderBasedRecordReader implements RecordReader<Null this.fileGroupReader = new HoodieFileGroupReader<>(readerContext, metaClient.getStorage(), tableBasePath, latestCommitTime, getFileSliceFromSplit(fileSplit, getFs(tableBasePath, jobConfCopy), tableBasePath), tableSchema, requestedSchema, Option.empty(), metaClient, props, fileSplit.getStart(), - fileSplit.getLength(), false); + fileSplit.getLength(), false, false); this.fileGroupReader.initRecordIterators(); // it expects the partition columns to be at the end Schema outputSchema = HoodieAvroUtils.generateProjectionSchema(tableSchema, diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala index cf8c211d373..5fcdc3b06a8 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala @@ -167,20 +167,8 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tablePath: String, .builder().setConf(storageConf).setBasePath(tablePath).build val props = metaClient.getTableConfig.getProps options.foreach(kv => props.setProperty(kv._1, kv._2)) - val reader = new HoodieFileGroupReader[InternalRow]( - readerContext, - new HoodieHadoopStorage(metaClient.getBasePath, storageConf), - tablePath, - queryTimestamp, - fileSlice, - dataAvroSchema, - requestedAvroSchema, - internalSchemaOpt, - metaClient, - props, - file.start, - file.length, - shouldUseRecordPosition) + val reader = new HoodieFileGroupReader[InternalRow](readerContext, new HoodieHadoopStorage(metaClient.getBasePath, storageConf), tablePath, queryTimestamp, + fileSlice, dataAvroSchema, requestedAvroSchema, internalSchemaOpt, metaClient, props, file.start, file.length, shouldUseRecordPosition, false) reader.initRecordIterators() // Append partition values to rows and project to output schema appendPartitionAndProject(
