This is an automated email from the ASF dual-hosted git repository.
timbrown pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 7510b1b243ec perf: Bloom filter improvements for memory usage (#18015)
7510b1b243ec is described below
commit 7510b1b243ecca7ee70421bbe5f2e1e1bd188cca
Author: Tim Brown <[email protected]>
AuthorDate: Fri Jan 30 10:14:43 2026 -0500
perf: Bloom filter improvements for memory usage (#18015)
* initial iter
* remove redundant list
* fix boundary case
* address comments
---
.../org/apache/hudi/index/HoodieIndexUtils.java | 27 +++++++------
.../index/bloom/HoodieBloomIndexCheckFunction.java | 45 +++++++++++-----------
.../bloom/ListBasedHoodieBloomIndexHelper.java | 8 ++--
.../org/apache/hudi/io/HoodieKeyLookupHandle.java | 11 +++---
.../org/apache/hudi/io/HoodieKeyLookupResult.java | 4 +-
.../index/bloom/TestFlinkHoodieBloomIndex.java | 9 +++--
.../bloom/HoodieBloomFilterProbingResult.java | 4 +-
.../index/bloom/HoodieFileProbingFunction.java | 42 ++++++++++++++++----
.../HoodieMetadataBloomFilterProbingFunction.java | 4 +-
.../index/bloom/SparkHoodieBloomIndexHelper.java | 12 +++---
.../hudi/index/bloom/TestHoodieBloomIndex.java | 10 ++---
11 files changed, 101 insertions(+), 75 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index 54b889b478c6..39e7369fd312 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -84,12 +84,12 @@ import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import java.util.stream.Collectors;
import static java.util.stream.Collectors.toList;
import static
org.apache.hudi.common.config.HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP;
@@ -276,28 +276,27 @@ public class HoodieIndexUtils {
* @param storage
* @return List of pairs of candidate keys and positions that are available
in the file
*/
- public static List<Pair<String, Long>> filterKeysFromFile(StoragePath
filePath,
- List<String>
candidateRecordKeys,
- HoodieStorage
storage) throws HoodieIndexException {
+ public static Collection<Pair<String, Long>> filterKeysFromFile(StoragePath
filePath,
+ Set<String>
candidateRecordKeys,
+
HoodieStorage storage) throws HoodieIndexException {
checkArgument(FSUtils.isBaseFile(filePath));
- List<Pair<String, Long>> foundRecordKeys = new ArrayList<>();
+ if (candidateRecordKeys.isEmpty()) {
+ return Collections.emptyList();
+ }
log.info("Going to filter {} keys from file {}",
candidateRecordKeys.size(), filePath);
try (HoodieFileReader fileReader = HoodieIOFactory.getIOFactory(storage)
.getReaderFactory(HoodieRecordType.AVRO)
.getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, filePath)) {
// Load all rowKeys from the file, to double-confirm
- if (!candidateRecordKeys.isEmpty()) {
- HoodieTimer timer = HoodieTimer.start();
- Set<Pair<String, Long>> fileRowKeys =
fileReader.filterRowKeys(candidateRecordKeys.stream().collect(Collectors.toSet()));
- foundRecordKeys.addAll(fileRowKeys);
- log.info("Checked keys against file {}, in {} ms. #candidates ({})
#found ({})", filePath,
- timer.endTimer(), candidateRecordKeys.size(),
foundRecordKeys.size());
- log.debug("Keys matching for file {} => {}", filePath,
foundRecordKeys);
- }
+ HoodieTimer timer = HoodieTimer.start();
+ Set<Pair<String, Long>> fileRowKeys =
fileReader.filterRowKeys(candidateRecordKeys);
+ log.info("Checked keys against file {}, in {} ms. #candidates ({})
#found ({})", filePath,
+ timer.endTimer(), candidateRecordKeys.size(), fileRowKeys.size());
+ log.debug("Keys matching for file {} => {}", filePath, fileRowKeys);
+ return fileRowKeys;
} catch (Exception e) {
throw new HoodieIndexException("Error checking candidate keys against
file.", e);
}
- return foundRecordKeys;
}
/**
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndexCheckFunction.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndexCheckFunction.java
index 52b504e9ab16..9f8b51563a65 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndexCheckFunction.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndexCheckFunction.java
@@ -18,7 +18,6 @@
package org.apache.hudi.index.bloom;
-import org.apache.hudi.client.utils.LazyIterableIterator;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.util.collection.Pair;
@@ -30,20 +29,18 @@ import org.apache.hudi.io.HoodieKeyLookupResult;
import org.apache.hudi.table.HoodieTable;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.Iterator;
-import java.util.List;
import java.util.function.Function;
/**
* Function accepting a tuple of {@link HoodieFileGroupId} and a record key
and producing
- * a list of {@link HoodieKeyLookupResult} for every file identified by the
file-group ids
+ * {@link HoodieKeyLookupResult} for every file identified by the file-group
ids
*
* @param <I> type of the tuple of {@code (HoodieFileGroupId, <record-key>)}.
Note that this is
* parameterized as generic such that this code could be reused for
Spark as well
*/
-public class HoodieBloomIndexCheckFunction<I>
- implements Function<Iterator<I>, Iterator<List<HoodieKeyLookupResult>>>,
Serializable {
+public class HoodieBloomIndexCheckFunction<I>
+ implements Function<Iterator<I>, Iterator<HoodieKeyLookupResult>>,
Serializable {
private final HoodieTable hoodieTable;
@@ -63,26 +60,30 @@ public class HoodieBloomIndexCheckFunction<I>
}
@Override
- public Iterator<List<HoodieKeyLookupResult>> apply(Iterator<I>
fileGroupIdRecordKeyPairIterator) {
+ public Iterator<HoodieKeyLookupResult> apply(Iterator<I>
fileGroupIdRecordKeyPairIterator) {
return new LazyKeyCheckIterator(fileGroupIdRecordKeyPairIterator);
}
- protected class LazyKeyCheckIterator extends LazyIterableIterator<I,
List<HoodieKeyLookupResult>> {
+ protected class LazyKeyCheckIterator implements
Iterator<HoodieKeyLookupResult> {
- private HoodieKeyLookupHandle keyLookupHandle;
+ private final Iterator<I> filePartitionRecordKeyPairItr;
+ private HoodieKeyLookupHandle keyLookupHandle = null;
- LazyKeyCheckIterator(Iterator<I> filePartitionRecordKeyTripletItr) {
- super(filePartitionRecordKeyTripletItr);
+ LazyKeyCheckIterator(Iterator<I> filePartitionRecordKeyPairItr) {
+ this.filePartitionRecordKeyPairItr = filePartitionRecordKeyPairItr;
}
@Override
- protected List<HoodieKeyLookupResult> computeNext() {
+ public boolean hasNext() {
+ return keyLookupHandle != null ||
filePartitionRecordKeyPairItr.hasNext();
+ }
- List<HoodieKeyLookupResult> ret = new ArrayList<>();
+ @Override
+ public HoodieKeyLookupResult next() {
try {
// process one file in each go.
- while (inputItr.hasNext()) {
- I tuple = inputItr.next();
+ while (filePartitionRecordKeyPairItr.hasNext()) {
+ I tuple = filePartitionRecordKeyPairItr.next();
HoodieFileGroupId fileGroupId = fileGroupIdExtractor.apply(tuple);
String recordKey = recordKeyExtractor.apply(tuple);
@@ -102,17 +103,17 @@ public class HoodieBloomIndexCheckFunction<I>
keyLookupHandle.addKey(recordKey);
} else {
// do the actual checking of file & break out
- ret.add(keyLookupHandle.getLookupResult());
+ HoodieKeyLookupResult result = keyLookupHandle.getLookupResult();
keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable,
partitionPathFilePair);
keyLookupHandle.addKey(recordKey);
- break;
+ return result;
}
}
- // handle case, where we ran out of input, close pending work, update
return val
- if (!inputItr.hasNext()) {
- ret.add(keyLookupHandle.getLookupResult());
- }
+ // handle case, where we ran out of input, close pending work, return
result
+ HoodieKeyLookupResult result = keyLookupHandle.getLookupResult();
+ keyLookupHandle = null;
+ return result;
} catch (Throwable e) {
if (e instanceof HoodieException) {
throw (HoodieException) e;
@@ -120,8 +121,6 @@ public class HoodieBloomIndexCheckFunction<I>
throw new HoodieIndexException("Error checking bloom filter index. ",
e);
}
-
- return ret;
}
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java
index cd539036794a..c6f99f7e04b0 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java
@@ -34,7 +34,6 @@ import org.apache.hudi.table.HoodieTable;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
-import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@@ -67,10 +66,9 @@ public class ListBasedHoodieBloomIndexHelper extends
BaseHoodieBloomIndexHelper
CollectionUtils.toStream(
new HoodieBloomIndexCheckFunction<Pair<HoodieFileGroupId,
String>>(hoodieTable, config, Pair::getLeft, Pair::getRight)
.apply(fileComparisonPairList.iterator())
- )
- .flatMap(Collection::stream)
- .filter(lr -> lr.getMatchingRecordKeysAndPositions().size() > 0)
- .collect(toList());
+ )
+ .filter(lr -> !lr.getMatchingRecordKeysAndPositions().isEmpty())
+ .collect(toList());
return context.parallelize(keyLookupResults).flatMap(lookupResult ->
lookupResult.getMatchingRecordKeysAndPositions().stream()
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
index e16d6f118e7d..be860d89d887 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
@@ -31,8 +31,9 @@ import org.apache.hudi.table.HoodieTable;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
@@ -43,13 +44,13 @@ import static
org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
public class HoodieKeyLookupHandle<T, I, K, O> extends HoodieReadHandle<T, I,
K, O> {
private final BloomFilter bloomFilter;
- private final List<String> candidateRecordKeys;
+ private final Set<String> candidateRecordKeys;
private long totalKeysChecked;
public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K,
O> hoodieTable,
Pair<String, String> partitionPathFileIDPair) {
super(config, hoodieTable, partitionPathFileIDPair);
- this.candidateRecordKeys = new ArrayList<>();
+ this.candidateRecordKeys = new HashSet<>();
this.totalKeysChecked = 0;
this.bloomFilter = getBloomFilter();
}
@@ -94,7 +95,7 @@ public class HoodieKeyLookupHandle<T, I, K, O> extends
HoodieReadHandle<T, I, K,
log.debug("#The candidate row keys for {} => {}", partitionPathFileIDPair,
candidateRecordKeys);
HoodieBaseFile baseFile = getLatestBaseFile();
- List<Pair<String, Long>> matchingKeysAndPositions =
HoodieIndexUtils.filterKeysFromFile(
+ Collection<Pair<String, Long>> matchingKeysAndPositions =
HoodieIndexUtils.filterKeysFromFile(
baseFile.getStoragePath(), candidateRecordKeys,
hoodieTable.getStorage());
log.info("Total records ({}), bloom filter candidates ({})/fp({}), actual
matches ({})", totalKeysChecked,
candidateRecordKeys.size(), candidateRecordKeys.size() -
matchingKeysAndPositions.size(), matchingKeysAndPositions.size());
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupResult.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupResult.java
index 12ede1c2b525..16849e44e8de 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupResult.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupResult.java
@@ -23,7 +23,7 @@ import org.apache.hudi.common.util.collection.Pair;
import lombok.AllArgsConstructor;
import lombok.Getter;
-import java.util.List;
+import java.util.Collection;
/**
* Encapsulates the result from a key lookup.
@@ -35,6 +35,6 @@ public class HoodieKeyLookupResult {
private final String fileId;
private final String partitionPath;
private final String baseInstantTime;
- private final List<Pair<String, Long>> matchingRecordKeysAndPositions;
+ private final Collection<Pair<String, Long>> matchingRecordKeysAndPositions;
}
diff --git
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
index 66c87b3ff507..f753f7052aa2 100644
---
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
+++
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
@@ -48,10 +48,12 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Stream;
import static java.util.Arrays.asList;
@@ -216,11 +218,10 @@ public class TestFlinkHoodieBloomIndex extends
HoodieFlinkClientTestHarness {
assertFalse(filter.mightContain(record4.getRecordKey()));
// Compare with file
- List<String> uuids = asList(record1.getRecordKey(),
record2.getRecordKey(), record3.getRecordKey(), record4.getRecordKey());
+ Set<String> uuids = Set.of(record1.getRecordKey(), record2.getRecordKey(),
record3.getRecordKey(), record4.getRecordKey());
- HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath).build();
- List<Pair<String, Long>> results = HoodieIndexUtils.filterKeysFromFile(
- new StoragePath(java.nio.file.Paths.get(basePath, partition,
filename).toString()), uuids, metaClient.getStorage());
+ List<Pair<String, Long>> results = new
ArrayList<>(HoodieIndexUtils.filterKeysFromFile(
+ new StoragePath(Paths.get(basePath, partition, filename).toString()),
uuids, metaClient.getStorage()));
assertEquals(results.size(), 2);
assertTrue(results.get(0).getLeft().equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")
||
results.get(1).getLeft().equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0"));
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomFilterProbingResult.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomFilterProbingResult.java
index 91363e7dd037..195fe5c29f29 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomFilterProbingResult.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomFilterProbingResult.java
@@ -21,10 +21,10 @@ package org.apache.hudi.index.bloom;
import lombok.Value;
-import java.util.List;
+import java.util.Set;
@Value
class HoodieBloomFilterProbingResult {
- List<String> candidateKeys;
+ Set<String> candidateKeys;
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java
index 563e4dac2de5..bafeffdecce1 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java
@@ -35,11 +35,13 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.broadcast.Broadcast;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
import scala.Tuple2;
@@ -51,7 +53,7 @@ import scala.Tuple2;
*/
@Slf4j
public class HoodieFileProbingFunction implements
- FlatMapFunction<Iterator<Tuple2<HoodieFileGroupId,
HoodieBloomFilterProbingResult>>, List<HoodieKeyLookupResult>> {
+ FlatMapFunction<Iterator<Tuple2<HoodieFileGroupId,
HoodieBloomFilterProbingResult>>, HoodieKeyLookupResult> {
// Assuming each file bloom filter takes up 512K, sizing the max file count
// per batch so that the total fetched bloom filters would not cross 128 MB.
@@ -67,19 +69,43 @@ public class HoodieFileProbingFunction implements
}
@Override
- public Iterator<List<HoodieKeyLookupResult>>
call(Iterator<Tuple2<HoodieFileGroupId, HoodieBloomFilterProbingResult>>
tuple2Iterator) throws Exception {
+ public Iterator<HoodieKeyLookupResult>
call(Iterator<Tuple2<HoodieFileGroupId, HoodieBloomFilterProbingResult>>
tuple2Iterator) throws Exception {
return new BloomIndexLazyKeyCheckIterator(tuple2Iterator);
}
private class BloomIndexLazyKeyCheckIterator
- extends LazyIterableIterator<Tuple2<HoodieFileGroupId,
HoodieBloomFilterProbingResult>, List<HoodieKeyLookupResult>> {
+ extends LazyIterableIterator<Tuple2<HoodieFileGroupId,
HoodieBloomFilterProbingResult>, HoodieKeyLookupResult> {
+
+ private List<HoodieKeyLookupResult> currentBatch;
+ private int currentBatchIndex;
public BloomIndexLazyKeyCheckIterator(Iterator<Tuple2<HoodieFileGroupId,
HoodieBloomFilterProbingResult>> tuple2Iterator) {
super(tuple2Iterator);
+ this.currentBatch = Collections.emptyList();
+ this.currentBatchIndex = 0;
}
@Override
- protected List<HoodieKeyLookupResult> computeNext() {
+ protected HoodieKeyLookupResult computeNext() {
+ // If we have results left in the current batch, return next one
+ if (currentBatchIndex < currentBatch.size()) {
+ return currentBatch.get(currentBatchIndex++);
+ }
+
+ // Need to fetch next batch
+ currentBatch = fetchNextBatch();
+ currentBatchIndex = 0;
+
+ // If new batch is empty, we're done
+ if (currentBatch.isEmpty()) {
+ return null;
+ }
+
+ // Return first element from new batch
+ return currentBatch.get(currentBatchIndex++);
+ }
+
+ private List<HoodieKeyLookupResult> fetchNextBatch() {
// Partition path and file name pair to list of keys
final Map<Pair<String, HoodieBaseFile>, HoodieBloomFilterProbingResult>
fileToLookupResults = new HashMap<>();
final Map<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<>();
@@ -120,18 +146,18 @@ public class HoodieFileProbingFunction implements
final String fileId =
partitionPathFileNamePair.getRight().getFileId();
ValidationUtils.checkState(!fileId.isEmpty());
- List<String> candidateRecordKeys =
bloomFilterKeyLookupResult.getCandidateKeys();
+ Set<String> candidateRecordKeys =
bloomFilterKeyLookupResult.getCandidateKeys();
// TODO add assertion that file is checked only once
final HoodieBaseFile dataFile = fileIDBaseFileMap.get(fileId);
- List<Pair<String, Long>> matchingKeysAndPositions =
HoodieIndexUtils.filterKeysFromFile(
+ Collection<Pair<String, Long>> matchingKeysAndPositions =
HoodieIndexUtils.filterKeysFromFile(
dataFile.getStoragePath(), candidateRecordKeys,
HoodieStorageUtils.getStorage(dataFile.getStoragePath(), storageConf));
log.debug(
- String.format("Bloom filter candidates (%d) / false positives
(%d), actual matches (%d)",
+ "Bloom filter candidates ({}) / false positives ({}), actual
matches ({})",
candidateRecordKeys.size(), candidateRecordKeys.size() -
matchingKeysAndPositions.size(),
- matchingKeysAndPositions.size()));
+ matchingKeysAndPositions.size());
return new HoodieKeyLookupResult(fileId, partitionPath,
dataFile.getCommitTime(), matchingKeysAndPositions);
})
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomFilterProbingFunction.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomFilterProbingFunction.java
index bdcc55db2a99..bbc8e63f1f94 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomFilterProbingFunction.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomFilterProbingFunction.java
@@ -39,9 +39,11 @@ import org.apache.spark.broadcast.Broadcast;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
import scala.Tuple2;
@@ -138,7 +140,7 @@ public class HoodieMetadataBloomFilterProbingFunction
implements
}
final BloomFilter fileBloomFilter =
fileToBloomFilterMap.get(partitionPathFileNamePair);
- List<String> candidateRecordKeys = new ArrayList<>();
+ Set<String> candidateRecordKeys = new HashSet<>();
hoodieKeyList.forEach(hoodieKey -> {
if (fileBloomFilter.mightContain(hoodieKey.getRecordKey())) {
candidateRecordKeys.add(hoodieKey.getRecordKey());
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
index fb8504267331..5367abcad391 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
@@ -100,7 +100,7 @@ public class SparkHoodieBloomIndexHelper extends
BaseHoodieBloomIndexHelper {
log.info("Input parallelism: {}, Index parallelism: {}", inputParallelism,
targetParallelism);
JavaPairRDD<HoodieFileGroupId, String> fileComparisonsRDD =
HoodieJavaRDD.getJavaRDD(fileComparisonPairs);
- JavaRDD<List<HoodieKeyLookupResult>> keyLookupResultRDD;
+ JavaRDD<HoodieKeyLookupResult> keyLookupResultRDD;
if (config.getBloomIndexUseMetadata()
&& hoodieTable.getMetaClient().getTableConfig().getMetadataPartitions()
@@ -181,14 +181,14 @@ public class SparkHoodieBloomIndexHelper extends
BaseHoodieBloomIndexHelper {
.mapPartitions(new HoodieSparkBloomIndexCheckFunction(hoodieTable,
config), true);
}
- return HoodieJavaPairRDD.of(keyLookupResultRDD.flatMap(List::iterator)
- .filter(lr -> lr.getMatchingRecordKeysAndPositions().size() > 0)
+ return HoodieJavaPairRDD.of(keyLookupResultRDD
+ .filter(lr -> !lr.getMatchingRecordKeysAndPositions().isEmpty())
.flatMapToPair(lookupResult ->
lookupResult.getMatchingRecordKeysAndPositions().stream()
.map(recordKeyAndPosition -> new Tuple2<>(
new HoodieKey(recordKeyAndPosition.getLeft(),
lookupResult.getPartitionPath()),
new HoodieRecordLocation(lookupResult.getBaseInstantTime(),
lookupResult.getFileId(),
recordKeyAndPosition.getRight())))
- .collect(Collectors.toList()).iterator()));
+ .iterator()));
}
private static class FileGroupIdComparator implements
Comparator<Tuple2<HoodieFileGroupId, String>>, Serializable {
@@ -326,7 +326,7 @@ public class SparkHoodieBloomIndexHelper extends
BaseHoodieBloomIndexHelper {
}
public static class HoodieSparkBloomIndexCheckFunction extends
HoodieBloomIndexCheckFunction<Tuple2<HoodieFileGroupId, String>>
- implements FlatMapFunction<Iterator<Tuple2<HoodieFileGroupId, String>>,
List<HoodieKeyLookupResult>> {
+ implements FlatMapFunction<Iterator<Tuple2<HoodieFileGroupId, String>>,
HoodieKeyLookupResult> {
public HoodieSparkBloomIndexCheckFunction(HoodieTable hoodieTable,
HoodieWriteConfig config) {
@@ -334,7 +334,7 @@ public class SparkHoodieBloomIndexHelper extends
BaseHoodieBloomIndexHelper {
}
@Override
- public Iterator<List<HoodieKeyLookupResult>>
call(Iterator<Tuple2<HoodieFileGroupId, String>>
fileGroupIdRecordKeyPairIterator) {
+ public Iterator<HoodieKeyLookupResult>
call(Iterator<Tuple2<HoodieFileGroupId, String>>
fileGroupIdRecordKeyPairIterator) {
TaskContext taskContext = TaskContext.get();
log.info("HoodieSparkBloomIndexCheckFunction with stageId : {}, stage
attempt no: {}, taskId : {}, task attempt no : {}, task attempt id : {} ",
taskContext.stageId(), taskContext.stageAttemptNumber(),
taskContext.partitionId(), taskContext.attemptNumber(),
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
index 3a17863e0ab4..995893e413cd 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
@@ -63,6 +63,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -311,12 +312,11 @@ public class TestHoodieBloomIndex extends
TestHoodieMetadataBase {
assertFalse(filter.mightContain(record4.getRecordKey()));
// Compare with file
- List<String> uuids =
- Arrays.asList(record1.getRecordKey(), record2.getRecordKey(),
record3.getRecordKey(), record4.getRecordKey());
+ Set<String> uuids =
+ Set.of(record1.getRecordKey(), record2.getRecordKey(),
record3.getRecordKey(), record4.getRecordKey());
- HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath).build();
- List<Pair<String, Long>> results = HoodieIndexUtils.filterKeysFromFile(
- new StoragePath(Paths.get(basePath, partition, filename).toString()),
uuids, storage);
+ List<Pair<String, Long>> results = new
ArrayList<>(HoodieIndexUtils.filterKeysFromFile(
+ new StoragePath(Paths.get(basePath, partition, filename).toString()),
uuids, storage));
assertEquals(results.size(), 2);
assertTrue(results.get(0).getLeft().equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")