This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f693bed0170 [HUDI-8673] Simple and Global-Simple Index fetch location
parallelism improvements (#12445)
f693bed0170 is described below
commit f693bed017090b37a4afe9a5a9068b894e643820
Author: Tim Brown <[email protected]>
AuthorDate: Mon Dec 9 03:12:59 2024 -0600
[HUDI-8673] Simple and Global-Simple Index fetch location parallelism
improvements (#12445)
---
.../hudi/index/simple/HoodieGlobalSimpleIndex.java | 13 +-
.../hudi/index/simple/HoodieSimpleIndex.java | 27 ++--
.../hudi/io/HoodieKeyLocationFetchHandle.java | 20 ++-
.../hudi/index/simple/TestGlobalSimpleIndex.java | 156 ++++++++++++++++++++
.../apache/hudi/index/simple/TestSimpleIndex.java | 158 +++++++++++++++++++++
.../hudi/testutils/HoodieWriteableTestTable.java | 14 +-
.../hudi/io/TestHoodieKeyLocationFetchHandle.java | 11 +-
.../apache/hudi/common/util/FileFormatUtils.java | 11 +-
.../org/apache/hudi/common/util/HFileUtils.java | 4 +-
.../java/org/apache/hudi/common/util/OrcUtils.java | 19 +--
.../org/apache/hudi/common/util/ParquetUtils.java | 17 +--
.../apache/hudi/common/util/TestParquetUtils.java | 19 ++-
12 files changed, 389 insertions(+), 80 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java
index 3c76ff17935..4abc3cb1d87 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java
@@ -69,11 +69,8 @@ public class HoodieGlobalSimpleIndex extends
HoodieSimpleIndex {
HoodieData<HoodieRecord<R>> inputRecords, HoodieEngineContext context,
HoodieTable hoodieTable) {
List<Pair<String, HoodieBaseFile>> latestBaseFiles =
getAllBaseFilesInTable(context, hoodieTable);
- int configuredSimpleIndexParallelism =
config.getGlobalSimpleIndexParallelism();
- int fetchParallelism =
- configuredSimpleIndexParallelism > 0 ?
configuredSimpleIndexParallelism : inputRecords.deduceNumPartitions();
HoodiePairData<String, HoodieRecordGlobalLocation> allKeysAndLocations =
- fetchRecordGlobalLocations(context, hoodieTable, fetchParallelism,
latestBaseFiles);
+ fetchRecordGlobalLocations(context, hoodieTable, latestBaseFiles);
boolean mayContainDuplicateLookup =
hoodieTable.getMetaClient().getTableType() == MERGE_ON_READ;
boolean shouldUpdatePartitionPath =
config.getGlobalSimpleIndexUpdatePartitionPath() && hoodieTable.isPartitioned();
return tagGlobalLocationBackToRecords(inputRecords, allKeysAndLocations,
@@ -81,13 +78,13 @@ public class HoodieGlobalSimpleIndex extends
HoodieSimpleIndex {
}
private HoodiePairData<String, HoodieRecordGlobalLocation>
fetchRecordGlobalLocations(
- HoodieEngineContext context, HoodieTable hoodieTable, int parallelism,
+ HoodieEngineContext context, HoodieTable hoodieTable,
List<Pair<String, HoodieBaseFile>> baseFiles) {
- int fetchParallelism = Math.max(1, Math.min(baseFiles.size(),
parallelism));
+ int parallelism = getParallelism(config.getGlobalSimpleIndexParallelism(),
baseFiles.size());
- return context.parallelize(baseFiles, fetchParallelism)
+ return context.parallelize(baseFiles, parallelism)
.flatMap(partitionPathBaseFile -> new
HoodieKeyLocationFetchHandle(config, hoodieTable, partitionPathBaseFile,
keyGeneratorOpt)
- .globalLocations().iterator())
+ .globalLocations())
.mapToPair(e -> (Pair<String, HoodieRecordGlobalLocation>) e);
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java
index 99ffc1b47e6..db6d02b5624 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java
@@ -107,16 +107,10 @@ public class HoodieSimpleIndex
.getString(HoodieIndexConfig.SIMPLE_INDEX_INPUT_STORAGE_LEVEL_VALUE));
}
- int deduceNumParallelism = inputRecords.deduceNumPartitions();
- int configuredSimpleIndexParallelism = config.getSimpleIndexParallelism();
- // NOTE: Target parallelism could be overridden by the config
- int fetchParallelism =
- configuredSimpleIndexParallelism > 0 ?
configuredSimpleIndexParallelism : deduceNumParallelism;
HoodiePairData<HoodieKey, HoodieRecord<R>> keyedInputRecords =
inputRecords.mapToPair(record -> new ImmutablePair<>(record.getKey(),
record));
HoodiePairData<HoodieKey, HoodieRecordLocation> existingLocationsOnTable =
- fetchRecordLocationsForAffectedPartitions(keyedInputRecords.keys(),
context, hoodieTable,
- fetchParallelism);
+ fetchRecordLocationsForAffectedPartitions(keyedInputRecords.keys(),
context, hoodieTable);
HoodieData<HoodieRecord<R>> taggedRecords =
keyedInputRecords.leftOuterJoin(existingLocationsOnTable).map(entry ->
{
@@ -137,27 +131,30 @@ public class HoodieSimpleIndex
* @param hoodieKeys {@link HoodieData} of {@link HoodieKey}s for which
locations are fetched
* @param context instance of {@link HoodieEngineContext} to use
* @param hoodieTable instance of {@link HoodieTable} of interest
- * @param parallelism parallelism to use
* @return {@link HoodiePairData} of {@link HoodieKey} and {@link
HoodieRecordLocation}
*/
protected HoodiePairData<HoodieKey, HoodieRecordLocation>
fetchRecordLocationsForAffectedPartitions(
- HoodieData<HoodieKey> hoodieKeys, HoodieEngineContext context,
HoodieTable hoodieTable,
- int parallelism) {
+ HoodieData<HoodieKey> hoodieKeys, HoodieEngineContext context,
HoodieTable hoodieTable) {
List<String> affectedPartitionPathList =
hoodieKeys.map(HoodieKey::getPartitionPath).distinct(hoodieKeys.deduceNumPartitions()).collectAsList();
List<Pair<String, HoodieBaseFile>> latestBaseFiles =
getLatestBaseFilesForAllPartitions(affectedPartitionPathList, context,
hoodieTable);
- return fetchRecordLocations(context, hoodieTable, parallelism,
latestBaseFiles);
+ return fetchRecordLocations(context, hoodieTable, latestBaseFiles);
}
protected HoodiePairData<HoodieKey, HoodieRecordLocation>
fetchRecordLocations(
- HoodieEngineContext context, HoodieTable hoodieTable, int parallelism,
+ HoodieEngineContext context, HoodieTable hoodieTable,
List<Pair<String, HoodieBaseFile>> baseFiles) {
- int fetchParallelism = Math.max(1, Math.min(baseFiles.size(),
parallelism));
+ int parallelism = getParallelism(config.getSimpleIndexParallelism(),
baseFiles.size());
- return context.parallelize(baseFiles, fetchParallelism)
+ return context.parallelize(baseFiles, parallelism)
.flatMap(partitionPathBaseFile -> new
HoodieKeyLocationFetchHandle(config, hoodieTable, partitionPathBaseFile,
keyGeneratorOpt)
- .locations().iterator())
+ .locations())
.mapToPair(e -> (Pair<HoodieKey, HoodieRecordLocation>) e);
}
+
+ protected int getParallelism(int configuredParallelism, int
numberOfBaseFiles) {
+ int parallelism = configuredParallelism > 0 && configuredParallelism <
numberOfBaseFiles ? configuredParallelism : numberOfBaseFiles;
+ return Math.max(1, parallelism);
+ }
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
index a6245fa8950..cf5f61677b8 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
@@ -24,15 +24,14 @@ import
org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.table.HoodieTable;
-import java.util.List;
-import java.util.stream.Stream;
-
/**
* {@link HoodieRecordLocation} fetch handle for all records from {@link
HoodieBaseFile} of interest.
*
@@ -50,7 +49,7 @@ public class HoodieKeyLocationFetchHandle<T, I, K, O> extends
HoodieReadHandle<T
this.keyGeneratorOpt = keyGeneratorOpt;
}
- private List<Pair<HoodieKey, Long>>
fetchRecordKeysWithPositions(HoodieBaseFile baseFile) {
+ private ClosableIterator<Pair<HoodieKey, Long>>
fetchRecordKeysWithPositions(HoodieBaseFile baseFile) {
FileFormatUtils fileFormatUtils =
HoodieIOFactory.getIOFactory(hoodieTable.getStorage())
.getFileFormatUtils(baseFile.getStoragePath());
if (keyGeneratorOpt.isPresent()) {
@@ -60,19 +59,18 @@ public class HoodieKeyLocationFetchHandle<T, I, K, O>
extends HoodieReadHandle<T
}
}
- public Stream<Pair<HoodieKey, HoodieRecordLocation>> locations() {
+ public ClosableIterator<Pair<HoodieKey, HoodieRecordLocation>> locations() {
HoodieBaseFile baseFile = partitionPathBaseFilePair.getRight();
String commitTime = baseFile.getCommitTime();
String fileId = baseFile.getFileId();
- return fetchRecordKeysWithPositions(baseFile).stream()
- .map(entry -> Pair.of(entry.getLeft(),
- new HoodieRecordLocation(commitTime, fileId, entry.getRight())));
+ return new
CloseableMappingIterator<>(fetchRecordKeysWithPositions(baseFile),
+ entry -> Pair.of(entry.getLeft(), new HoodieRecordLocation(commitTime,
fileId, entry.getRight())));
}
- public Stream<Pair<String, HoodieRecordGlobalLocation>> globalLocations() {
+ public ClosableIterator<Pair<String, HoodieRecordGlobalLocation>>
globalLocations() {
HoodieBaseFile baseFile = partitionPathBaseFilePair.getRight();
- return fetchRecordKeysWithPositions(baseFile).stream()
- .map(entry -> Pair.of(entry.getLeft().getRecordKey(),
+ return new
CloseableMappingIterator<>(fetchRecordKeysWithPositions(baseFile),
+ entry -> Pair.of(entry.getLeft().getRecordKey(),
new HoodieRecordGlobalLocation(
entry.getLeft().getPartitionPath(), baseFile.getCommitTime(),
baseFile.getFileId(), entry.getRight())));
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/simple/TestGlobalSimpleIndex.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/simple/TestGlobalSimpleIndex.java
new file mode 100644
index 00000000000..d1f2cb375f2
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/simple/TestGlobalSimpleIndex.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.simple;
+
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieListData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.engine.LocalTaskContextSupplier;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.testutils.RawTripTestPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.HoodieWriteableTestTable;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static
org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class TestGlobalSimpleIndex extends HoodieCommonTestHarness {
+ private static final Schema SCHEMA =
getSchemaFromResource(TestGlobalSimpleIndex.class, "/exampleSchema.avsc", true);
+
+ @BeforeEach
+ void setUp() throws Exception {
+ initPath();
+ initMetaClient();
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testTagLocation(boolean manuallySetPartitions) throws Exception {
+ String partition1 = "2016/01/31";
+ String partition2 = "2016/01/26";
+ String rowKey1 = UUID.randomUUID().toString();
+ String rowKey2 = UUID.randomUUID().toString();
+ String rowKey3 = UUID.randomUUID().toString();
+ String rowKey4 = UUID.randomUUID().toString();
+ String recordStr1 = "{\"_row_key\":\"" + rowKey1 +
"\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
+ String recordStr2 = "{\"_row_key\":\"" + rowKey2 +
"\",\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}";
+ String recordStr3 = "{\"_row_key\":\"" + rowKey3 +
"\",\"time\":\"2016-01-26T03:16:41.415Z\",\"number\":15}";
+ String recordStr4 = "{\"_row_key\":\"" + rowKey4 +
"\",\"time\":\"2015-01-31T03:16:41.415Z\",\"number\":32}";
+ RawTripTestPayload payload1 = new RawTripTestPayload(recordStr1);
+ HoodieRecord record1 = new HoodieAvroRecord(
+ new HoodieKey(payload1.getRowKey(), payload1.getPartitionPath()),
payload1);
+ RawTripTestPayload payload2 = new RawTripTestPayload(recordStr2);
+ HoodieRecord record2 = new HoodieAvroRecord(
+ new HoodieKey(payload2.getRowKey(), payload2.getPartitionPath()),
payload2);
+ RawTripTestPayload payload3 = new RawTripTestPayload(recordStr3);
+ HoodieRecord record3 = new HoodieAvroRecord(
+ new HoodieKey(payload3.getRowKey(), payload3.getPartitionPath()),
payload3);
+ HoodieRecord record3WithNewPartition = new HoodieAvroRecord(
+ new HoodieKey(payload3.getRowKey(), partition1), payload3);
+ RawTripTestPayload payload4 = new RawTripTestPayload(recordStr4);
+ HoodieAvroRecord record4 = new HoodieAvroRecord(
+ new HoodieKey(payload4.getRowKey(), payload4.getPartitionPath()),
payload4);
+ HoodieData<HoodieRecord<HoodieAvroRecord>> records =
HoodieListData.eager(Arrays.asList(record1, record2, record3WithNewPartition,
record4));
+
+ HoodieWriteConfig config = makeConfig(manuallySetPartitions);
+ Configuration conf = new Configuration(false);
+ HoodieEngineContext context = new
HoodieLocalEngineContext(metaClient.getStorageConf());
+ HoodieTable table = mock(HoodieTable.class, RETURNS_DEEP_STUBS);
+ when(table.getConfig()).thenReturn(config);
+ when(table.getMetaClient()).thenReturn(metaClient);
+ when(table.getStorage()).thenReturn(metaClient.getStorage());
+ HoodieGlobalSimpleIndex globalSimpleIndex = new
HoodieGlobalSimpleIndex(config, Option.empty());
+ HoodieData<HoodieRecord<HoodieAvroRecord>> taggedRecordRDD =
globalSimpleIndex.tagLocation(records, context, table);
+
assertFalse(taggedRecordRDD.collectAsList().stream().anyMatch(HoodieRecord::isCurrentLocationKnown));
+
+ HoodieStorage hoodieStorage = new HoodieHadoopStorage(basePath, conf);
+ HoodieWriteableTestTable testTable = new
HoodieWriteableTestTable(basePath, hoodieStorage, metaClient, SCHEMA, null,
null, Option.of(context));
+
+ String fileId1 = UUID.randomUUID().toString();
+ String fileId2 = UUID.randomUUID().toString();
+ String fileId3 = UUID.randomUUID().toString();
+ TaskContextSupplier localTaskContextSupplier = new
LocalTaskContextSupplier();
+ StoragePath filePath1 = testTable.addCommit("001").withInserts(partition1,
fileId1, Collections.singletonList(record1), localTaskContextSupplier);
+ StoragePath filePath2 = testTable.addCommit("002").withInserts(partition1,
fileId2, Collections.singletonList(record2), localTaskContextSupplier);
+ StoragePath filePath3 = testTable.addCommit("003").withInserts(partition2,
fileId3, Collections.singletonList(record3), localTaskContextSupplier);
+
+ String timestamp =
metaClient.reloadActiveTimeline().lastInstant().get().requestedTime();
+ when(table.getBaseFileOnlyView().getLatestBaseFilesBeforeOrOn(partition1,
timestamp))
+ .thenReturn(Stream.of(new
HoodieBaseFile(hoodieStorage.getPathInfo(filePath1)), new
HoodieBaseFile(hoodieStorage.getPathInfo(filePath2))));
+ when(table.getBaseFileOnlyView().getLatestBaseFilesBeforeOrOn(partition2,
timestamp))
+ .thenReturn(Stream.of(new
HoodieBaseFile(hoodieStorage.getPathInfo(filePath3))));
+
+ taggedRecordRDD = globalSimpleIndex.tagLocation(records, context, table);
+ Map<String, Option<String>> expectedRecordKeyToFileId = new HashMap<>();
+ expectedRecordKeyToFileId.put(rowKey1, Option.of(fileId1));
+ expectedRecordKeyToFileId.put(rowKey2, Option.of(fileId2));
+ expectedRecordKeyToFileId.put(rowKey3, Option.of(fileId3));
+ expectedRecordKeyToFileId.put(rowKey4, Option.empty());
+ Map<String, Option<String>> actualRecordKeyToFileId =
taggedRecordRDD.collectAsList().stream()
+ .collect(Collectors.toMap(HoodieRecord::getRecordKey, record ->
record.isCurrentLocationKnown() ?
Option.of(record.getCurrentLocation().getFileId()) : Option.empty()));
+ assertEquals(expectedRecordKeyToFileId, actualRecordKeyToFileId);
+ }
+
+ private HoodieWriteConfig makeConfig(boolean manuallySetPartitions) {
+ Properties props = new Properties();
+ props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(),
"_row_key");
+ return HoodieWriteConfig.newBuilder()
+ .withPath(basePath)
+ .withIndexConfig(HoodieIndexConfig.newBuilder()
+ .fromProperties(props)
+ .withIndexType(HoodieIndex.IndexType.GLOBAL_SIMPLE)
+ .withGlobalSimpleIndexParallelism(manuallySetPartitions ? 1 : 0)
+ .build())
+ .build();
+ }
+}
+
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/simple/TestSimpleIndex.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/simple/TestSimpleIndex.java
new file mode 100644
index 00000000000..4e2956440e3
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/simple/TestSimpleIndex.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.simple;
+
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieListData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.engine.LocalTaskContextSupplier;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.testutils.RawTripTestPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.HoodieWriteableTestTable;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static
org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class TestSimpleIndex extends HoodieCommonTestHarness {
+ private static final Schema SCHEMA =
getSchemaFromResource(TestSimpleIndex.class, "/exampleSchema.avsc", true);
+
+ @BeforeEach
+ void setUp() throws Exception {
+ initPath();
+ initMetaClient();
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testTagLocation(boolean manuallySetPartitions) throws Exception {
+ String partition1 = "2016/01/31";
+ String partition2 = "2016/01/26";
+ String rowKey1 = UUID.randomUUID().toString();
+ String rowKey2 = UUID.randomUUID().toString();
+ String rowKey3 = UUID.randomUUID().toString();
+ String rowKey4 = UUID.randomUUID().toString();
+ String recordStr1 = "{\"_row_key\":\"" + rowKey1 +
"\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
+ String recordStr2 = "{\"_row_key\":\"" + rowKey2 +
"\",\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}";
+ String recordStr3 = "{\"_row_key\":\"" + rowKey3 +
"\",\"time\":\"2016-01-26T03:16:41.415Z\",\"number\":15}";
+ String recordStr4 = "{\"_row_key\":\"" + rowKey4 +
"\",\"time\":\"2015-01-31T03:16:41.415Z\",\"number\":32}";
+ RawTripTestPayload payload1 = new RawTripTestPayload(recordStr1);
+ HoodieRecord record1 = new HoodieAvroRecord(
+ new HoodieKey(payload1.getRowKey(), payload1.getPartitionPath()),
payload1);
+ RawTripTestPayload payload2 = new RawTripTestPayload(recordStr2);
+ HoodieRecord record2 = new HoodieAvroRecord(
+ new HoodieKey(payload2.getRowKey(), payload2.getPartitionPath()),
payload2);
+ RawTripTestPayload payload3 = new RawTripTestPayload(recordStr3);
+ HoodieRecord record3 = new HoodieAvroRecord(
+ new HoodieKey(payload3.getRowKey(), payload3.getPartitionPath()),
payload3);
+ HoodieRecord record3WithNewPartition = new HoodieAvroRecord(
+ new HoodieKey(payload3.getRowKey(), partition1), payload3);
+ RawTripTestPayload payload4 = new RawTripTestPayload(recordStr4);
+ HoodieAvroRecord record4 = new HoodieAvroRecord(
+ new HoodieKey(payload4.getRowKey(), payload4.getPartitionPath()),
payload4);
+ HoodieData<HoodieRecord<HoodieAvroRecord>> records =
HoodieListData.eager(Arrays.asList(record1, record2, record3WithNewPartition,
record4));
+
+ HoodieWriteConfig config = makeConfig(manuallySetPartitions);
+ Configuration conf = new Configuration(false);
+
+ HoodieEngineContext context = new
HoodieLocalEngineContext(metaClient.getStorageConf());
+ HoodieTable table = mock(HoodieTable.class, RETURNS_DEEP_STUBS);
+ when(table.getConfig()).thenReturn(config);
+ when(table.getMetaClient()).thenReturn(metaClient);
+ when(table.getStorage()).thenReturn(metaClient.getStorage());
+ HoodieSimpleIndex simpleIndex = new HoodieSimpleIndex(config,
Option.empty());
+ HoodieData<HoodieRecord<HoodieAvroRecord>> taggedRecordRDD =
simpleIndex.tagLocation(records, context, table);
+
assertFalse(taggedRecordRDD.collectAsList().stream().anyMatch(HoodieRecord::isCurrentLocationKnown));
+
+ HoodieStorage hoodieStorage = new HoodieHadoopStorage(basePath, conf);
+ HoodieWriteableTestTable testTable = new
HoodieWriteableTestTable(basePath, hoodieStorage, metaClient, SCHEMA, null,
null, Option.of(context));
+
+ String fileId1 = UUID.randomUUID().toString();
+ String fileId2 = UUID.randomUUID().toString();
+ String fileId3 = UUID.randomUUID().toString();
+ TaskContextSupplier localTaskContextSupplier = new
LocalTaskContextSupplier();
+ StoragePath filePath1 = testTable.addCommit("001").withInserts(partition1,
fileId1, Collections.singletonList(record1), localTaskContextSupplier);
+ StoragePath filePath2 = testTable.addCommit("002").withInserts(partition1,
fileId2, Collections.singletonList(record2), localTaskContextSupplier);
+ testTable.addCommit("003").withInserts(partition2, fileId3,
Collections.singletonList(record3), localTaskContextSupplier);
+
+ String timestamp =
metaClient.reloadActiveTimeline().lastInstant().get().requestedTime();
+ when(table.getBaseFileOnlyView().getLatestBaseFilesBeforeOrOn(partition1,
timestamp))
+ .thenReturn(Stream.of(new
HoodieBaseFile(hoodieStorage.getPathInfo(filePath1)), new
HoodieBaseFile(hoodieStorage.getPathInfo(filePath2))));
+
when(table.getBaseFileOnlyView().getLatestBaseFilesBeforeOrOn("2015/01/31",
timestamp))
+ .thenReturn(Stream.empty());
+
+ taggedRecordRDD = simpleIndex.tagLocation(records, context, table);
+ Map<String, Option<String>> expectedRecordKeyToFileId = new HashMap<>();
+ expectedRecordKeyToFileId.put(rowKey1, Option.of(fileId1));
+ expectedRecordKeyToFileId.put(rowKey2, Option.of(fileId2));
+ // record3 has a new partition so will not be tagged
+ expectedRecordKeyToFileId.put(rowKey3, Option.empty());
+ expectedRecordKeyToFileId.put(rowKey4, Option.empty());
+ Map<String, Option<String>> actualRecordKeyToFileId =
taggedRecordRDD.collectAsList().stream()
+ .collect(Collectors.toMap(HoodieRecord::getRecordKey, record ->
record.isCurrentLocationKnown() ?
Option.of(record.getCurrentLocation().getFileId()) : Option.empty()));
+ assertEquals(expectedRecordKeyToFileId, actualRecordKeyToFileId);
+ }
+
+ private HoodieWriteConfig makeConfig(boolean manuallySetPartitions) {
+ Properties props = new Properties();
+ props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(),
"_row_key");
+ return HoodieWriteConfig.newBuilder()
+ .withPath(basePath)
+ .withIndexConfig(HoodieIndexConfig.newBuilder()
+ .fromProperties(props)
+ .withIndexType(HoodieIndex.IndexType.SIMPLE)
+ .withSimpleIndexParallelism(manuallySetPartitions ? 1 : 0)
+ .build())
+ .build();
+ }
+}
+
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
index 6e0aed544f4..677e371ae56 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
@@ -73,7 +73,7 @@ public class HoodieWriteableTestTable extends
HoodieMetadataTestTable {
private static final Logger LOG =
LoggerFactory.getLogger(HoodieWriteableTestTable.class);
protected final Schema schema;
- protected final BloomFilter filter;
+ protected final Option<BloomFilter> filter;
protected final boolean populateMetaFields;
protected HoodieWriteableTestTable(String basePath, HoodieStorage storage,
@@ -88,13 +88,13 @@ public class HoodieWriteableTestTable extends
HoodieMetadataTestTable {
this(basePath, storage, metaClient, schema, filter, metadataWriter,
Option.empty());
}
- protected HoodieWriteableTestTable(String basePath, HoodieStorage storage,
+ public HoodieWriteableTestTable(String basePath, HoodieStorage storage,
HoodieTableMetaClient metaClient, Schema
schema,
BloomFilter filter,
HoodieTableMetadataWriter metadataWriter,
Option<HoodieEngineContext> context) {
super(basePath, storage, metaClient, metadataWriter, context);
this.schema = schema;
- this.filter = filter;
+ this.filter = Option.ofNullable(filter);
this.populateMetaFields = metaClient.getTableConfig().populateMetaFields();
}
@@ -121,7 +121,7 @@ public class HoodieWriteableTestTable extends
HoodieMetadataTestTable {
if
(HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().equals(HoodieFileFormat.PARQUET))
{
HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
- new AvroSchemaConverter().convert(schema), schema,
Option.of(filter), new Properties());
+ new AvroSchemaConverter().convert(schema), schema, filter, new
Properties());
HoodieParquetConfig<HoodieAvroWriteSupport> config = new
HoodieParquetConfig<>(writeSupport, CompressionCodecName.GZIP,
ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE,
120 * 1024 * 1024,
storage.getConf(),
Double.parseDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue()),
true);
@@ -135,7 +135,7 @@ public class HoodieWriteableTestTable extends
HoodieMetadataTestTable {
HoodieAvroUtils.addCommitMetadataToRecord(avroRecord,
currentInstantTime, String.valueOf(seqId++));
HoodieAvroUtils.addHoodieKeyToRecord(avroRecord,
record.getRecordKey(), record.getPartitionPath(), fileName);
writer.writeAvro(record.getRecordKey(), avroRecord);
- filter.add(record.getRecordKey());
+ filter.ifPresent(f -> f.add(record.getRecordKey()));
} else {
writer.writeAvro(record.getRecordKey(), avroRecord);
}
@@ -146,7 +146,7 @@ public class HoodieWriteableTestTable extends
HoodieMetadataTestTable {
int orcStripSize =
Integer.parseInt(HoodieStorageConfig.ORC_STRIPE_SIZE.defaultValue());
int orcBlockSize =
Integer.parseInt(HoodieStorageConfig.ORC_BLOCK_SIZE.defaultValue());
int maxFileSize =
Integer.parseInt(HoodieStorageConfig.ORC_FILE_MAX_SIZE.defaultValue());
- HoodieOrcConfig config = new HoodieOrcConfig(conf, CompressionKind.ZLIB,
orcStripSize, orcBlockSize, maxFileSize, filter);
+ HoodieOrcConfig config = new HoodieOrcConfig(conf, CompressionKind.ZLIB,
orcStripSize, orcBlockSize, maxFileSize, filter.orElse(null));
try (HoodieAvroOrcWriter writer = new HoodieAvroOrcWriter(
currentInstantTime,
new StoragePath(Paths.get(basePath, partition, fileName).toString()),
@@ -157,7 +157,7 @@ public class HoodieWriteableTestTable extends
HoodieMetadataTestTable {
HoodieAvroUtils.addCommitMetadataToRecord(avroRecord,
currentInstantTime, String.valueOf(seqId++));
HoodieAvroUtils.addHoodieKeyToRecord(avroRecord,
record.getRecordKey(), record.getPartitionPath(), fileName);
writer.writeAvro(record.getRecordKey(), avroRecord);
- filter.add(record.getRecordKey());
+ filter.ifPresent(f -> f.add(record.getRecordKey()));
}
}
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
index 63b8cc5de5c..2e53c7cadf8 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
@@ -50,7 +51,6 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -107,10 +107,11 @@ public class TestHoodieKeyLocationFetchHandle extends
HoodieSparkClientTestHarne
for (Tuple2<String, HoodieBaseFile> entry : partitionPathFileIdPairs) {
HoodieKeyLocationFetchHandle fetcherHandle = new
HoodieKeyLocationFetchHandle(config, hoodieTable, Pair.of(entry._1, entry._2),
populateMetaFields ? Option.empty() : Option.of(keyGenerator));
- Iterator<Pair<HoodieKey, HoodieRecordLocation>> result =
fetcherHandle.locations().iterator();
- List<Tuple2<HoodieKey, HoodieRecordLocation>> actualList = new
ArrayList<>();
- result.forEachRemaining(x -> actualList.add(new Tuple2<>(x.getLeft(),
x.getRight())));
- assertEquals(expectedList.get(new Tuple2<>(entry._1,
entry._2.getFileId())), actualList);
+ try (ClosableIterator<Pair<HoodieKey, HoodieRecordLocation>> result =
fetcherHandle.locations()) {
+ List<Tuple2<HoodieKey, HoodieRecordLocation>> actualList = new
ArrayList<>();
+ result.forEachRemaining(x -> actualList.add(new Tuple2<>(x.getLeft(),
x.getRight())));
+ assertEquals(expectedList.get(new Tuple2<>(entry._1,
entry._2.getFileId())), actualList);
+ }
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java
index fea44ef2a75..39aa1f807e4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java
@@ -40,6 +40,7 @@ import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -187,7 +188,7 @@ public abstract class FileFormatUtils {
* @param filePath the data file path.
* @return {@link Iterator} of pairs of {@link HoodieKey} and position
fetched from the data file.
*/
- public abstract List<Pair<HoodieKey, Long>>
fetchRecordKeysWithPositions(HoodieStorage storage, StoragePath filePath);
+ public abstract ClosableIterator<Pair<HoodieKey, Long>>
fetchRecordKeysWithPositions(HoodieStorage storage, StoragePath filePath);
/**
* Provides a closable iterator for reading the given data file.
@@ -216,11 +217,11 @@ public abstract class FileFormatUtils {
* @param storage {@link HoodieStorage} instance.
* @param filePath the data file path.
* @param keyGeneratorOpt instance of KeyGenerator.
- * @return {@link List} of pairs of {@link HoodieKey} and position fetched
from the data file.
+ * @return {@link Iterator} of pairs of {@link HoodieKey} and position
fetched from the data file.
*/
- public abstract List<Pair<HoodieKey, Long>>
fetchRecordKeysWithPositions(HoodieStorage storage,
-
StoragePath filePath,
-
Option<BaseKeyGenerator> keyGeneratorOpt);
+ public abstract ClosableIterator<Pair<HoodieKey, Long>>
fetchRecordKeysWithPositions(HoodieStorage storage,
+
StoragePath filePath,
+
Option<BaseKeyGenerator> keyGeneratorOpt);
/**
* Read the Avro schema of the data file.
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
index 28b0dc2c42c..27fcba8d1a1 100644
---
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
@@ -109,7 +109,7 @@ public class HFileUtils extends FileFormatUtils {
}
@Override
- public List<Pair<HoodieKey, Long>>
fetchRecordKeysWithPositions(HoodieStorage storage, StoragePath filePath) {
+ public ClosableIterator<Pair<HoodieKey, Long>>
fetchRecordKeysWithPositions(HoodieStorage storage, StoragePath filePath) {
throw new UnsupportedOperationException("HFileUtils does not support
fetchRecordKeysWithPositions");
}
@@ -124,7 +124,7 @@ public class HFileUtils extends FileFormatUtils {
}
@Override
- public List<Pair<HoodieKey, Long>>
fetchRecordKeysWithPositions(HoodieStorage storage, StoragePath filePath,
Option<BaseKeyGenerator> keyGeneratorOpt) {
+ public ClosableIterator<Pair<HoodieKey, Long>>
fetchRecordKeysWithPositions(HoodieStorage storage, StoragePath filePath,
Option<BaseKeyGenerator> keyGeneratorOpt) {
throw new UnsupportedOperationException("HFileUtils does not support
fetchRecordKeysWithPositions");
}
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
index fad371ac7af..0ecf79400ff 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
@@ -58,6 +59,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import static org.apache.hudi.common.util.BinaryUtil.toBytes;
@@ -89,28 +91,21 @@ public class OrcUtils extends FileFormatUtils {
* @return {@link Iterator} of pairs of {@link HoodieKey} and position fetched
from the ORC file
*/
@Override
- public List<Pair<HoodieKey, Long>>
fetchRecordKeysWithPositions(HoodieStorage storage, StoragePath filePath) {
+ public ClosableIterator<Pair<HoodieKey, Long>>
fetchRecordKeysWithPositions(HoodieStorage storage, StoragePath filePath) {
return fetchRecordKeysWithPositions(storage, filePath, Option.empty());
}
@Override
- public List<Pair<HoodieKey, Long>>
fetchRecordKeysWithPositions(HoodieStorage storage, StoragePath filePath,
Option<BaseKeyGenerator> keyGeneratorOpt) {
+ public ClosableIterator<Pair<HoodieKey, Long>>
fetchRecordKeysWithPositions(HoodieStorage storage, StoragePath filePath,
Option<BaseKeyGenerator> keyGeneratorOpt) {
try {
if (!storage.exists(filePath)) {
- return Collections.emptyList();
+ return ClosableIterator.wrap(Collections.emptyIterator());
}
} catch (IOException e) {
throw new HoodieIOException("Failed to read from ORC file:" + filePath,
e);
}
- List<Pair<HoodieKey, Long>> hoodieKeysAndPositions = new ArrayList<>();
- long position = 0;
- try (ClosableIterator<HoodieKey> iterator = getHoodieKeyIterator(storage,
filePath, keyGeneratorOpt)) {
- while (iterator.hasNext()) {
- hoodieKeysAndPositions.add(Pair.of(iterator.next(), position));
- position++;
- }
- }
- return hoodieKeysAndPositions;
+ AtomicLong position = new AtomicLong(0);
+ return new CloseableMappingIterator<>(getHoodieKeyIterator(storage,
filePath, keyGeneratorOpt), key -> Pair.of(key, position.getAndIncrement()));
}
@Override
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
index b833748dc45..eb50cd43ea8 100644
---
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.MetadataNotFoundException;
@@ -72,6 +73,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -174,7 +176,7 @@ public class ParquetUtils extends FileFormatUtils {
* @return {@link Iterator} of pairs of {@link HoodieKey} and row position
fetched from the parquet file
*/
@Override
- public List<Pair<HoodieKey, Long>>
fetchRecordKeysWithPositions(HoodieStorage storage, StoragePath filePath) {
+ public ClosableIterator<Pair<HoodieKey, Long>>
fetchRecordKeysWithPositions(HoodieStorage storage, StoragePath filePath) {
return fetchRecordKeysWithPositions(storage, filePath, Option.empty());
}
@@ -223,16 +225,9 @@ public class ParquetUtils extends FileFormatUtils {
* @return {@link Iterator} of pairs of {@link HoodieKey} and row position
fetched from the parquet file
*/
@Override
- public List<Pair<HoodieKey, Long>>
fetchRecordKeysWithPositions(HoodieStorage storage, StoragePath filePath,
Option<BaseKeyGenerator> keyGeneratorOpt) {
- List<Pair<HoodieKey, Long>> hoodieKeysAndPositions = new ArrayList<>();
- long position = 0;
- try (ClosableIterator<HoodieKey> iterator = getHoodieKeyIterator(storage,
filePath, keyGeneratorOpt)) {
- while (iterator.hasNext()) {
- hoodieKeysAndPositions.add(Pair.of(iterator.next(), position));
- position++;
- }
- return hoodieKeysAndPositions;
- }
+ public ClosableIterator<Pair<HoodieKey, Long>>
fetchRecordKeysWithPositions(HoodieStorage storage, StoragePath filePath,
Option<BaseKeyGenerator> keyGeneratorOpt) {
+ AtomicLong position = new AtomicLong(0);
+ return new CloseableMappingIterator<>(getHoodieKeyIterator(storage,
filePath, keyGeneratorOpt), key -> Pair.of(key, position.getAndIncrement()));
}
/**
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
index 248350c7a82..9732602ea19 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.storage.StoragePath;
@@ -154,8 +155,13 @@ public class TestParquetUtils extends
HoodieCommonTestHarness {
writeParquetFile(typeCode, filePath, rowKeys, schema, true, partitionPath);
// Read and verify
- List<Pair<HoodieKey, Long>> fetchedRows =
parquetUtils.fetchRecordKeysWithPositions(
- HoodieTestUtils.getStorage(filePath), new StoragePath(filePath));
+ List<Pair<HoodieKey, Long>> fetchedRows = new ArrayList<>();
+ try (ClosableIterator<Pair<HoodieKey, Long>> iter =
parquetUtils.fetchRecordKeysWithPositions(
+ HoodieTestUtils.getStorage(filePath), new StoragePath(filePath))) {
+ while (iter.hasNext()) {
+ fetchedRows.add(iter.next());
+ }
+ }
assertEquals(rowKeys.size(), fetchedRows.size(), "Total count does not
match");
for (Pair<HoodieKey, Long> entry : fetchedRows) {
@@ -180,9 +186,14 @@ public class TestParquetUtils extends
HoodieCommonTestHarness {
false, "abc", "def");
// Read and verify
- List<Pair<HoodieKey, Long>> fetchedRows =
parquetUtils.fetchRecordKeysWithPositions(
+ List<Pair<HoodieKey, Long>> fetchedRows = new ArrayList<>();
+ try (ClosableIterator<Pair<HoodieKey, Long>> iter =
parquetUtils.fetchRecordKeysWithPositions(
HoodieTestUtils.getStorage(filePath), new StoragePath(filePath),
- Option.of(new TestBaseKeyGen("abc", "def")));
+ Option.of(new TestBaseKeyGen("abc", "def")))) {
+ while (iter.hasNext()) {
+ fetchedRows.add(iter.next());
+ }
+ }
assertEquals(rowKeys.size(), fetchedRows.size(), "Total count does not
match");
for (Pair<HoodieKey, Long> entry : fetchedRows) {