This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f693bed0170 [HUDI-8673] Simple and Global-Simple Index fetch location parallelism improvements (#12445)
f693bed0170 is described below

commit f693bed017090b37a4afe9a5a9068b894e643820
Author: Tim Brown <[email protected]>
AuthorDate: Mon Dec 9 03:12:59 2024 -0600

    [HUDI-8673] Simple and Global-Simple Index fetch location parallelism improvements (#12445)
---
 .../hudi/index/simple/HoodieGlobalSimpleIndex.java |  13 +-
 .../hudi/index/simple/HoodieSimpleIndex.java       |  27 ++--
 .../hudi/io/HoodieKeyLocationFetchHandle.java      |  20 ++-
 .../hudi/index/simple/TestGlobalSimpleIndex.java   | 156 ++++++++++++++++++++
 .../apache/hudi/index/simple/TestSimpleIndex.java  | 158 +++++++++++++++++++++
 .../hudi/testutils/HoodieWriteableTestTable.java   |  14 +-
 .../hudi/io/TestHoodieKeyLocationFetchHandle.java  |  11 +-
 .../apache/hudi/common/util/FileFormatUtils.java   |  11 +-
 .../org/apache/hudi/common/util/HFileUtils.java    |   4 +-
 .../java/org/apache/hudi/common/util/OrcUtils.java |  19 +--
 .../org/apache/hudi/common/util/ParquetUtils.java  |  17 +--
 .../apache/hudi/common/util/TestParquetUtils.java  |  19 ++-
 12 files changed, 389 insertions(+), 80 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java
index 3c76ff17935..4abc3cb1d87 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java
@@ -69,11 +69,8 @@ public class HoodieGlobalSimpleIndex extends 
HoodieSimpleIndex {
       HoodieData<HoodieRecord<R>> inputRecords, HoodieEngineContext context,
       HoodieTable hoodieTable) {
     List<Pair<String, HoodieBaseFile>> latestBaseFiles = 
getAllBaseFilesInTable(context, hoodieTable);
-    int configuredSimpleIndexParallelism = 
config.getGlobalSimpleIndexParallelism();
-    int fetchParallelism =
-        configuredSimpleIndexParallelism > 0 ? 
configuredSimpleIndexParallelism : inputRecords.deduceNumPartitions();
     HoodiePairData<String, HoodieRecordGlobalLocation> allKeysAndLocations =
-        fetchRecordGlobalLocations(context, hoodieTable, fetchParallelism, 
latestBaseFiles);
+        fetchRecordGlobalLocations(context, hoodieTable, latestBaseFiles);
     boolean mayContainDuplicateLookup = 
hoodieTable.getMetaClient().getTableType() == MERGE_ON_READ;
     boolean shouldUpdatePartitionPath = 
config.getGlobalSimpleIndexUpdatePartitionPath() && hoodieTable.isPartitioned();
     return tagGlobalLocationBackToRecords(inputRecords, allKeysAndLocations,
@@ -81,13 +78,13 @@ public class HoodieGlobalSimpleIndex extends 
HoodieSimpleIndex {
   }
 
   private HoodiePairData<String, HoodieRecordGlobalLocation> 
fetchRecordGlobalLocations(
-      HoodieEngineContext context, HoodieTable hoodieTable, int parallelism,
+      HoodieEngineContext context, HoodieTable hoodieTable,
       List<Pair<String, HoodieBaseFile>> baseFiles) {
-    int fetchParallelism = Math.max(1, Math.min(baseFiles.size(), 
parallelism));
+    int parallelism = getParallelism(config.getGlobalSimpleIndexParallelism(), 
baseFiles.size());
 
-    return context.parallelize(baseFiles, fetchParallelism)
+    return context.parallelize(baseFiles, parallelism)
         .flatMap(partitionPathBaseFile -> new 
HoodieKeyLocationFetchHandle(config, hoodieTable, partitionPathBaseFile, 
keyGeneratorOpt)
-            .globalLocations().iterator())
+            .globalLocations())
         .mapToPair(e -> (Pair<String, HoodieRecordGlobalLocation>) e);
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java
index 99ffc1b47e6..db6d02b5624 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java
@@ -107,16 +107,10 @@ public class HoodieSimpleIndex
           
.getString(HoodieIndexConfig.SIMPLE_INDEX_INPUT_STORAGE_LEVEL_VALUE));
     }
 
-    int deduceNumParallelism = inputRecords.deduceNumPartitions();
-    int configuredSimpleIndexParallelism = config.getSimpleIndexParallelism();
-    // NOTE: Target parallelism could be overridden by the config
-    int fetchParallelism =
-        configuredSimpleIndexParallelism > 0 ? 
configuredSimpleIndexParallelism : deduceNumParallelism;
     HoodiePairData<HoodieKey, HoodieRecord<R>> keyedInputRecords =
         inputRecords.mapToPair(record -> new ImmutablePair<>(record.getKey(), 
record));
     HoodiePairData<HoodieKey, HoodieRecordLocation> existingLocationsOnTable =
-        fetchRecordLocationsForAffectedPartitions(keyedInputRecords.keys(), 
context, hoodieTable,
-            fetchParallelism);
+        fetchRecordLocationsForAffectedPartitions(keyedInputRecords.keys(), 
context, hoodieTable);
 
     HoodieData<HoodieRecord<R>> taggedRecords =
         keyedInputRecords.leftOuterJoin(existingLocationsOnTable).map(entry -> 
{
@@ -137,27 +131,30 @@ public class HoodieSimpleIndex
    * @param hoodieKeys  {@link HoodieData} of {@link HoodieKey}s for which 
locations are fetched
    * @param context     instance of {@link HoodieEngineContext} to use
    * @param hoodieTable instance of {@link HoodieTable} of interest
-   * @param parallelism parallelism to use
    * @return {@link HoodiePairData} of {@link HoodieKey} and {@link 
HoodieRecordLocation}
    */
   protected HoodiePairData<HoodieKey, HoodieRecordLocation> 
fetchRecordLocationsForAffectedPartitions(
-      HoodieData<HoodieKey> hoodieKeys, HoodieEngineContext context, 
HoodieTable hoodieTable,
-      int parallelism) {
+      HoodieData<HoodieKey> hoodieKeys, HoodieEngineContext context, 
HoodieTable hoodieTable) {
     List<String> affectedPartitionPathList =
         
hoodieKeys.map(HoodieKey::getPartitionPath).distinct(hoodieKeys.deduceNumPartitions()).collectAsList();
     List<Pair<String, HoodieBaseFile>> latestBaseFiles =
         getLatestBaseFilesForAllPartitions(affectedPartitionPathList, context, 
hoodieTable);
-    return fetchRecordLocations(context, hoodieTable, parallelism, 
latestBaseFiles);
+    return fetchRecordLocations(context, hoodieTable, latestBaseFiles);
   }
 
   protected HoodiePairData<HoodieKey, HoodieRecordLocation> 
fetchRecordLocations(
-      HoodieEngineContext context, HoodieTable hoodieTable, int parallelism,
+      HoodieEngineContext context, HoodieTable hoodieTable,
       List<Pair<String, HoodieBaseFile>> baseFiles) {
-    int fetchParallelism = Math.max(1, Math.min(baseFiles.size(), 
parallelism));
+    int parallelism = getParallelism(config.getSimpleIndexParallelism(), 
baseFiles.size());
 
-    return context.parallelize(baseFiles, fetchParallelism)
+    return context.parallelize(baseFiles, parallelism)
         .flatMap(partitionPathBaseFile -> new 
HoodieKeyLocationFetchHandle(config, hoodieTable, partitionPathBaseFile, 
keyGeneratorOpt)
-            .locations().iterator())
+            .locations())
         .mapToPair(e -> (Pair<HoodieKey, HoodieRecordLocation>) e);
   }
+
+  protected int getParallelism(int configuredParallelism, int 
numberOfBaseFiles) {
+    int parallelism = configuredParallelism > 0 && configuredParallelism < 
numberOfBaseFiles ? configuredParallelism : numberOfBaseFiles;
+    return Math.max(1, parallelism);
+  }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
index a6245fa8950..cf5f61677b8 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
@@ -24,15 +24,14 @@ import 
org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.util.FileFormatUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.table.HoodieTable;
 
-import java.util.List;
-import java.util.stream.Stream;
-
 /**
  * {@link HoodieRecordLocation} fetch handle for all records from {@link 
HoodieBaseFile} of interest.
  *
@@ -50,7 +49,7 @@ public class HoodieKeyLocationFetchHandle<T, I, K, O> extends 
HoodieReadHandle<T
     this.keyGeneratorOpt = keyGeneratorOpt;
   }
 
-  private List<Pair<HoodieKey, Long>> 
fetchRecordKeysWithPositions(HoodieBaseFile baseFile) {
+  private ClosableIterator<Pair<HoodieKey, Long>> 
fetchRecordKeysWithPositions(HoodieBaseFile baseFile) {
     FileFormatUtils fileFormatUtils = 
HoodieIOFactory.getIOFactory(hoodieTable.getStorage())
         .getFileFormatUtils(baseFile.getStoragePath());
     if (keyGeneratorOpt.isPresent()) {
@@ -60,19 +59,18 @@ public class HoodieKeyLocationFetchHandle<T, I, K, O> 
extends HoodieReadHandle<T
     }
   }
 
-  public Stream<Pair<HoodieKey, HoodieRecordLocation>> locations() {
+  public ClosableIterator<Pair<HoodieKey, HoodieRecordLocation>> locations() {
     HoodieBaseFile baseFile = partitionPathBaseFilePair.getRight();
     String commitTime = baseFile.getCommitTime();
     String fileId = baseFile.getFileId();
-    return fetchRecordKeysWithPositions(baseFile).stream()
-        .map(entry -> Pair.of(entry.getLeft(),
-            new HoodieRecordLocation(commitTime, fileId, entry.getRight())));
+    return new 
CloseableMappingIterator<>(fetchRecordKeysWithPositions(baseFile),
+        entry -> Pair.of(entry.getLeft(), new HoodieRecordLocation(commitTime, 
fileId, entry.getRight())));
   }
 
-  public Stream<Pair<String, HoodieRecordGlobalLocation>> globalLocations() {
+  public ClosableIterator<Pair<String, HoodieRecordGlobalLocation>> 
globalLocations() {
     HoodieBaseFile baseFile = partitionPathBaseFilePair.getRight();
-    return fetchRecordKeysWithPositions(baseFile).stream()
-        .map(entry -> Pair.of(entry.getLeft().getRecordKey(),
+    return new 
CloseableMappingIterator<>(fetchRecordKeysWithPositions(baseFile),
+        entry -> Pair.of(entry.getLeft().getRecordKey(),
             new HoodieRecordGlobalLocation(
                 entry.getLeft().getPartitionPath(), baseFile.getCommitTime(),
                 baseFile.getFileId(), entry.getRight())));
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/simple/TestGlobalSimpleIndex.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/simple/TestGlobalSimpleIndex.java
new file mode 100644
index 00000000000..d1f2cb375f2
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/simple/TestGlobalSimpleIndex.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.simple;
+
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieListData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.engine.LocalTaskContextSupplier;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.testutils.RawTripTestPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.HoodieWriteableTestTable;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class TestGlobalSimpleIndex extends HoodieCommonTestHarness {
+  private static final Schema SCHEMA = 
getSchemaFromResource(TestGlobalSimpleIndex.class, "/exampleSchema.avsc", true);
+
+  @BeforeEach
+  void setUp() throws Exception {
+    initPath();
+    initMetaClient();
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  void testTagLocation(boolean manuallySetPartitions) throws Exception {
+    String partition1 = "2016/01/31";
+    String partition2 = "2016/01/26";
+    String rowKey1 = UUID.randomUUID().toString();
+    String rowKey2 = UUID.randomUUID().toString();
+    String rowKey3 = UUID.randomUUID().toString();
+    String rowKey4 = UUID.randomUUID().toString();
+    String recordStr1 = "{\"_row_key\":\"" + rowKey1 + 
"\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
+    String recordStr2 = "{\"_row_key\":\"" + rowKey2 + 
"\",\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}";
+    String recordStr3 = "{\"_row_key\":\"" + rowKey3 + 
"\",\"time\":\"2016-01-26T03:16:41.415Z\",\"number\":15}";
+    String recordStr4 = "{\"_row_key\":\"" + rowKey4 + 
"\",\"time\":\"2015-01-31T03:16:41.415Z\",\"number\":32}";
+    RawTripTestPayload payload1 = new RawTripTestPayload(recordStr1);
+    HoodieRecord record1 = new HoodieAvroRecord(
+        new HoodieKey(payload1.getRowKey(), payload1.getPartitionPath()), 
payload1);
+    RawTripTestPayload payload2 = new RawTripTestPayload(recordStr2);
+    HoodieRecord record2 = new HoodieAvroRecord(
+        new HoodieKey(payload2.getRowKey(), payload2.getPartitionPath()), 
payload2);
+    RawTripTestPayload payload3 = new RawTripTestPayload(recordStr3);
+    HoodieRecord record3 = new HoodieAvroRecord(
+        new HoodieKey(payload3.getRowKey(), payload3.getPartitionPath()), 
payload3);
+    HoodieRecord record3WithNewPartition = new HoodieAvroRecord(
+        new HoodieKey(payload3.getRowKey(), partition1), payload3);
+    RawTripTestPayload payload4 = new RawTripTestPayload(recordStr4);
+    HoodieAvroRecord record4 = new HoodieAvroRecord(
+        new HoodieKey(payload4.getRowKey(), payload4.getPartitionPath()), 
payload4);
+    HoodieData<HoodieRecord<HoodieAvroRecord>> records = 
HoodieListData.eager(Arrays.asList(record1, record2, record3WithNewPartition, 
record4));
+
+    HoodieWriteConfig config = makeConfig(manuallySetPartitions);
+    Configuration conf = new Configuration(false);
+    HoodieEngineContext context = new 
HoodieLocalEngineContext(metaClient.getStorageConf());
+    HoodieTable table = mock(HoodieTable.class, RETURNS_DEEP_STUBS);
+    when(table.getConfig()).thenReturn(config);
+    when(table.getMetaClient()).thenReturn(metaClient);
+    when(table.getStorage()).thenReturn(metaClient.getStorage());
+    HoodieGlobalSimpleIndex globalSimpleIndex = new 
HoodieGlobalSimpleIndex(config, Option.empty());
+    HoodieData<HoodieRecord<HoodieAvroRecord>> taggedRecordRDD = 
globalSimpleIndex.tagLocation(records, context, table);
+    
assertFalse(taggedRecordRDD.collectAsList().stream().anyMatch(HoodieRecord::isCurrentLocationKnown));
+
+    HoodieStorage hoodieStorage = new HoodieHadoopStorage(basePath, conf);
+    HoodieWriteableTestTable testTable = new 
HoodieWriteableTestTable(basePath, hoodieStorage, metaClient, SCHEMA, null, 
null, Option.of(context));
+
+    String fileId1 = UUID.randomUUID().toString();
+    String fileId2 = UUID.randomUUID().toString();
+    String fileId3 = UUID.randomUUID().toString();
+    TaskContextSupplier localTaskContextSupplier = new 
LocalTaskContextSupplier();
+    StoragePath filePath1 = testTable.addCommit("001").withInserts(partition1, 
fileId1, Collections.singletonList(record1), localTaskContextSupplier);
+    StoragePath filePath2 = testTable.addCommit("002").withInserts(partition1, 
fileId2, Collections.singletonList(record2), localTaskContextSupplier);
+    StoragePath filePath3 = testTable.addCommit("003").withInserts(partition2, 
fileId3, Collections.singletonList(record3), localTaskContextSupplier);
+
+    String timestamp = 
metaClient.reloadActiveTimeline().lastInstant().get().requestedTime();
+    when(table.getBaseFileOnlyView().getLatestBaseFilesBeforeOrOn(partition1, 
timestamp))
+        .thenReturn(Stream.of(new 
HoodieBaseFile(hoodieStorage.getPathInfo(filePath1)), new 
HoodieBaseFile(hoodieStorage.getPathInfo(filePath2))));
+    when(table.getBaseFileOnlyView().getLatestBaseFilesBeforeOrOn(partition2, 
timestamp))
+        .thenReturn(Stream.of(new 
HoodieBaseFile(hoodieStorage.getPathInfo(filePath3))));
+
+    taggedRecordRDD = globalSimpleIndex.tagLocation(records, context, table);
+    Map<String, Option<String>> expectedRecordKeyToFileId = new HashMap<>();
+    expectedRecordKeyToFileId.put(rowKey1, Option.of(fileId1));
+    expectedRecordKeyToFileId.put(rowKey2, Option.of(fileId2));
+    expectedRecordKeyToFileId.put(rowKey3, Option.of(fileId3));
+    expectedRecordKeyToFileId.put(rowKey4, Option.empty());
+    Map<String, Option<String>> actualRecordKeyToFileId = 
taggedRecordRDD.collectAsList().stream()
+        .collect(Collectors.toMap(HoodieRecord::getRecordKey, record -> 
record.isCurrentLocationKnown() ? 
Option.of(record.getCurrentLocation().getFileId()) : Option.empty()));
+    assertEquals(expectedRecordKeyToFileId, actualRecordKeyToFileId);
+  }
+
+  private HoodieWriteConfig makeConfig(boolean manuallySetPartitions) {
+    Properties props = new Properties();
+    props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), 
"_row_key");
+    return HoodieWriteConfig.newBuilder()
+        .withPath(basePath)
+        .withIndexConfig(HoodieIndexConfig.newBuilder()
+            .fromProperties(props)
+            .withIndexType(HoodieIndex.IndexType.GLOBAL_SIMPLE)
+            .withGlobalSimpleIndexParallelism(manuallySetPartitions ? 1 : 0)
+            .build())
+        .build();
+  }
+}
+
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/simple/TestSimpleIndex.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/simple/TestSimpleIndex.java
new file mode 100644
index 00000000000..4e2956440e3
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/simple/TestSimpleIndex.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.simple;
+
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieListData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.engine.LocalTaskContextSupplier;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.testutils.RawTripTestPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.HoodieWriteableTestTable;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class TestSimpleIndex extends HoodieCommonTestHarness {
+  private static final Schema SCHEMA = 
getSchemaFromResource(TestSimpleIndex.class, "/exampleSchema.avsc", true);
+
+  @BeforeEach
+  void setUp() throws Exception {
+    initPath();
+    initMetaClient();
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  void testTagLocation(boolean manuallySetPartitions) throws Exception {
+    String partition1 = "2016/01/31";
+    String partition2 = "2016/01/26";
+    String rowKey1 = UUID.randomUUID().toString();
+    String rowKey2 = UUID.randomUUID().toString();
+    String rowKey3 = UUID.randomUUID().toString();
+    String rowKey4 = UUID.randomUUID().toString();
+    String recordStr1 = "{\"_row_key\":\"" + rowKey1 + 
"\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
+    String recordStr2 = "{\"_row_key\":\"" + rowKey2 + 
"\",\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}";
+    String recordStr3 = "{\"_row_key\":\"" + rowKey3 + 
"\",\"time\":\"2016-01-26T03:16:41.415Z\",\"number\":15}";
+    String recordStr4 = "{\"_row_key\":\"" + rowKey4 + 
"\",\"time\":\"2015-01-31T03:16:41.415Z\",\"number\":32}";
+    RawTripTestPayload payload1 = new RawTripTestPayload(recordStr1);
+    HoodieRecord record1 = new HoodieAvroRecord(
+        new HoodieKey(payload1.getRowKey(), payload1.getPartitionPath()), 
payload1);
+    RawTripTestPayload payload2 = new RawTripTestPayload(recordStr2);
+    HoodieRecord record2 = new HoodieAvroRecord(
+        new HoodieKey(payload2.getRowKey(), payload2.getPartitionPath()), 
payload2);
+    RawTripTestPayload payload3 = new RawTripTestPayload(recordStr3);
+    HoodieRecord record3 = new HoodieAvroRecord(
+        new HoodieKey(payload3.getRowKey(), payload3.getPartitionPath()), 
payload3);
+    HoodieRecord record3WithNewPartition = new HoodieAvroRecord(
+        new HoodieKey(payload3.getRowKey(), partition1), payload3);
+    RawTripTestPayload payload4 = new RawTripTestPayload(recordStr4);
+    HoodieAvroRecord record4 = new HoodieAvroRecord(
+        new HoodieKey(payload4.getRowKey(), payload4.getPartitionPath()), 
payload4);
+    HoodieData<HoodieRecord<HoodieAvroRecord>> records = 
HoodieListData.eager(Arrays.asList(record1, record2, record3WithNewPartition, 
record4));
+
+    HoodieWriteConfig config = makeConfig(manuallySetPartitions);
+    Configuration conf = new Configuration(false);
+
+    HoodieEngineContext context = new 
HoodieLocalEngineContext(metaClient.getStorageConf());
+    HoodieTable table = mock(HoodieTable.class, RETURNS_DEEP_STUBS);
+    when(table.getConfig()).thenReturn(config);
+    when(table.getMetaClient()).thenReturn(metaClient);
+    when(table.getStorage()).thenReturn(metaClient.getStorage());
+    HoodieSimpleIndex simpleIndex = new HoodieSimpleIndex(config, 
Option.empty());
+    HoodieData<HoodieRecord<HoodieAvroRecord>> taggedRecordRDD = 
simpleIndex.tagLocation(records, context, table);
+    
assertFalse(taggedRecordRDD.collectAsList().stream().anyMatch(HoodieRecord::isCurrentLocationKnown));
+
+    HoodieStorage hoodieStorage = new HoodieHadoopStorage(basePath, conf);
+    HoodieWriteableTestTable testTable = new 
HoodieWriteableTestTable(basePath, hoodieStorage, metaClient, SCHEMA, null, 
null, Option.of(context));
+
+    String fileId1 = UUID.randomUUID().toString();
+    String fileId2 = UUID.randomUUID().toString();
+    String fileId3 = UUID.randomUUID().toString();
+    TaskContextSupplier localTaskContextSupplier = new 
LocalTaskContextSupplier();
+    StoragePath filePath1 = testTable.addCommit("001").withInserts(partition1, 
fileId1, Collections.singletonList(record1), localTaskContextSupplier);
+    StoragePath filePath2 = testTable.addCommit("002").withInserts(partition1, 
fileId2, Collections.singletonList(record2), localTaskContextSupplier);
+    testTable.addCommit("003").withInserts(partition2, fileId3, 
Collections.singletonList(record3), localTaskContextSupplier);
+
+    String timestamp = 
metaClient.reloadActiveTimeline().lastInstant().get().requestedTime();
+    when(table.getBaseFileOnlyView().getLatestBaseFilesBeforeOrOn(partition1, 
timestamp))
+        .thenReturn(Stream.of(new 
HoodieBaseFile(hoodieStorage.getPathInfo(filePath1)), new 
HoodieBaseFile(hoodieStorage.getPathInfo(filePath2))));
+    
when(table.getBaseFileOnlyView().getLatestBaseFilesBeforeOrOn("2015/01/31", 
timestamp))
+        .thenReturn(Stream.empty());
+
+    taggedRecordRDD = simpleIndex.tagLocation(records, context, table);
+    Map<String, Option<String>> expectedRecordKeyToFileId = new HashMap<>();
+    expectedRecordKeyToFileId.put(rowKey1, Option.of(fileId1));
+    expectedRecordKeyToFileId.put(rowKey2, Option.of(fileId2));
+    // record3 has a new partition so will not be tagged
+    expectedRecordKeyToFileId.put(rowKey3, Option.empty());
+    expectedRecordKeyToFileId.put(rowKey4, Option.empty());
+    Map<String, Option<String>> actualRecordKeyToFileId = 
taggedRecordRDD.collectAsList().stream()
+        .collect(Collectors.toMap(HoodieRecord::getRecordKey, record -> 
record.isCurrentLocationKnown() ? 
Option.of(record.getCurrentLocation().getFileId()) : Option.empty()));
+    assertEquals(expectedRecordKeyToFileId, actualRecordKeyToFileId);
+  }
+
+  private HoodieWriteConfig makeConfig(boolean manuallySetPartitions) {
+    Properties props = new Properties();
+    props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), 
"_row_key");
+    return HoodieWriteConfig.newBuilder()
+        .withPath(basePath)
+        .withIndexConfig(HoodieIndexConfig.newBuilder()
+            .fromProperties(props)
+            .withIndexType(HoodieIndex.IndexType.SIMPLE)
+            .withSimpleIndexParallelism(manuallySetPartitions ? 1 : 0)
+            .build())
+        .build();
+  }
+}
+
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
index 6e0aed544f4..677e371ae56 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
@@ -73,7 +73,7 @@ public class HoodieWriteableTestTable extends 
HoodieMetadataTestTable {
   private static final Logger LOG = 
LoggerFactory.getLogger(HoodieWriteableTestTable.class);
 
   protected final Schema schema;
-  protected final BloomFilter filter;
+  protected final Option<BloomFilter> filter;
   protected final boolean populateMetaFields;
 
   protected HoodieWriteableTestTable(String basePath, HoodieStorage storage,
@@ -88,13 +88,13 @@ public class HoodieWriteableTestTable extends 
HoodieMetadataTestTable {
     this(basePath, storage, metaClient, schema, filter, metadataWriter, 
Option.empty());
   }
 
-  protected HoodieWriteableTestTable(String basePath, HoodieStorage storage,
+  public HoodieWriteableTestTable(String basePath, HoodieStorage storage,
                                      HoodieTableMetaClient metaClient, Schema 
schema,
                                      BloomFilter filter, 
HoodieTableMetadataWriter metadataWriter,
                                      Option<HoodieEngineContext> context) {
     super(basePath, storage, metaClient, metadataWriter, context);
     this.schema = schema;
-    this.filter = filter;
+    this.filter = Option.ofNullable(filter);
     this.populateMetaFields = metaClient.getTableConfig().populateMetaFields();
   }
 
@@ -121,7 +121,7 @@ public class HoodieWriteableTestTable extends 
HoodieMetadataTestTable {
 
     if 
(HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().equals(HoodieFileFormat.PARQUET))
 {
       HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
-          new AvroSchemaConverter().convert(schema), schema, 
Option.of(filter), new Properties());
+          new AvroSchemaConverter().convert(schema), schema, filter, new 
Properties());
       HoodieParquetConfig<HoodieAvroWriteSupport> config = new 
HoodieParquetConfig<>(writeSupport, CompressionCodecName.GZIP,
           ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 
120 * 1024 * 1024,
           storage.getConf(), 
Double.parseDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue()),
 true);
@@ -135,7 +135,7 @@ public class HoodieWriteableTestTable extends 
HoodieMetadataTestTable {
             HoodieAvroUtils.addCommitMetadataToRecord(avroRecord, 
currentInstantTime, String.valueOf(seqId++));
             HoodieAvroUtils.addHoodieKeyToRecord(avroRecord, 
record.getRecordKey(), record.getPartitionPath(), fileName);
             writer.writeAvro(record.getRecordKey(), avroRecord);
-            filter.add(record.getRecordKey());
+            filter.ifPresent(f -> f.add(record.getRecordKey()));
           } else {
             writer.writeAvro(record.getRecordKey(), avroRecord);
           }
@@ -146,7 +146,7 @@ public class HoodieWriteableTestTable extends 
HoodieMetadataTestTable {
       int orcStripSize = 
Integer.parseInt(HoodieStorageConfig.ORC_STRIPE_SIZE.defaultValue());
       int orcBlockSize = 
Integer.parseInt(HoodieStorageConfig.ORC_BLOCK_SIZE.defaultValue());
       int maxFileSize = 
Integer.parseInt(HoodieStorageConfig.ORC_FILE_MAX_SIZE.defaultValue());
-      HoodieOrcConfig config = new HoodieOrcConfig(conf, CompressionKind.ZLIB, 
orcStripSize, orcBlockSize, maxFileSize, filter);
+      HoodieOrcConfig config = new HoodieOrcConfig(conf, CompressionKind.ZLIB, 
orcStripSize, orcBlockSize, maxFileSize, filter.orElse(null));
       try (HoodieAvroOrcWriter writer = new HoodieAvroOrcWriter(
           currentInstantTime,
           new StoragePath(Paths.get(basePath, partition, fileName).toString()),
@@ -157,7 +157,7 @@ public class HoodieWriteableTestTable extends 
HoodieMetadataTestTable {
           HoodieAvroUtils.addCommitMetadataToRecord(avroRecord, 
currentInstantTime, String.valueOf(seqId++));
           HoodieAvroUtils.addHoodieKeyToRecord(avroRecord, 
record.getRecordKey(), record.getPartitionPath(), fileName);
           writer.writeAvro(record.getRecordKey(), avroRecord);
-          filter.add(record.getRecordKey());
+          filter.ifPresent(f -> f.add(record.getRecordKey()));
         }
       }
     }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
index 63b8cc5de5c..2e53c7cadf8 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
@@ -50,7 +51,6 @@ import org.junit.jupiter.params.provider.ValueSource;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -107,10 +107,11 @@ public class TestHoodieKeyLocationFetchHandle extends 
HoodieSparkClientTestHarne
     for (Tuple2<String, HoodieBaseFile> entry : partitionPathFileIdPairs) {
       HoodieKeyLocationFetchHandle fetcherHandle = new 
HoodieKeyLocationFetchHandle(config, hoodieTable, Pair.of(entry._1, entry._2),
           populateMetaFields ? Option.empty() : Option.of(keyGenerator));
-      Iterator<Pair<HoodieKey, HoodieRecordLocation>> result = 
fetcherHandle.locations().iterator();
-      List<Tuple2<HoodieKey, HoodieRecordLocation>> actualList = new 
ArrayList<>();
-      result.forEachRemaining(x -> actualList.add(new Tuple2<>(x.getLeft(), 
x.getRight())));
-      assertEquals(expectedList.get(new Tuple2<>(entry._1, 
entry._2.getFileId())), actualList);
+      try (ClosableIterator<Pair<HoodieKey, HoodieRecordLocation>> result = 
fetcherHandle.locations()) {
+        List<Tuple2<HoodieKey, HoodieRecordLocation>> actualList = new 
ArrayList<>();
+        result.forEachRemaining(x -> actualList.add(new Tuple2<>(x.getLeft(), 
x.getRight())));
+        assertEquals(expectedList.get(new Tuple2<>(entry._1, 
entry._2.getFileId())), actualList);
+      }
     }
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java
index fea44ef2a75..39aa1f807e4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java
@@ -40,6 +40,7 @@ import javax.annotation.Nonnull;
 
 import java.io.IOException;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -187,7 +188,7 @@ public abstract class FileFormatUtils {
    * @param filePath the data file path.
    * @return {@link List} of pairs of {@link HoodieKey} and position fetched 
from the data file.
    */
-  public abstract List<Pair<HoodieKey, Long>> 
fetchRecordKeysWithPositions(HoodieStorage storage, StoragePath filePath);
+  public abstract ClosableIterator<Pair<HoodieKey, Long>> 
fetchRecordKeysWithPositions(HoodieStorage storage, StoragePath filePath);
 
   /**
    * Provides a closable iterator for reading the given data file.
@@ -216,11 +217,11 @@ public abstract class FileFormatUtils {
    * @param storage         {@link HoodieStorage} instance.
    * @param filePath        the data file path.
    * @param keyGeneratorOpt instance of KeyGenerator.
-   * @return {@link List} of pairs of {@link HoodieKey} and position fetched 
from the data file.
+   * @return {@link Iterator} of pairs of {@link HoodieKey} and position 
fetched from the data file.
    */
-  public abstract List<Pair<HoodieKey, Long>> 
fetchRecordKeysWithPositions(HoodieStorage storage,
-                                                                           
StoragePath filePath,
-                                                                           
Option<BaseKeyGenerator> keyGeneratorOpt);
+  public abstract ClosableIterator<Pair<HoodieKey, Long>> 
fetchRecordKeysWithPositions(HoodieStorage storage,
+                                                                               
        StoragePath filePath,
+                                                                               
        Option<BaseKeyGenerator> keyGeneratorOpt);
 
   /**
    * Read the Avro schema of the data file.
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
index 28b0dc2c42c..27fcba8d1a1 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
@@ -109,7 +109,7 @@ public class HFileUtils extends FileFormatUtils {
   }
 
   @Override
-  public List<Pair<HoodieKey, Long>> 
fetchRecordKeysWithPositions(HoodieStorage storage, StoragePath filePath) {
+  public ClosableIterator<Pair<HoodieKey, Long>> 
fetchRecordKeysWithPositions(HoodieStorage storage, StoragePath filePath) {
     throw new UnsupportedOperationException("HFileUtils does not support 
fetchRecordKeysWithPositions");
   }
 
@@ -124,7 +124,7 @@ public class HFileUtils extends FileFormatUtils {
   }
 
   @Override
-  public List<Pair<HoodieKey, Long>> 
fetchRecordKeysWithPositions(HoodieStorage storage, StoragePath filePath, 
Option<BaseKeyGenerator> keyGeneratorOpt) {
+  public ClosableIterator<Pair<HoodieKey, Long>> 
fetchRecordKeysWithPositions(HoodieStorage storage, StoragePath filePath, 
Option<BaseKeyGenerator> keyGeneratorOpt) {
     throw new UnsupportedOperationException("HFileUtils does not support 
fetchRecordKeysWithPositions");
   }
 
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
index fad371ac7af..0ecf79400ff 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
@@ -58,6 +59,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.util.BinaryUtil.toBytes;
@@ -89,28 +91,21 @@ public class OrcUtils extends FileFormatUtils {
    * @return {@link List} of {@link HoodieKey}s fetched from the ORC file
    */
   @Override
-  public List<Pair<HoodieKey, Long>> 
fetchRecordKeysWithPositions(HoodieStorage storage, StoragePath filePath) {
+  public ClosableIterator<Pair<HoodieKey, Long>> 
fetchRecordKeysWithPositions(HoodieStorage storage, StoragePath filePath) {
     return fetchRecordKeysWithPositions(storage, filePath, Option.empty());
   }
 
   @Override
-  public List<Pair<HoodieKey, Long>> 
fetchRecordKeysWithPositions(HoodieStorage storage, StoragePath filePath, 
Option<BaseKeyGenerator> keyGeneratorOpt) {
+  public ClosableIterator<Pair<HoodieKey, Long>> 
fetchRecordKeysWithPositions(HoodieStorage storage, StoragePath filePath, 
Option<BaseKeyGenerator> keyGeneratorOpt) {
     try {
       if (!storage.exists(filePath)) {
-        return Collections.emptyList();
+        return ClosableIterator.wrap(Collections.emptyIterator());
       }
     } catch (IOException e) {
       throw new HoodieIOException("Failed to read from ORC file:" + filePath, 
e);
     }
-    List<Pair<HoodieKey, Long>> hoodieKeysAndPositions = new ArrayList<>();
-    long position = 0;
-    try (ClosableIterator<HoodieKey> iterator = getHoodieKeyIterator(storage, 
filePath, keyGeneratorOpt)) {
-      while (iterator.hasNext()) {
-        hoodieKeysAndPositions.add(Pair.of(iterator.next(), position));
-        position++;
-      }
-    }
-    return hoodieKeysAndPositions;
+    AtomicLong position = new AtomicLong(0);
+    return new CloseableMappingIterator<>(getHoodieKeyIterator(storage, 
filePath, keyGeneratorOpt), key -> Pair.of(key, position.getAndIncrement()));
   }
 
   @Override
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
index b833748dc45..eb50cd43ea8 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.MetadataNotFoundException;
@@ -72,6 +73,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -174,7 +176,7 @@ public class ParquetUtils extends FileFormatUtils {
    * @return {@link List} of pairs of {@link HoodieKey} and row position 
fetched from the parquet file
    */
   @Override
-  public List<Pair<HoodieKey, Long>> 
fetchRecordKeysWithPositions(HoodieStorage storage, StoragePath filePath) {
+  public ClosableIterator<Pair<HoodieKey, Long>> 
fetchRecordKeysWithPositions(HoodieStorage storage, StoragePath filePath) {
     return fetchRecordKeysWithPositions(storage, filePath, Option.empty());
   }
 
@@ -223,16 +225,9 @@ public class ParquetUtils extends FileFormatUtils {
    * @return {@link List} of pairs of {@link HoodieKey} and row position 
fetched from the parquet file
    */
   @Override
-  public List<Pair<HoodieKey, Long>> 
fetchRecordKeysWithPositions(HoodieStorage storage, StoragePath filePath, 
Option<BaseKeyGenerator> keyGeneratorOpt) {
-    List<Pair<HoodieKey, Long>> hoodieKeysAndPositions = new ArrayList<>();
-    long position = 0;
-    try (ClosableIterator<HoodieKey> iterator = getHoodieKeyIterator(storage, 
filePath, keyGeneratorOpt)) {
-      while (iterator.hasNext()) {
-        hoodieKeysAndPositions.add(Pair.of(iterator.next(), position));
-        position++;
-      }
-      return hoodieKeysAndPositions;
-    }
+  public ClosableIterator<Pair<HoodieKey, Long>> 
fetchRecordKeysWithPositions(HoodieStorage storage, StoragePath filePath, 
Option<BaseKeyGenerator> keyGeneratorOpt) {
+    AtomicLong position = new AtomicLong(0);
+    return new CloseableMappingIterator<>(getHoodieKeyIterator(storage, 
filePath, keyGeneratorOpt), key -> Pair.of(key, position.getAndIncrement()));
   }
 
   /**
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
index 248350c7a82..9732602ea19 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.storage.StoragePath;
@@ -154,8 +155,13 @@ public class TestParquetUtils extends 
HoodieCommonTestHarness {
     writeParquetFile(typeCode, filePath, rowKeys, schema, true, partitionPath);
 
     // Read and verify
-    List<Pair<HoodieKey, Long>> fetchedRows = 
parquetUtils.fetchRecordKeysWithPositions(
-        HoodieTestUtils.getStorage(filePath), new StoragePath(filePath));
+    List<Pair<HoodieKey, Long>> fetchedRows = new ArrayList<>();
+    try (ClosableIterator<Pair<HoodieKey, Long>> iter = 
parquetUtils.fetchRecordKeysWithPositions(
+        HoodieTestUtils.getStorage(filePath), new StoragePath(filePath))) {
+      while (iter.hasNext()) {
+        fetchedRows.add(iter.next());
+      }
+    }
     assertEquals(rowKeys.size(), fetchedRows.size(), "Total count does not 
match");
 
     for (Pair<HoodieKey, Long> entry : fetchedRows) {
@@ -180,9 +186,14 @@ public class TestParquetUtils extends 
HoodieCommonTestHarness {
         false, "abc", "def");
 
     // Read and verify
-    List<Pair<HoodieKey, Long>> fetchedRows = 
parquetUtils.fetchRecordKeysWithPositions(
+    List<Pair<HoodieKey, Long>> fetchedRows = new ArrayList<>();
+    try (ClosableIterator<Pair<HoodieKey, Long>> iter = 
parquetUtils.fetchRecordKeysWithPositions(
         HoodieTestUtils.getStorage(filePath), new StoragePath(filePath),
-        Option.of(new TestBaseKeyGen("abc", "def")));
+        Option.of(new TestBaseKeyGen("abc", "def")))) {
+      while (iter.hasNext()) {
+        fetchedRows.add(iter.next());
+      }
+    }
     assertEquals(rowKeys.size(), fetchedRows.size(), "Total count does not 
match");
 
     for (Pair<HoodieKey, Long> entry : fetchedRows) {


Reply via email to