This is an automated email from the ASF dual-hosted git repository.

timbrown pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7510b1b243ec perf: Bloom filter improvements for memory usage (#18015)
7510b1b243ec is described below

commit 7510b1b243ecca7ee70421bbe5f2e1e1bd188cca
Author: Tim Brown <[email protected]>
AuthorDate: Fri Jan 30 10:14:43 2026 -0500

    perf: Bloom filter improvements for memory usage (#18015)
    
    * initial iter
    
    * remove redundant list
    
    * fix boundary case
    
    * address comments
---
 .../org/apache/hudi/index/HoodieIndexUtils.java    | 27 +++++++------
 .../index/bloom/HoodieBloomIndexCheckFunction.java | 45 +++++++++++-----------
 .../bloom/ListBasedHoodieBloomIndexHelper.java     |  8 ++--
 .../org/apache/hudi/io/HoodieKeyLookupHandle.java  | 11 +++---
 .../org/apache/hudi/io/HoodieKeyLookupResult.java  |  4 +-
 .../index/bloom/TestFlinkHoodieBloomIndex.java     |  9 +++--
 .../bloom/HoodieBloomFilterProbingResult.java      |  4 +-
 .../index/bloom/HoodieFileProbingFunction.java     | 42 ++++++++++++++++----
 .../HoodieMetadataBloomFilterProbingFunction.java  |  4 +-
 .../index/bloom/SparkHoodieBloomIndexHelper.java   | 12 +++---
 .../hudi/index/bloom/TestHoodieBloomIndex.java     | 10 ++---
 11 files changed, 101 insertions(+), 75 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index 54b889b478c6..39e7369fd312 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -84,12 +84,12 @@ import lombok.extern.slf4j.Slf4j;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import static java.util.stream.Collectors.toList;
 import static 
org.apache.hudi.common.config.HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP;
@@ -276,28 +276,27 @@ public class HoodieIndexUtils {
    * @param storage
    * @return List of pairs of candidate keys and positions that are available 
in the file
    */
-  public static List<Pair<String, Long>> filterKeysFromFile(StoragePath 
filePath,
-                                                            List<String> 
candidateRecordKeys,
-                                                            HoodieStorage 
storage) throws HoodieIndexException {
+  public static Collection<Pair<String, Long>> filterKeysFromFile(StoragePath 
filePath,
+                                                                  Set<String> 
candidateRecordKeys,
+                                                                  
HoodieStorage storage) throws HoodieIndexException {
     checkArgument(FSUtils.isBaseFile(filePath));
-    List<Pair<String, Long>> foundRecordKeys = new ArrayList<>();
+    if (candidateRecordKeys.isEmpty()) {
+      return Collections.emptyList();
+    }
     log.info("Going to filter {} keys from file {}", 
candidateRecordKeys.size(), filePath);
     try (HoodieFileReader fileReader = HoodieIOFactory.getIOFactory(storage)
         .getReaderFactory(HoodieRecordType.AVRO)
         .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, filePath)) {
       // Load all rowKeys from the file, to double-confirm
-      if (!candidateRecordKeys.isEmpty()) {
-        HoodieTimer timer = HoodieTimer.start();
-        Set<Pair<String, Long>> fileRowKeys = 
fileReader.filterRowKeys(candidateRecordKeys.stream().collect(Collectors.toSet()));
-        foundRecordKeys.addAll(fileRowKeys);
-        log.info("Checked keys against file {}, in {} ms. #candidates ({}) 
#found ({})", filePath,
-            timer.endTimer(), candidateRecordKeys.size(), 
foundRecordKeys.size());
-        log.debug("Keys matching for file {} => {}", filePath, 
foundRecordKeys);
-      }
+      HoodieTimer timer = HoodieTimer.start();
+      Set<Pair<String, Long>> fileRowKeys = 
fileReader.filterRowKeys(candidateRecordKeys);
+      log.info("Checked keys against file {}, in {} ms. #candidates ({}) 
#found ({})", filePath,
+          timer.endTimer(), candidateRecordKeys.size(), fileRowKeys.size());
+      log.debug("Keys matching for file {} => {}", filePath, fileRowKeys);
+      return fileRowKeys;
     } catch (Exception e) {
       throw new HoodieIndexException("Error checking candidate keys against 
file.", e);
     }
-    return foundRecordKeys;
   }
 
   /**
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndexCheckFunction.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndexCheckFunction.java
index 52b504e9ab16..9f8b51563a65 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndexCheckFunction.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndexCheckFunction.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.index.bloom;
 
-import org.apache.hudi.client.utils.LazyIterableIterator;
 import org.apache.hudi.common.function.SerializableFunction;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.util.collection.Pair;
@@ -30,20 +29,18 @@ import org.apache.hudi.io.HoodieKeyLookupResult;
 import org.apache.hudi.table.HoodieTable;
 
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
 import java.util.function.Function;
 
 /**
  * Function accepting a tuple of {@link HoodieFileGroupId} and a record key 
and producing
- * a list of {@link HoodieKeyLookupResult} for every file identified by the 
file-group ids
+ * {@link HoodieKeyLookupResult} for every file identified by the file-group 
ids
  *
  * @param <I> type of the tuple of {@code (HoodieFileGroupId, <record-key>)}. 
Note that this is
  *           parameterized as generic such that this code could be reused for 
Spark as well
  */
-public class HoodieBloomIndexCheckFunction<I> 
-    implements Function<Iterator<I>, Iterator<List<HoodieKeyLookupResult>>>, 
Serializable {
+public class HoodieBloomIndexCheckFunction<I>
+    implements Function<Iterator<I>, Iterator<HoodieKeyLookupResult>>, 
Serializable {
 
   private final HoodieTable hoodieTable;
 
@@ -63,26 +60,30 @@ public class HoodieBloomIndexCheckFunction<I>
   }
 
   @Override
-  public Iterator<List<HoodieKeyLookupResult>> apply(Iterator<I> 
fileGroupIdRecordKeyPairIterator) {
+  public Iterator<HoodieKeyLookupResult> apply(Iterator<I> 
fileGroupIdRecordKeyPairIterator) {
     return new LazyKeyCheckIterator(fileGroupIdRecordKeyPairIterator);
   }
 
-  protected class LazyKeyCheckIterator extends LazyIterableIterator<I, 
List<HoodieKeyLookupResult>> {
+  protected class LazyKeyCheckIterator implements 
Iterator<HoodieKeyLookupResult> {
 
-    private HoodieKeyLookupHandle keyLookupHandle;
+    private final Iterator<I> filePartitionRecordKeyPairItr;
+    private HoodieKeyLookupHandle keyLookupHandle = null;
 
-    LazyKeyCheckIterator(Iterator<I> filePartitionRecordKeyTripletItr) {
-      super(filePartitionRecordKeyTripletItr);
+    LazyKeyCheckIterator(Iterator<I> filePartitionRecordKeyPairItr) {
+      this.filePartitionRecordKeyPairItr = filePartitionRecordKeyPairItr;
     }
 
     @Override
-    protected List<HoodieKeyLookupResult> computeNext() {
+    public boolean hasNext() {
+      return keyLookupHandle != null || 
filePartitionRecordKeyPairItr.hasNext();
+    }
 
-      List<HoodieKeyLookupResult> ret = new ArrayList<>();
+    @Override
+    public HoodieKeyLookupResult next() {
       try {
         // process one file in each go.
-        while (inputItr.hasNext()) {
-          I tuple = inputItr.next();
+        while (filePartitionRecordKeyPairItr.hasNext()) {
+          I tuple = filePartitionRecordKeyPairItr.next();
 
           HoodieFileGroupId fileGroupId = fileGroupIdExtractor.apply(tuple);
           String recordKey = recordKeyExtractor.apply(tuple);
@@ -102,17 +103,17 @@ public class HoodieBloomIndexCheckFunction<I>
             keyLookupHandle.addKey(recordKey);
           } else {
             // do the actual checking of file & break out
-            ret.add(keyLookupHandle.getLookupResult());
+            HoodieKeyLookupResult result = keyLookupHandle.getLookupResult();
             keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, 
partitionPathFilePair);
             keyLookupHandle.addKey(recordKey);
-            break;
+            return result;
           }
         }
 
-        // handle case, where we ran out of input, close pending work, update 
return val
-        if (!inputItr.hasNext()) {
-          ret.add(keyLookupHandle.getLookupResult());
-        }
+        // handle case, where we ran out of input, close pending work, return 
result
+        HoodieKeyLookupResult result = keyLookupHandle.getLookupResult();
+        keyLookupHandle = null;
+        return result;
       } catch (Throwable e) {
         if (e instanceof HoodieException) {
           throw (HoodieException) e;
@@ -120,8 +121,6 @@ public class HoodieBloomIndexCheckFunction<I>
 
         throw new HoodieIndexException("Error checking bloom filter index. ", 
e);
       }
-
-      return ret;
     }
   }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java
index cd539036794a..c6f99f7e04b0 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java
@@ -34,7 +34,6 @@ import org.apache.hudi.table.HoodieTable;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 
-import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -67,10 +66,9 @@ public class ListBasedHoodieBloomIndexHelper extends 
BaseHoodieBloomIndexHelper
         CollectionUtils.toStream(
             new HoodieBloomIndexCheckFunction<Pair<HoodieFileGroupId, 
String>>(hoodieTable, config, Pair::getLeft, Pair::getRight)
                 .apply(fileComparisonPairList.iterator())
-            )
-            .flatMap(Collection::stream)
-            .filter(lr -> lr.getMatchingRecordKeysAndPositions().size() > 0)
-            .collect(toList());
+        )
+        .filter(lr -> !lr.getMatchingRecordKeysAndPositions().isEmpty())
+        .collect(toList());
 
     return context.parallelize(keyLookupResults).flatMap(lookupResult ->
         lookupResult.getMatchingRecordKeysAndPositions().stream()
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
index e16d6f118e7d..be860d89d887 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
@@ -31,8 +31,9 @@ import org.apache.hudi.table.HoodieTable;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
 
 import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
 
@@ -43,13 +44,13 @@ import static 
org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
 public class HoodieKeyLookupHandle<T, I, K, O> extends HoodieReadHandle<T, I, 
K, O> {
 
   private final BloomFilter bloomFilter;
-  private final List<String> candidateRecordKeys;
+  private final Set<String> candidateRecordKeys;
   private long totalKeysChecked;
 
   public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, 
O> hoodieTable,
                                Pair<String, String> partitionPathFileIDPair) {
     super(config, hoodieTable, partitionPathFileIDPair);
-    this.candidateRecordKeys = new ArrayList<>();
+    this.candidateRecordKeys = new HashSet<>();
     this.totalKeysChecked = 0;
     this.bloomFilter = getBloomFilter();
   }
@@ -94,7 +95,7 @@ public class HoodieKeyLookupHandle<T, I, K, O> extends 
HoodieReadHandle<T, I, K,
     log.debug("#The candidate row keys for {} => {}", partitionPathFileIDPair, 
candidateRecordKeys);
 
     HoodieBaseFile baseFile = getLatestBaseFile();
-    List<Pair<String, Long>> matchingKeysAndPositions = 
HoodieIndexUtils.filterKeysFromFile(
+    Collection<Pair<String, Long>> matchingKeysAndPositions = 
HoodieIndexUtils.filterKeysFromFile(
         baseFile.getStoragePath(), candidateRecordKeys, 
hoodieTable.getStorage());
     log.info("Total records ({}), bloom filter candidates ({})/fp({}), actual 
matches ({})", totalKeysChecked,
             candidateRecordKeys.size(), candidateRecordKeys.size() - 
matchingKeysAndPositions.size(), matchingKeysAndPositions.size());
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupResult.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupResult.java
index 12ede1c2b525..16849e44e8de 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupResult.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupResult.java
@@ -23,7 +23,7 @@ import org.apache.hudi.common.util.collection.Pair;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 
-import java.util.List;
+import java.util.Collection;
 
 /**
  * Encapsulates the result from a key lookup.
@@ -35,6 +35,6 @@ public class HoodieKeyLookupResult {
   private final String fileId;
   private final String partitionPath;
   private final String baseInstantTime;
-  private final List<Pair<String, Long>> matchingRecordKeysAndPositions;
+  private final Collection<Pair<String, Long>> matchingRecordKeysAndPositions;
 }
 
diff --git 
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
index 66c87b3ff507..f753f7052aa2 100644
--- 
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
+++ 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
@@ -48,10 +48,12 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Stream;
 
 import static java.util.Arrays.asList;
@@ -216,11 +218,10 @@ public class TestFlinkHoodieBloomIndex extends 
HoodieFlinkClientTestHarness {
     assertFalse(filter.mightContain(record4.getRecordKey()));
 
     // Compare with file
-    List<String> uuids = asList(record1.getRecordKey(), 
record2.getRecordKey(), record3.getRecordKey(), record4.getRecordKey());
+    Set<String> uuids = Set.of(record1.getRecordKey(), record2.getRecordKey(), 
record3.getRecordKey(), record4.getRecordKey());
 
-    HoodieWriteConfig config = 
HoodieWriteConfig.newBuilder().withPath(basePath).build();
-    List<Pair<String, Long>> results = HoodieIndexUtils.filterKeysFromFile(
-        new StoragePath(java.nio.file.Paths.get(basePath, partition, 
filename).toString()), uuids, metaClient.getStorage());
+    List<Pair<String, Long>> results = new 
ArrayList<>(HoodieIndexUtils.filterKeysFromFile(
+        new StoragePath(Paths.get(basePath, partition, filename).toString()), 
uuids, metaClient.getStorage()));
     assertEquals(results.size(), 2);
     
assertTrue(results.get(0).getLeft().equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")
         || 
results.get(1).getLeft().equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0"));
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomFilterProbingResult.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomFilterProbingResult.java
index 91363e7dd037..195fe5c29f29 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomFilterProbingResult.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomFilterProbingResult.java
@@ -21,10 +21,10 @@ package org.apache.hudi.index.bloom;
 
 import lombok.Value;
 
-import java.util.List;
+import java.util.Set;
 
 @Value
 class HoodieBloomFilterProbingResult {
 
-  List<String> candidateKeys;
+  Set<String> candidateKeys;
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java
index 563e4dac2de5..bafeffdecce1 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java
@@ -35,11 +35,13 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.broadcast.Broadcast;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import scala.Tuple2;
@@ -51,7 +53,7 @@ import scala.Tuple2;
  */
 @Slf4j
 public class HoodieFileProbingFunction implements
-    FlatMapFunction<Iterator<Tuple2<HoodieFileGroupId, 
HoodieBloomFilterProbingResult>>, List<HoodieKeyLookupResult>> {
+    FlatMapFunction<Iterator<Tuple2<HoodieFileGroupId, 
HoodieBloomFilterProbingResult>>, HoodieKeyLookupResult> {
 
   // Assuming each file bloom filter takes up 512K, sizing the max file count
   // per batch so that the total fetched bloom filters would not cross 128 MB.
@@ -67,19 +69,43 @@ public class HoodieFileProbingFunction implements
   }
 
   @Override
-  public Iterator<List<HoodieKeyLookupResult>> 
call(Iterator<Tuple2<HoodieFileGroupId, HoodieBloomFilterProbingResult>> 
tuple2Iterator) throws Exception {
+  public Iterator<HoodieKeyLookupResult> 
call(Iterator<Tuple2<HoodieFileGroupId, HoodieBloomFilterProbingResult>> 
tuple2Iterator) throws Exception {
     return new BloomIndexLazyKeyCheckIterator(tuple2Iterator);
   }
 
   private class BloomIndexLazyKeyCheckIterator
-      extends LazyIterableIterator<Tuple2<HoodieFileGroupId, 
HoodieBloomFilterProbingResult>, List<HoodieKeyLookupResult>> {
+      extends LazyIterableIterator<Tuple2<HoodieFileGroupId, 
HoodieBloomFilterProbingResult>, HoodieKeyLookupResult> {
+
+    private List<HoodieKeyLookupResult> currentBatch;
+    private int currentBatchIndex;
 
     public BloomIndexLazyKeyCheckIterator(Iterator<Tuple2<HoodieFileGroupId, 
HoodieBloomFilterProbingResult>> tuple2Iterator) {
       super(tuple2Iterator);
+      this.currentBatch = Collections.emptyList();
+      this.currentBatchIndex = 0;
     }
 
     @Override
-    protected List<HoodieKeyLookupResult> computeNext() {
+    protected HoodieKeyLookupResult computeNext() {
+      // If we have results left in the current batch, return next one
+      if (currentBatchIndex < currentBatch.size()) {
+        return currentBatch.get(currentBatchIndex++);
+      }
+
+      // Need to fetch next batch
+      currentBatch = fetchNextBatch();
+      currentBatchIndex = 0;
+
+      // If new batch is empty, we're done
+      if (currentBatch.isEmpty()) {
+        return null;
+      }
+
+      // Return first element from new batch
+      return currentBatch.get(currentBatchIndex++);
+    }
+
+    private List<HoodieKeyLookupResult> fetchNextBatch() {
       // Partition path and file name pair to list of keys
       final Map<Pair<String, HoodieBaseFile>, HoodieBloomFilterProbingResult> 
fileToLookupResults = new HashMap<>();
       final Map<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<>();
@@ -120,18 +146,18 @@ public class HoodieFileProbingFunction implements
             final String fileId = 
partitionPathFileNamePair.getRight().getFileId();
             ValidationUtils.checkState(!fileId.isEmpty());
 
-            List<String> candidateRecordKeys = 
bloomFilterKeyLookupResult.getCandidateKeys();
+            Set<String> candidateRecordKeys = 
bloomFilterKeyLookupResult.getCandidateKeys();
 
             // TODO add assertion that file is checked only once
 
             final HoodieBaseFile dataFile = fileIDBaseFileMap.get(fileId);
-            List<Pair<String, Long>> matchingKeysAndPositions = 
HoodieIndexUtils.filterKeysFromFile(
+            Collection<Pair<String, Long>> matchingKeysAndPositions = 
HoodieIndexUtils.filterKeysFromFile(
                 dataFile.getStoragePath(), candidateRecordKeys, 
HoodieStorageUtils.getStorage(dataFile.getStoragePath(), storageConf));
 
             log.debug(
-                String.format("Bloom filter candidates (%d) / false positives 
(%d), actual matches (%d)",
+                "Bloom filter candidates ({}) / false positives ({}), actual 
matches ({})",
                     candidateRecordKeys.size(), candidateRecordKeys.size() - 
matchingKeysAndPositions.size(),
-                    matchingKeysAndPositions.size()));
+                    matchingKeysAndPositions.size());
 
             return new HoodieKeyLookupResult(fileId, partitionPath, 
dataFile.getCommitTime(), matchingKeysAndPositions);
           })
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomFilterProbingFunction.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomFilterProbingFunction.java
index bdcc55db2a99..bbc8e63f1f94 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomFilterProbingFunction.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomFilterProbingFunction.java
@@ -39,9 +39,11 @@ import org.apache.spark.broadcast.Broadcast;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import scala.Tuple2;
@@ -138,7 +140,7 @@ public class HoodieMetadataBloomFilterProbingFunction 
implements
             }
             final BloomFilter fileBloomFilter = 
fileToBloomFilterMap.get(partitionPathFileNamePair);
 
-            List<String> candidateRecordKeys = new ArrayList<>();
+            Set<String> candidateRecordKeys = new HashSet<>();
             hoodieKeyList.forEach(hoodieKey -> {
               if (fileBloomFilter.mightContain(hoodieKey.getRecordKey())) {
                 candidateRecordKeys.add(hoodieKey.getRecordKey());
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
index fb8504267331..5367abcad391 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
@@ -100,7 +100,7 @@ public class SparkHoodieBloomIndexHelper extends 
BaseHoodieBloomIndexHelper {
     log.info("Input parallelism: {}, Index parallelism: {}", inputParallelism, 
targetParallelism);
 
     JavaPairRDD<HoodieFileGroupId, String> fileComparisonsRDD = 
HoodieJavaRDD.getJavaRDD(fileComparisonPairs);
-    JavaRDD<List<HoodieKeyLookupResult>> keyLookupResultRDD;
+    JavaRDD<HoodieKeyLookupResult> keyLookupResultRDD;
 
     if (config.getBloomIndexUseMetadata()
         && hoodieTable.getMetaClient().getTableConfig().getMetadataPartitions()
@@ -181,14 +181,14 @@ public class SparkHoodieBloomIndexHelper extends 
BaseHoodieBloomIndexHelper {
           .mapPartitions(new HoodieSparkBloomIndexCheckFunction(hoodieTable, 
config), true);
     }
 
-    return HoodieJavaPairRDD.of(keyLookupResultRDD.flatMap(List::iterator)
-        .filter(lr -> lr.getMatchingRecordKeysAndPositions().size() > 0)
+    return HoodieJavaPairRDD.of(keyLookupResultRDD
+        .filter(lr -> !lr.getMatchingRecordKeysAndPositions().isEmpty())
         .flatMapToPair(lookupResult -> 
lookupResult.getMatchingRecordKeysAndPositions().stream()
             .map(recordKeyAndPosition -> new Tuple2<>(
                 new HoodieKey(recordKeyAndPosition.getLeft(), 
lookupResult.getPartitionPath()),
                 new HoodieRecordLocation(lookupResult.getBaseInstantTime(), 
lookupResult.getFileId(),
                     recordKeyAndPosition.getRight())))
-            .collect(Collectors.toList()).iterator()));
+            .iterator()));
   }
 
   private static class FileGroupIdComparator implements 
Comparator<Tuple2<HoodieFileGroupId, String>>, Serializable {
@@ -326,7 +326,7 @@ public class SparkHoodieBloomIndexHelper extends 
BaseHoodieBloomIndexHelper {
   }
 
   public static class HoodieSparkBloomIndexCheckFunction extends 
HoodieBloomIndexCheckFunction<Tuple2<HoodieFileGroupId, String>>
-      implements FlatMapFunction<Iterator<Tuple2<HoodieFileGroupId, String>>, 
List<HoodieKeyLookupResult>> {
+      implements FlatMapFunction<Iterator<Tuple2<HoodieFileGroupId, String>>, 
HoodieKeyLookupResult> {
 
     public HoodieSparkBloomIndexCheckFunction(HoodieTable hoodieTable,
                                               HoodieWriteConfig config) {
@@ -334,7 +334,7 @@ public class SparkHoodieBloomIndexHelper extends 
BaseHoodieBloomIndexHelper {
     }
 
     @Override
-    public Iterator<List<HoodieKeyLookupResult>> 
call(Iterator<Tuple2<HoodieFileGroupId, String>> 
fileGroupIdRecordKeyPairIterator) {
+    public Iterator<HoodieKeyLookupResult> 
call(Iterator<Tuple2<HoodieFileGroupId, String>> 
fileGroupIdRecordKeyPairIterator) {
       TaskContext taskContext = TaskContext.get();
       log.info("HoodieSparkBloomIndexCheckFunction with stageId : {}, stage 
attempt no: {}, taskId : {}, task attempt no : {}, task attempt id : {} ",
           taskContext.stageId(), taskContext.stageAttemptNumber(), 
taskContext.partitionId(), taskContext.attemptNumber(),
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
index 3a17863e0ab4..995893e413cd 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
@@ -63,6 +63,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -311,12 +312,11 @@ public class TestHoodieBloomIndex extends 
TestHoodieMetadataBase {
     assertFalse(filter.mightContain(record4.getRecordKey()));
 
     // Compare with file
-    List<String> uuids =
-        Arrays.asList(record1.getRecordKey(), record2.getRecordKey(), 
record3.getRecordKey(), record4.getRecordKey());
+    Set<String> uuids =
+        Set.of(record1.getRecordKey(), record2.getRecordKey(), 
record3.getRecordKey(), record4.getRecordKey());
 
-    HoodieWriteConfig config = 
HoodieWriteConfig.newBuilder().withPath(basePath).build();
-    List<Pair<String, Long>> results = HoodieIndexUtils.filterKeysFromFile(
-        new StoragePath(Paths.get(basePath, partition, filename).toString()), 
uuids, storage);
+    List<Pair<String, Long>> results = new 
ArrayList<>(HoodieIndexUtils.filterKeysFromFile(
+        new StoragePath(Paths.get(basePath, partition, filename).toString()), 
uuids, storage));
 
     assertEquals(results.size(), 2);
     
assertTrue(results.get(0).getLeft().equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")

Reply via email to