This is an automated email from the ASF dual-hosted git repository.

vhs pushed a commit to branch release-1.0.2
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 56bfb8502a5640709ed11b8e38162c19d020bd05
Author: Lokesh Jain <[email protected]>
AuthorDate: Wed Apr 2 19:47:13 2025 +0530

    [HUDI-9206] Support reading inflight instants with HoodieLogRecordReader 
(#13010)
    
    (cherry picked from commit 1f52b4e8b0e4894a4ff7a3ceace6b0723f461f00)
---
 .../MultipleSparkJobExecutionStrategy.java         |  17 +---
 ...HoodieSparkFileGroupReaderBasedMergeHandle.java |  17 +---
 .../table/log/BaseHoodieLogRecordReader.java       |  24 ++---
 .../table/log/HoodieMergedLogRecordReader.java     |  23 +++--
 .../common/table/read/HoodieFileGroupReader.java   |  25 +++--
 .../hudi/metadata/HoodieTableMetadataUtil.java     |   1 +
 .../table/read/TestHoodieFileGroupReaderBase.java  |   2 +
 .../reader/HoodieFileGroupReaderTestUtils.java     |  27 ++---
 .../TestHoodieFileGroupReaderInflightCommit.java   | 112 +++++++++++++++++++++
 .../reader/HoodieFileGroupReaderTestHarness.java   |   9 +-
 .../HoodieFileGroupReaderBasedRecordReader.java    |   2 +-
 ...odieFileGroupReaderBasedParquetFileFormat.scala |  16 +--
 12 files changed, 181 insertions(+), 94 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 747c47b987b..0b922571283 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -473,20 +473,11 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
         Configuration conf = broadcastManager.retrieveStorageConfig().get();
 
         // instantiate FG reader
-        HoodieFileGroupReader<T> fileGroupReader = new HoodieFileGroupReader<>(
-            readerContextOpt.get(),
+        HoodieFileGroupReader<T> fileGroupReader = new 
HoodieFileGroupReader<>(readerContextOpt.get(),
             getHoodieTable().getMetaClient().getStorage().newInstance(new 
StoragePath(basePath), new HadoopStorageConfiguration(conf)),
-            basePath,
-            instantTime,
-            fileSlice,
-            readerSchema,
-            readerSchema,
-            internalSchemaOption,
-            getHoodieTable().getMetaClient(),
-            getHoodieTable().getMetaClient().getTableConfig().getProps(),
-            0,
-            Long.MAX_VALUE,
-            usePosition);
+            basePath, instantTime, fileSlice, readerSchema, readerSchema, 
internalSchemaOption,
+            getHoodieTable().getMetaClient(), 
getHoodieTable().getMetaClient().getTableConfig().getProps(),
+            0, Long.MAX_VALUE, usePosition, false);
         fileGroupReader.initRecordIterators();
         // read records from the FG reader
         HoodieFileGroupReader.HoodieFileGroupReaderIterator<InternalRow> 
recordIterator
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java
index ecc95e4eb08..547517a0c4a 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java
@@ -185,20 +185,11 @@ public class 
HoodieSparkFileGroupReaderBasedMergeHandle<T, I, K, O> extends Hood
     
hoodieTable.getMetaClient().getTableConfig().getProps().forEach(props::putIfAbsent);
     config.getProps().forEach(props::putIfAbsent);
     // Initializes file group reader
-    try (HoodieFileGroupReader<T> fileGroupReader = new 
HoodieFileGroupReader<>(
-        readerContext,
+    try (HoodieFileGroupReader<T> fileGroupReader = new 
HoodieFileGroupReader<>(readerContext,
         storage.newInstance(hoodieTable.getMetaClient().getBasePath(), new 
HadoopStorageConfiguration(conf)),
-        hoodieTable.getMetaClient().getBasePath().toString(),
-        instantTime,
-        fileSlice,
-        writeSchemaWithMetaFields,
-        writeSchemaWithMetaFields,
-        internalSchemaOption,
-        hoodieTable.getMetaClient(),
-        props,
-        0,
-        Long.MAX_VALUE,
-        usePosition)) {
+        hoodieTable.getMetaClient().getBasePath().toString(), instantTime, 
fileSlice,
+        writeSchemaWithMetaFields, writeSchemaWithMetaFields, 
internalSchemaOption,
+        hoodieTable.getMetaClient(), props, 0, Long.MAX_VALUE, usePosition, 
false)) {
       fileGroupReader.initRecordIterators();
       // Reads the records from the file slice
       try (HoodieFileGroupReaderIterator<InternalRow> recordIterator
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
index 1586e3f5ccc..71d4cd2842c 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
@@ -136,16 +136,14 @@ public abstract class BaseHoodieLogRecordReader<T> {
   // Use scanV2 method.
   private final boolean enableOptimizedLogBlocksScan;
   protected FileGroupRecordBuffer<T> recordBuffer;
+  // Allows considering inflight instants while merging log records
+  protected boolean allowInflightInstants;
 
-  protected BaseHoodieLogRecordReader(HoodieReaderContext readerContext,
-                                      HoodieStorage storage,
-                                      List<String> logFilePaths,
+  protected BaseHoodieLogRecordReader(HoodieReaderContext readerContext, 
HoodieStorage storage, List<String> logFilePaths,
                                       boolean reverseReader, int bufferSize, 
Option<InstantRange> instantRange,
-                                      boolean withOperationField, boolean 
forceFullScan,
-                                      Option<String> partitionNameOverride,
-                                      Option<String> keyFieldOverride,
-                                      boolean enableOptimizedLogBlocksScan,
-                                      FileGroupRecordBuffer<T> recordBuffer) {
+                                      boolean withOperationField, boolean 
forceFullScan, Option<String> partitionNameOverride,
+                                      Option<String> keyFieldOverride, boolean 
enableOptimizedLogBlocksScan, FileGroupRecordBuffer<T> recordBuffer,
+                                      boolean allowInflightInstants) {
     this.readerContext = readerContext;
     this.readerSchema = readerContext.getSchemaHandler().getRequiredSchema();
     this.latestInstantTime = readerContext.getLatestCommitTime();
@@ -196,6 +194,8 @@ public abstract class BaseHoodieLogRecordReader<T> {
 
     this.partitionNameOverrideOpt = partitionNameOverride;
     this.recordBuffer = recordBuffer;
+    // When the allowInflightInstants flag is enabled, records written by 
inflight instants are also read
+    this.allowInflightInstants = allowInflightInstants;
   }
 
   /**
@@ -256,8 +256,8 @@ public abstract class BaseHoodieLogRecordReader<T> {
             // Skip processing a data or delete block with the instant time 
greater than the latest instant time used by this log record reader
             continue;
           }
-          if 
(!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime)
-              || inflightInstantsTimeline.containsInstant(instantTime)) {
+          if (!allowInflightInstants
+              && (inflightInstantsTimeline.containsInstant(instantTime) || 
!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime))) {
             // hit an uncommitted block possibly from a failed write, move to 
the next one and skip processing this one
             continue;
           }
@@ -589,8 +589,8 @@ public abstract class BaseHoodieLogRecordReader<T> {
           continue;
         }
         if (logBlock.getBlockType() != COMMAND_BLOCK) {
-          if 
(!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime)
-              || inflightInstantsTimeline.containsInstant(instantTime)) {
+          if (!allowInflightInstants
+              && (inflightInstantsTimeline.containsInstant(instantTime) || 
!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime))) {
             // hit an uncommitted block possibly from a failed write, move to 
the next one and skip processing this one
             continue;
           }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
index cffe48b6a24..ab45b8c7fa0 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
@@ -63,16 +63,12 @@ public class HoodieMergedLogRecordReader<T> extends 
BaseHoodieLogRecordReader<T>
   private long totalTimeTakenToReadAndMergeBlocks;
 
   @SuppressWarnings("unchecked")
-  private HoodieMergedLogRecordReader(HoodieReaderContext<T> readerContext,
-                                      HoodieStorage storage, List<String> 
logFilePaths, boolean reverseReader,
-                                      int bufferSize, Option<InstantRange> 
instantRange,
-                                      boolean withOperationField, boolean 
forceFullScan,
-                                      Option<String> partitionName,
-                                      Option<String> keyFieldOverride,
-                                      boolean enableOptimizedLogBlocksScan,
-                                      FileGroupRecordBuffer<T> recordBuffer) {
+  private HoodieMergedLogRecordReader(HoodieReaderContext<T> readerContext, 
HoodieStorage storage, List<String> logFilePaths, boolean reverseReader,
+                                      int bufferSize, Option<InstantRange> 
instantRange, boolean withOperationField, boolean forceFullScan,
+                                      Option<String> partitionName, 
Option<String> keyFieldOverride, boolean enableOptimizedLogBlocksScan,
+                                      FileGroupRecordBuffer<T> recordBuffer, 
boolean allowInflightInstants) {
     super(readerContext, storage, logFilePaths, reverseReader, bufferSize, 
instantRange, withOperationField,
-        forceFullScan, partitionName, keyFieldOverride, 
enableOptimizedLogBlocksScan, recordBuffer);
+        forceFullScan, partitionName, keyFieldOverride, 
enableOptimizedLogBlocksScan, recordBuffer, allowInflightInstants);
     this.scannedPrefixes = new HashSet<>();
 
     if (forceFullScan) {
@@ -220,6 +216,7 @@ public class HoodieMergedLogRecordReader<T> extends 
BaseHoodieLogRecordReader<T>
     private boolean enableOptimizedLogBlocksScan = false;
 
     private FileGroupRecordBuffer<T> recordBuffer;
+    private boolean allowInflightInstants = false;
 
     @Override
     public Builder<T> withHoodieReaderContext(HoodieReaderContext<T> 
readerContext) {
@@ -292,6 +289,11 @@ public class HoodieMergedLogRecordReader<T> extends 
BaseHoodieLogRecordReader<T>
       return this;
     }
 
+    public Builder<T> withAllowInflightInstants(boolean allowInflightInstants) 
{
+      this.allowInflightInstants = allowInflightInstants;
+      return this;
+    }
+
     @Override
     public HoodieMergedLogRecordReader<T> build() {
       ValidationUtils.checkArgument(recordBuffer != null, "Record Buffer is 
null in Merged Log Record Reader");
@@ -307,7 +309,8 @@ public class HoodieMergedLogRecordReader<T> extends 
BaseHoodieLogRecordReader<T>
           withOperationField, forceFullScan,
           Option.ofNullable(partitionName),
           Option.ofNullable(keyFieldOverride),
-          enableOptimizedLogBlocksScan, recordBuffer);
+          enableOptimizedLogBlocksScan, recordBuffer,
+          allowInflightInstants);
     }
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index 94d63aac81b..0d0673d6c06 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -82,20 +82,17 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
   private ClosableIterator<T> baseFileIterator;
   private final Option<UnaryOperator<T>> outputConverter;
   private final HoodieReadStats readStats;
+  // Allows considering inflight instants while merging log records using 
HoodieMergedLogRecordReader.
+  // The inflight instants need to be considered while updating RLI records. 
RLI needs to fetch the revived
+  // and deleted keys from the log files written as part of active data 
commit. During the RLI update,
+  // the allowInflightInstants flag would need to be set to true. This would 
ensure the HoodieMergedLogRecordReader
+  // considers the log records which are inflight.
+  private boolean allowInflightInstants;
 
-  public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
-                               HoodieStorage storage,
-                               String tablePath,
-                               String latestCommitTime,
-                               FileSlice fileSlice,
-                               Schema dataSchema,
-                               Schema requestedSchema,
-                               Option<InternalSchema> internalSchemaOpt,
-                               HoodieTableMetaClient hoodieTableMetaClient,
-                               TypedProperties props,
-                               long start,
-                               long length,
-                               boolean shouldUseRecordPosition) {
+  public HoodieFileGroupReader(HoodieReaderContext<T> readerContext, 
HoodieStorage storage, String tablePath,
+                               String latestCommitTime, FileSlice fileSlice, 
Schema dataSchema, Schema requestedSchema,
+                               Option<InternalSchema> internalSchemaOpt, 
HoodieTableMetaClient hoodieTableMetaClient, TypedProperties props,
+                               long start, long length, boolean 
shouldUseRecordPosition, boolean allowInflightInstants) {
     this.readerContext = readerContext;
     this.storage = storage;
     this.hoodieBaseFileOption = fileSlice.getBaseFile();
@@ -134,6 +131,7 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
     this.recordBuffer = getRecordBuffer(readerContext, hoodieTableMetaClient,
         recordMergeMode, props, hoodieBaseFileOption, this.logFiles.isEmpty(),
         isSkipMerge, shouldUseRecordPosition, readStats);
+    this.allowInflightInstants = allowInflightInstants;
   }
 
   /**
@@ -290,6 +288,7 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
         .withPartition(getRelativePartitionPath(
             new StoragePath(path), logFiles.get(0).getPath().getParent()))
         .withRecordBuffer(recordBuffer)
+        .withAllowInflightInstants(allowInflightInstants)
         .build()) {
       
readStats.setTotalLogReadTimeMs(logRecordReader.getTotalTimeTakenToReadAndMergeBlocks());
       
readStats.setTotalUpdatedRecordsCompacted(logRecordReader.getNumMergedRecordsInLog());
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 3b47a81d2cd..887fe687306 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -997,6 +997,7 @@ public class HoodieTableMetadataUtil {
           Collections.emptyList(),
           datasetMetaClient.getTableConfig().getRecordMergeStrategyId());
 
+      // CRITICAL: Ensure allowInflightInstants is set to true while replacing 
the scanner with *LogRecordReader or HoodieFileGroupReader
       HoodieMergedLogRecordScanner mergedLogRecordScanner = 
HoodieMergedLogRecordScanner.newBuilder()
           .withStorage(datasetMetaClient.getStorage())
           .withBasePath(datasetMetaClient.getBasePath())
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index 3f1720734a2..1796feb609c 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -329,6 +329,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
           props,
           1,
           fileSlice.getTotalFileSize(),
+          false,
           false));
     }
     HoodieFileGroupReader<T> fileGroupReader = new HoodieFileGroupReader<>(
@@ -344,6 +345,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
         props,
         0,
         fileSlice.getTotalFileSize(),
+        false,
         false);
     fileGroupReader.initRecordIterators();
     while (fileGroupReader.hasNext()) {
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestUtils.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestUtils.java
index e3faae07fa0..3fd798d3f2f 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestUtils.java
@@ -45,8 +45,8 @@ public class HoodieFileGroupReaderTestUtils {
       TypedProperties properties,
       HoodieStorage storage,
       HoodieReaderContext<IndexedRecord> readerContext,
-      HoodieTableMetaClient metaClient
-  ) {
+      HoodieTableMetaClient metaClient,
+      boolean allowInflightCommits) {
     assert (fileSliceOpt.isPresent());
     return new HoodieFileGroupReaderBuilder()
         .withReaderContext(readerContext)
@@ -55,6 +55,7 @@ public class HoodieFileGroupReaderTestUtils {
         .withStart(start)
         .withLength(length)
         .withProperties(properties)
+        .withAllowInflightCommits(allowInflightCommits)
         .build(basePath, latestCommitTime, schema, shouldUseRecordPosition, 
metaClient);
   }
 
@@ -65,6 +66,7 @@ public class HoodieFileGroupReaderTestUtils {
     private TypedProperties props;
     private long start;
     private long length;
+    private boolean allowInflightCommits = false;
 
     public HoodieFileGroupReaderBuilder withReaderContext(
         HoodieReaderContext<IndexedRecord> context) {
@@ -97,6 +99,11 @@ public class HoodieFileGroupReaderTestUtils {
       return this;
     }
 
+    public HoodieFileGroupReaderBuilder withAllowInflightCommits(boolean 
allowInflightCommits) {
+      this.allowInflightCommits = allowInflightCommits;
+      return this;
+    }
+
     public HoodieFileGroupReader<IndexedRecord> build(
         String basePath,
         String latestCommitTime,
@@ -108,20 +115,8 @@ public class HoodieFileGroupReaderTestUtils {
       props.setProperty(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(),  
basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME);
       props.setProperty(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), 
ExternalSpillableMap.DiskMapType.ROCKS_DB.name());
       
props.setProperty(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
 "false");
-      return new HoodieFileGroupReader<>(
-          readerContext,
-          storage,
-          basePath,
-          latestCommitTime,
-          fileSlice,
-          schema,
-          schema,
-          Option.empty(),
-          metaClient,
-          props,
-          start,
-          length,
-          shouldUseRecordPosition);
+      return new HoodieFileGroupReader<>(readerContext, storage, basePath, 
latestCommitTime, fileSlice,
+          schema, schema, Option.empty(), metaClient, props, start, length, 
shouldUseRecordPosition, allowInflightCommits);
     }
   }
 }
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderInflightCommit.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderInflightCommit.java
new file mode 100644
index 00000000000..f006f852541
--- /dev/null
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderInflightCommit.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.model.HoodieAvroRecordMerger;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.testutils.reader.HoodieAvroRecordTestMerger;
+import 
org.apache.hudi.common.testutils.reader.HoodieFileGroupReaderTestHarness;
+import org.apache.hudi.common.testutils.reader.HoodieFileSliceTestUtils;
+import org.apache.hudi.common.testutils.reader.HoodieRecordTestPayload;
+import org.apache.hudi.common.testutils.reader.HoodieTestReaderContext;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
+import static 
org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.INSERT;
+import static 
org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.UPDATE;
+import static 
org.apache.hudi.common.testutils.reader.HoodieFileSliceTestUtils.ROW_KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestHoodieFileGroupReaderInflightCommit extends 
HoodieFileGroupReaderTestHarness {
+
+  @BeforeAll
+  public static void setUp() throws IOException {
+    HoodieAvroRecordMerger merger = new HoodieAvroRecordTestMerger();
+    readerContext = new HoodieTestReaderContext(
+        Option.of(merger),
+        Option.of(HoodieRecordTestPayload.class.getName()));
+
+    // -------------------------------------------------------------
+    // The test logic is as follows:
+    // 1. Base file contains 10 records,
+    //    whose key values are from 1 to 5,
+    //    whose instant time is "001" and ordering value is 2.
+    // 2. After adding the first base file,
+    //    we update the records with keys from 1 to 3
+    //    with ordering value 3.
+
+    // Specify the key column values for each file.
+    keyRanges = Arrays.asList(
+        new HoodieFileSliceTestUtils.KeyRange(1, 5),
+        new HoodieFileSliceTestUtils.KeyRange(1, 3));
+    // Specify the value of `timestamp` column for each file.
+    timestamps = Arrays.asList(
+        2L, 3L);
+    // Specify the operation type for each file.
+    operationTypes = Arrays.asList(
+        INSERT, UPDATE);
+    // Specify the instant time for each file.
+    instantTimes = Arrays.asList(
+        "001", "002");
+    shouldWritePositions = Arrays.asList(false, false);
+  }
+
+  @BeforeEach
+  public void initialize() throws Exception {
+    setTableName(TestEventTimeMerging.class.getName());
+    initPath(tableName);
+    initMetaClient();
+    initTestDataGenerator(new String[]{PARTITION_PATH});
+    testTable = HoodieTestTable.of(metaClient);
+    setUpMockCommits();
+  }
+
+  @Test
+  public void testInflightDataRead() throws IOException, InterruptedException {
+    // delete the completed instant to convert last update commit to inflight 
commit
+    testTable.moveCompleteCommitToInflight(instantTimes.get(1));
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    // The FileSlice contains a base file and a log file.
+    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(2, false, 
true);
+    List<String> leftKeysExpected = Arrays.asList("1", "2", "3", "4", "5");
+    List<Long> leftTimestampsExpected = Arrays.asList(3L, 3L, 3L, 2L, 2L);
+    List<String> leftKeysActual = new ArrayList<>();
+    List<Long> leftTimestampsActual = new ArrayList<>();
+    while (iterator.hasNext()) {
+      IndexedRecord record = iterator.next();
+      
leftKeysActual.add(record.get(AVRO_SCHEMA.getField(ROW_KEY).pos()).toString());
+      leftTimestampsActual.add((Long) 
record.get(AVRO_SCHEMA.getField("timestamp").pos()));
+    }
+    assertEquals(leftKeysExpected, leftKeysActual);
+    assertEquals(leftTimestampsExpected, leftTimestampsActual);
+  }
+}
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestHarness.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestHarness.java
index a071a7926e2..327fa0e7d00 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestHarness.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestHarness.java
@@ -114,6 +114,11 @@ public class HoodieFileGroupReaderTestHarness extends 
HoodieCommonTestHarness {
 
   protected ClosableIterator<IndexedRecord> getFileGroupIterator(int numFiles, 
boolean shouldReadPositions)
       throws IOException, InterruptedException {
+    return getFileGroupIterator(numFiles, shouldReadPositions, false);
+  }
+
+  protected ClosableIterator<IndexedRecord> getFileGroupIterator(int numFiles, 
boolean shouldReadPositions, boolean allowInflightCommits)
+      throws IOException, InterruptedException {
     assert (numFiles >= 1 && numFiles <= keyRanges.size());
 
     HoodieStorage hoodieStorage = new HoodieHadoopStorage(basePath, 
storageConf);
@@ -143,8 +148,8 @@ public class HoodieFileGroupReaderTestHarness extends 
HoodieCommonTestHarness {
             properties,
             hoodieStorage,
             readerContext,
-            metaClient
-        );
+            metaClient,
+            allowInflightCommits);
 
     fileGroupReader.initRecordIterators();
     return fileGroupReader.getClosableIterator();
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
index e74b48efd65..9100ffad49d 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
@@ -144,7 +144,7 @@ public class HoodieFileGroupReaderBasedRecordReader 
implements RecordReader<Null
     this.fileGroupReader = new HoodieFileGroupReader<>(readerContext, 
metaClient.getStorage(), tableBasePath,
         latestCommitTime, getFileSliceFromSplit(fileSplit, 
getFs(tableBasePath, jobConfCopy), tableBasePath),
         tableSchema, requestedSchema, Option.empty(), metaClient, props, 
fileSplit.getStart(),
-        fileSplit.getLength(), false);
+        fileSplit.getLength(), false, false);
     this.fileGroupReader.initRecordIterators();
     // it expects the partition columns to be at the end
     Schema outputSchema = HoodieAvroUtils.generateProjectionSchema(tableSchema,
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index cf8c211d373..5fcdc3b06a8 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -167,20 +167,8 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tablePath: String,
                 .builder().setConf(storageConf).setBasePath(tablePath).build
               val props = metaClient.getTableConfig.getProps
               options.foreach(kv => props.setProperty(kv._1, kv._2))
-              val reader = new HoodieFileGroupReader[InternalRow](
-                readerContext,
-                new HoodieHadoopStorage(metaClient.getBasePath, storageConf),
-                tablePath,
-                queryTimestamp,
-                fileSlice,
-                dataAvroSchema,
-                requestedAvroSchema,
-                internalSchemaOpt,
-                metaClient,
-                props,
-                file.start,
-                file.length,
-                shouldUseRecordPosition)
+              val reader = new 
HoodieFileGroupReader[InternalRow](readerContext, new 
HoodieHadoopStorage(metaClient.getBasePath, storageConf), tablePath, 
queryTimestamp,
+                fileSlice, dataAvroSchema, requestedAvroSchema, 
internalSchemaOpt, metaClient, props, file.start, file.length, 
shouldUseRecordPosition, false)
               reader.initRecordIterators()
               // Append partition values to rows and project to output schema
               appendPartitionAndProject(

Reply via email to