This is an automated email from the ASF dual-hosted git repository.

vhs pushed a commit to branch rfc-blob-cleaner
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 3b696b33b308ab9030e25ce3801b2d0ef3a17541
Author: voon <[email protected]>
AuthorDate: Fri Mar 20 15:54:30 2026 +0800

    Update rfc-100 with batch search instead of random sequential search
---
 rfc/rfc-100/rfc-100-blob-cleaner-design.md |  87 ++++++-----
 rfc/rfc-100/rfc-100-blob-cleaner.md        | 225 ++++++++++++++++-------------
 2 files changed, 173 insertions(+), 139 deletions(-)
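
Reviewer note: the three-step resolution this commit introduces can be modelled in a few lines of plain Python. This is only a sketch under stated assumptions, not the Hudi implementation. The two dictionaries are hypothetical in-memory stand-ins for the batched MDT calls (`readSecondaryIndexDataTableRecordKeysWithKeys` and `readRecordIndexLocations`), and `find_orphaned_paths` is an illustrative name that does not appear in the RFC.

```python
from typing import Dict, Iterable, List, Set, Tuple

Location = Tuple[str, str]  # (partition, file_id)


def find_orphaned_paths(
    candidate_paths: Iterable[str],
    secondary_index: Dict[str, List[str]],  # stand-in: external_path -> record keys
    record_index: Dict[str, Location],      # stand-in: record key -> location
    cleaned_fg_ids: Set[str],
) -> List[str]:
    paths = list(candidate_paths)

    # Step 1: one batched lookup covering every candidate path.
    path_to_record_keys = {p: secondary_index.get(p, []) for p in paths}

    # Step 2: one batched lookup covering every record key. Keys are sorted
    # first, mirroring the sorted forward-scan the RFC describes.
    all_record_keys = sorted(
        {rk for keys in path_to_record_keys.values() for rk in keys}
    )
    all_locations = {
        rk: record_index[rk] for rk in all_record_keys if rk in record_index
    }

    # Step 3: in-memory resolution with per-candidate short-circuit.
    orphaned = []
    for path in paths:
        found_live_reference = False
        for rk in path_to_record_keys[path]:
            location = all_locations.get(rk)
            if location is not None and location[1] not in cleaned_fg_ids:
                found_live_reference = True
                break  # one live reference is enough to retain the blob
        if not found_live_reference:
            orphaned.append(path)
    return orphaned


# Hypothetical example data: three candidate blobs, two cleaned file groups.
secondary_index = {
    "s3://bucket/a.mp4": ["rk1", "rk2"],
    "s3://bucket/b.mp4": ["rk3"],
}
record_index = {
    "rk1": ("p1", "fg-1"),
    "rk2": ("p1", "fg-9"),
    "rk3": ("p2", "fg-2"),
}
orphaned = find_orphaned_paths(
    ["s3://bucket/a.mp4", "s3://bucket/b.mp4", "s3://bucket/c.mp4"],
    secondary_index,
    record_index,
    cleaned_fg_ids={"fg-1", "fg-2"},
)
print(orphaned)
```

In this example `a.mp4` is retained because `rk2` lives in `fg-9`, which is not being cleaned. `b.mp4` (its only reference is in a cleaned file group) and `c.mp4` (no references at all) are reported as orphaned. As in the RFC pseudocode, a record key with no record index entry is not counted as a live reference.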

diff --git a/rfc/rfc-100/rfc-100-blob-cleaner-design.md b/rfc/rfc-100/rfc-100-blob-cleaner-design.md
index ddb895859ba4..a7f80e14bcad 100644
--- a/rfc/rfc-100/rfc-100-blob-cleaner-design.md
+++ b/rfc/rfc-100/rfc-100-blob-cleaner-design.md
@@ -307,8 +307,13 @@ path_to_record_keys = mdtMetadata.readSecondaryIndexDataTableRecordKeysWithKeys(
     HoodieListData.eager(candidate_paths), indexPartitionName)
     .groupBy(pair -> pair.getKey())
 
-// Step 2: For each candidate, resolve record keys to file group IDs
-// Short-circuit: stop as soon as ONE record key lives in a non-cleaned FG
+// Step 2: Batch record index lookup -- ONE call for ALL record keys
+// Sorts keys internally, single sequential forward-scan through HFile.
+all_record_keys = path_to_record_keys.values().flatMap()
+all_locations = mdtMetadata.readRecordIndexLocations(
+    HoodieListData.eager(all_record_keys))             // -> Map<recordKey, (partition, fileId)>
+
+// Step 3: In-memory resolution with short-circuit per candidate
 for path in candidate_paths:
     record_keys = path_to_record_keys.getOrDefault(path, [])
 
@@ -318,24 +323,25 @@ for path in candidate_paths:
 
     found_live_reference = false
     for rk in record_keys:
-        location = mdtMetadata.readRecordIndex(rk)         // -> (partition, fileId)
-        if location.fileId NOT in cleaned_fg_ids:
+        location = all_locations.get(rk)
+        if location != null and location.fileId NOT in cleaned_fg_ids:
             found_live_reference = true
-            break                                           // short-circuit: retain
+            break                                           // short-circuit (in-memory)
 
     if not found_live_reference:
         external_deletes.addAll(candidates for this path)   // all refs in cleaned FGs
 ```
 
-**Cost model.** The lookup is a two-hop process: (1) prefix scan on the secondary index returning
-record keys, (2) record index lookup per record key to resolve to a file group ID. The short-circuit
-optimization makes the common case fast:
+**Cost model.** The lookup is a three-step process: (1) batched prefix scan on the secondary index,
+(2) batched record index lookup for all record keys in a single sorted HFile scan, (3) in-memory
+resolution with per-candidate short-circuit. Steps 1 and 2 are each a single I/O pass; step 3 is
+pure in-memory hash set lookups (~0ms).
 
-| Scenario                  | Step 1                | Step 2 per candidate     | Total        |
-|---------------------------|-----------------------|--------------------------|--------------|
-| Blob still live (common)  | 1 prefix scan         | 1-3 record index lookups | O(N)         |
-| Blob truly orphaned       | 1 prefix scan         | R_i record index lookups | O(N * R_avg) |
-| Blob has 0 refs (deleted) | 1 prefix scan (empty) | 0                        | O(N)         |
+| Step                        | I/O                                       | Cost                     |
+|-----------------------------|-----------------------------------------|--------------------------|
+| 1. Prefix scan (batched)    | Single MDT call for N candidate paths    | ~2-5s for 2K candidates  |
+| 2. Record index (batched)   | Single sorted HFile forward-scan         | ~1-2s for 6K record keys |
+| 3. In-memory resolution     | Hash set checks (cleaned_fg_ids)         | ~0ms                     |
 
 **Index definition.** Uses the existing `HoodieIndexDefinition` mechanism with
 `sourceFields = ["<blob_col>", "reference", "external_path"]`. The nested field path is supported
@@ -368,21 +374,25 @@ sequenceDiagram
     participant SI as MDT Secondary Index
     participant RI as MDT Record Index
 
-    C->>SI: Batched prefix scan<br/>candidate_paths (N paths)
-    SI-->>C: Map<path, List<recordKey>>
+    Note over C: Step 1: Batch prefix scan
+    C->>SI: All candidate paths (N paths, single call)
+    SI-->>C: Map&lt;path, List&lt;recordKey&gt;&gt;
+
+    Note over C: Step 2: Batch record index lookup
+    C->>C: Collect all record keys from all candidates
+    C->>RI: readRecordIndexLocations(all record keys)
+    Note over RI: Sort keys → single sequential<br/>forward-scan through HFile
+    RI-->>C: Map&lt;recordKey, (partition, fileId)&gt;
 
+    Note over C: Step 3: In-memory resolution
     loop For each candidate path
-        alt No record keys returned
+        alt No record keys for this path
             Note right of C: Globally orphaned → DELETE
         else Has record keys
-            loop For each record key (short-circuit)
-                C->>RI: Lookup record key
-                RI-->>C: (partition, fileId)
-                alt fileId NOT in cleaned_fg_ids
-                    Note right of C: Live reference found<br/>SHORT-CIRCUIT → RETAIN
-                end
-            end
-            alt All record keys in cleaned FGs
+            C->>C: Check each location.fileId<br/>against cleaned_fg_ids (in-memory)
+            alt Any fileId NOT in cleaned_fg_ids
+                Note right of C: Live reference → RETAIN
+            else All in cleaned FGs
                 Note right of C: Globally orphaned → DELETE
             end
         end
@@ -657,21 +667,20 @@ cleaning:
 
 ### Back-of-Envelope: Example 7 (50K FGs, 2K External Candidates)
 
-| Parameter                               | Value    | Notes                                   |
-|-----------------------------------------|----------|-----------------------------------------|
-| FGs cleaned this cycle                  | 500      | 1% of table                             |
-| Stage 1: reads per FG                   | ~6       | 3 retained + 3 expired slices           |
-| Stage 1: total reads                    | 3,000    | Parallelized across executors, ~20s     |
-| External blob candidates                | 2,000    | Locally orphaned in cleaned FGs         |
-| Avg refs per candidate                  | 3        | Typical: video in a few playlists       |
-| **Common case (most blobs still live)** |          |                                         |
-| Stage 2: prefix scans (batched)         | 2,000    | One MDT call, ~5s                       |
-| Stage 2: record index hops              | ~2,000   | Short-circuit: 1 per candidate, ~5s     |
-| **Total Stage 2**                       | **~10s** |                                         |
-| **Worst case (all blobs orphaned)**     |          |                                         |
-| Stage 2: record index hops              | ~6,000   | No short-circuit, ~15s                  |
-| **Total Stage 2**                       | **~20s** |                                         |
-| Comparison: naive full-table scan       | 12.5TB   | 50K FGs * 5 slices * 50MB = prohibitive |
+| Parameter                               | Value     | Notes                                           |
+|-----------------------------------------|-----------|-------------------------------------------------|
+| FGs cleaned this cycle                  | 500       | 1% of table                                     |
+| Stage 1: reads per FG                   | ~6        | 3 retained + 3 expired slices                   |
+| Stage 1: total reads                    | 3,000     | Parallelized across executors, ~20s             |
+| External blob candidates                | 2,000     | Locally orphaned in cleaned FGs                 |
+| Avg refs per candidate                  | 3         | Typical: video in a few playlists               |
+| Total record keys                       | 6,000     | 2,000 * 3                                       |
+| **Stage 2 cost**                        |           |                                                 |
+| Step 1: batched prefix scan             | 1 call    | Returns 6K record keys, ~2-5s                   |
+| Step 2: batched record index lookup     | 1 call    | 6K keys sorted, single HFile scan, ~1-2s        |
+| Step 3: in-memory resolution            | 6K checks | Hash set lookups against cleaned_fg_ids, ~0ms   |
+| **Total Stage 2**                       | **~3-7s** |                                                 |
+| Comparison: naive full-table scan       | 12.5TB    | 50K FGs * 5 slices * 50MB = prohibitive         |
 
 ### Memory Budget
 
diff --git a/rfc/rfc-100/rfc-100-blob-cleaner.md b/rfc/rfc-100/rfc-100-blob-cleaner.md
index 4e41b596a1aa..1392333a08d1 100644
--- a/rfc/rfc-100/rfc-100-blob-cleaner.md
+++ b/rfc/rfc-100/rfc-100-blob-cleaner.md
@@ -274,9 +274,21 @@ path_to_record_keys: Map<String, List<String>> =
     .collectAsList()
     .groupBy(pair -> pair.getKey())  // group by external_path
 
-// Step 2: For each candidate path, resolve record keys to file group IDs
-// via the record index. Use short-circuit: stop as soon as ONE record key
-// resolves to a non-cleaned file group.
+// Step 2: Batch record index lookup for ALL record keys across ALL candidates.
+// readRecordIndexLocations sorts keys internally and performs a single
+// sequential forward-scan through the record index HFile. This is vastly
+// more efficient than per-key random lookups.
+all_record_keys = path_to_record_keys.values().stream()
+    .flatMap(List::stream).collect(toList())
+
+all_locations: Map<String, HoodieRecordGlobalLocation> =
+    mdtMetadata.readRecordIndexLocations(
+        HoodieListData.eager(all_record_keys))
+    .collectAsList()
+    .toMap(pair -> pair.getKey(), pair -> pair.getValue())
+
+// Step 3: In-memory resolution with per-candidate short-circuit.
+// All I/O is complete -- this is pure hash set lookups (~0ms).
 external_deletes = List<BlobRef>()
 
 for path in candidate_paths:
@@ -287,14 +299,12 @@ for path in candidate_paths:
         external_deletes.addAll(external_candidates.filter(r -> r.path == path))
         continue
 
-    // Short-circuit: check record keys one at a time via record index.
-    // Stop as soon as we find one that lives in a non-cleaned FG.
     found_live_reference = false
     for rk in record_keys:
-        location = mdtMetadata.readRecordIndex(rk)  // -> (partition, fileId, instant)
-        if location.fileId NOT in cleaned_fg_ids:
+        location = all_locations.get(rk)
+        if location != null and location.fileId NOT in cleaned_fg_ids:
             found_live_reference = true
-            break                                    // short-circuit: retain this blob
+            break                                    // short-circuit (in-memory)
 
     if not found_live_reference:
         // All references are in cleaned FGs -> globally orphaned
@@ -311,64 +321,64 @@ and scans all matching entries in the MDT's HFile/Parquet structure. Each matchi
 record key. Then, the record index is consulted to resolve each record key to a
 `HoodieRecordGlobalLocation` containing the `(partition, fileId, instantTime)`.
 
-This two-hop resolution means the cost per candidate is NOT O(1) -- it is proportional to the
-number of records referencing that candidate's path.
+The resolution is a three-step pipeline with two batched I/O passes and one in-memory pass:
 
-**Cost model (corrected).** Let:
+**Cost model.** Let:
 - N = number of distinct candidate paths
-- R_i = number of records referencing candidate path i
-- R_avg = average references per candidate
-
-The worst-case cost is:
+- R_total = total record keys across all candidates = sum(R_i) = N * R_avg
 
 ```
-Step 1 (prefix scans):  N prefix scans on MDT secondary index partition.
-                        Cost: O(N) I/O rounds, each scanning R_i entries.
-                        Total entries scanned: sum(R_i) = N * R_avg.
-
-Step 2 (record index):  For each candidate, resolve record keys to file groups.
-                        With short-circuit: stop after first non-cleaned FG hit.
-                        Best case: 1 record index lookup per candidate (common case).
-                        Worst case: R_i lookups if ALL refs are in cleaned FGs.
+Step 1 (prefix scans):    Batched into a single call to
+                          readSecondaryIndexDataTableRecordKeysWithKeys().
+                          Cost: 1 I/O pass over the secondary index partition.
+                          Returns R_total record keys grouped by candidate path.
+
+Step 2 (record index):    ALL R_total record keys batched into a single call to
+                          readRecordIndexLocations(). The API sorts keys internally
+                          and performs a single sequential forward-scan through the
+                          record index HFile. No random seeks.
+                          Cost: 1 I/O pass (sequential scan).
+
+Step 3 (in-memory):       For each candidate, check its record keys' resolved
+                          locations against cleaned_fg_ids (hash set).
+                          Short-circuit: stop at first non-cleaned FG hit.
+                          Cost: ~0ms (pure in-memory).
 ```
 
-The **short-circuit optimization** is critical. For the common case where a shared blob is still
-referenced by at least one non-cleaned FG, Step 2 terminates after checking a small number of
-record keys -- typically 1-3 lookups. The worst case (all R_i references are in cleaned FGs) is
-rare: it means every single FG that references this blob is being cleaned simultaneously.
-
-| Scenario                          | Step 1 cost      | Step 2 cost per candidate | Total            |
-|-----------------------------------|------------------|---------------------------|------------------|
-| Blob still live (common)          | 1 prefix scan    | 1-3 record index lookups  | O(N)             |
-| Blob truly orphaned (all refs cleaned) | 1 prefix scan | R_i record index lookups | O(N * R_avg)     |
-| Blob has 0 refs (already deleted) | 1 prefix scan (empty) | 0                    | O(N)             |
+| Step | I/O pattern              | Cost                                         |
+|------|--------------------------|----------------------------------------------|
+| 1    | 1 batched MDT call       | O(N) prefix scans, returns R_total entries   |
+| 2    | 1 batched MDT call       | Sorted forward-scan for R_total keys         |
+| 3    | In-memory hash set       | O(R_total) worst case, short-circuit typical |
 
-**Batching.** All candidate paths are batched into a single call to
-`readSecondaryIndexDataTableRecordKeysWithKeys(HoodieData<String>, partitionName)`. This allows the
-MDT reader to optimize I/O across multiple prefix scans (e.g., coalesce reads from the same HFile
-block). The API already accepts `HoodieData<String>` for batched input.
+**Batching.** Both Step 1 and Step 2 use batch APIs that accept `HoodieData<String>`. Step 1 uses
+`readSecondaryIndexDataTableRecordKeysWithKeys()`. Step 2 uses `readRecordIndexLocations()`. Each
+performs a single sorted scan through the respective HFile partition, coalescing reads from
+adjacent blocks. No per-key random I/O.
 
 **Back-of-envelope for Example 7 (revised):**
 
-| Parameter                       | Value        | Notes                                        |
-|---------------------------------|--------------|----------------------------------------------|
-| Candidate paths (N)             | 2,000        | Locally orphaned in 500 cleaned FGs          |
-| Avg refs per candidate (R_avg)  | 3            | Typical: video in a few playlists            |
-| Popular blob refs (worst case)  | 10,000       | A viral video in many playlists              |
-| **Scenario: most blobs still live** |           |                                              |
-| Step 1: prefix scans            | 2,000        | Batched into one MDT call, ~5s               |
-| Step 2: record index lookups    | ~2,000       | 1 per candidate (short-circuit), ~5s         |
-| **Total**                       | **~10s**     | Dominated by MDT I/O                         |
-| **Scenario: all blobs orphaned**|              |                                              |
-| Step 1: prefix scans            | 2,000        | Returns 6,000 record keys total, ~5s         |
-| Step 2: record index lookups    | 6,000        | All checked (no short-circuit), ~15s         |
-| **Total**                       | **~20s**     | Acceptable                                   |
-| **Worst case: one viral blob**  |              |                                              |
-| Step 1: prefix scan for 1 blob  | 1            | Returns 10,000 record keys, ~2s              |
-| Step 2: record index lookups    | 10,000       | If all in cleaned FGs (unlikely), ~25s       |
-| **Total for that blob**         | **~27s**     | One blob dominates; others are O(1)          |
-
-Even in the worst case, this is far cheaper than the fallback table scan (12.5TB for Example 7).
+| Parameter                          | Value        | Notes                                            |
+|------------------------------------|--------------|--------------------------------------------------|
+| Candidate paths (N)                | 2,000        | Locally orphaned in 500 cleaned FGs              |
+| Avg refs per candidate (R_avg)     | 3            | Typical: video in a few playlists                |
+| Total record keys (R_total)        | 6,000        | N * R_avg                                        |
+| Popular blob refs (viral example)  | 10,000       | A viral video in many playlists                  |
+| **Step 1: batched prefix scan**    | 1 MDT call   | Returns 6K record keys (or 10K+ with viral), ~2-5s |
+| **Step 2: batched record index**   | 1 MDT call   | 6K keys sorted, single HFile scan, ~1-2s         |
+| **Step 3: in-memory resolution**   | 6K checks    | Hash set lookups, ~0ms                           |
+| **Total Stage 2**                  | **~3-7s**    | Two I/O passes + in-memory                       |
+| **Viral blob (10K refs)**          |              |                                                  |
+| Step 1: prefix scan                | 1 MDT call   | Returns 10K record keys, ~2-3s                   |
+| Step 2: record index               | 1 MDT call   | 10K keys sorted, single HFile scan, ~2-3s        |
+| Step 3: in-memory resolution       | 10K checks   | ~0ms                                             |
+| **Total for viral blob**           | **~4-6s**    | Sequential scan, not 10K random seeks            |
+
+Both steps use batch APIs (`readSecondaryIndexDataTableRecordKeysWithKeys` and
+`readRecordIndexLocations`) that sort keys internally and perform sequential forward-scans
+through HFile blocks. No per-key random I/O.
+
+Even in the viral blob case, this is far cheaper than the fallback table scan (12.5TB for Example 7).
 
 **Index definition.** The index is defined as a secondary index on the blob reference's
 `external_path` field using the existing `HoodieIndexDefinition` mechanism:
@@ -1243,10 +1253,10 @@ does not delete data files. The blob cleaner's inputs are unaffected by archival
 ### C13: Non-path-dispatched blobs require cross-FG verification at scale
 
 **Satisfied.** Stage 2 uses an MDT secondary index with batched prefix scans and short-circuit
-record index hops. The cost is proportional to candidates and their reference fan-out
-(O(candidates * refs_per_candidate) worst case, O(candidates) amortized with short-circuit), not
-the total table size. For the Example 7 scenario (10M videos, 50K FGs, 2,000 candidates), the cost
-is ~10s (common case) to ~20s (all orphaned) -- not 12.5TB of data reads. The fallback table scan
+record index hops. The cost is two sequential I/O passes (batched prefix scan + batched record index
+forward-scan) + in-memory resolution, proportional to candidates and their record key count, not
+the total table size. For the Example 7 scenario (10M videos, 50K FGs, 2,000 candidates, 6K record
+keys), the cost is ~3-7s -- not 12.5TB of data reads. The fallback table scan
 has a circuit breaker to prevent degenerate cases. See Section 3.2.1 for the detailed cost model.
 
 ---
@@ -1426,13 +1436,12 @@ recovery. No coordination challenges. The refactoring is confined to:
 
 For Flow 2 workloads where cross-FG sharing is the common case (C13), the MDT secondary index
 `secondary_index_blob_external_path` maps each external blob path to the records that reference it.
-Verification uses a batched prefix scan on the secondary index, then a short-circuiting record
-index hop per candidate. The lookup is NOT a simple point lookup -- it is a prefix scan with fan-out
-proportional to references per candidate. The short-circuit optimization makes the common case fast:
-stop checking a candidate as soon as one non-cleaned FG reference is found.
+Verification is a three-step batched pipeline: (1) batched prefix scan on the secondary index
+returning all record keys, (2) batched record index lookup for all record keys in a single sorted
+HFile forward-scan, (3) in-memory resolution with per-candidate short-circuit.
 
-**Cost:** O(candidates * refs_per_candidate) worst case for prefix scans. O(candidates) amortized
-for record index hops when most blobs are still live (short-circuit fires after 1-3 hops). See
+**Cost:** Two I/O passes (one prefix scan, one record index scan) + O(R_total) in-memory checks.
+The I/O is sequential (sorted forward-scans through HFiles), not random per-key lookups. See
 Section 3.2.1 for the detailed cost model and analysis.
 
 **Scaling analysis for Example 7 (media company, revised):**
@@ -1444,13 +1453,12 @@ Section 3.2.1 for the detailed cost model and analysis.
 | FGs cleaned this cycle          | 500          | 1% of table                                  |
 | External blob candidates        | 2,000        | Locally orphaned in cleaned FGs              |
 | Avg refs per candidate          | 3            | Typical: video in a few playlists            |
-| **Common case (most still live)** |             |                                              |
-| Prefix scans (batched)          | 2,000        | One MDT call, returns ~6K record keys, ~5s   |
-| Record index hops               | ~2,000       | Short-circuit: 1 per candidate, ~5s          |
-| **Total Stage 2 time**          | **~10s**     |                                              |
-| **Worst case (all orphaned)**   |              |                                              |
-| Record index hops               | ~6,000       | No short-circuit possible, ~15s              |
-| **Total Stage 2 time**          | **~20s**     |                                              |
+| Total record keys (R_total)     | 6,000        | N * R_avg                                    |
+| **Stage 2 (batched pipeline)**  |              |                                              |
+| Step 1: batched prefix scan     | 1 MDT call   | Returns 6K record keys, ~2-5s                |
+| Step 2: batched record index    | 1 MDT call   | 6K keys sorted, single HFile scan, ~1-2s     |
+| Step 3: in-memory resolution    | 6K checks    | Hash set lookups, ~0ms                       |
+| **Total Stage 2 time**          | **~3-7s**    |                                              |
 | Comparison: naive table scan    | 12.5TB read  | 50K FGs * 5 slices * 50MB = prohibitive      |
 
 The index maintenance cost is borne by the existing MDT write pipeline via
@@ -1799,24 +1807,32 @@ the size of one set (expired_refs only).
 
 **With MDT secondary index (primary path):**
 
-The lookup is a two-hop process: (1) prefix scan on secondary index returning record keys, then
-(2) record index lookup per record key to resolve to a file group ID. The short-circuit
-optimization stops Step 2 as soon as one non-cleaned FG reference is found per candidate.
-
-| Parameter                           | Value                | Notes                                      |
-|-------------------------------------|----------------------|--------------------------------------------|
-| Candidates (N)                      | N                    | Only locally-orphaned external blobs       |
-| Step 1: prefix scans                | N (batched)          | One call to readSecondaryIndex...WithKeys  |
-| Record keys returned                | N * R_avg            | R_avg = avg refs per candidate             |
-| Step 2: record index hops (common)  | ~N                   | Short-circuit: 1-3 per candidate           |
-| Step 2: record index hops (worst)   | N * R_avg            | All refs in cleaned FGs (rare)             |
-| Latency per MDT I/O (cloud)         | 10-50ms              | S3/GCS HFile read                          |
-| Memory                              | O(N * R_avg)         | Record keys + lookup results               |
-
-For Example 7 (2,000 candidates, R_avg=3):
-- Common case (most still live): ~10s (5s prefix scans + 5s record index with short-circuit).
-- Worst case (all orphaned): ~20s (5s prefix scans + 15s for 6K record index lookups).
-- One viral blob (10K refs, all in cleaned FGs): ~27s for that blob alone.
+The lookup is a three-step pipeline with two batched I/O passes and one in-memory pass:
+(1) batched prefix scan on secondary index returning all record keys, (2) batched record index
+lookup for all record keys in a single sorted HFile forward-scan, (3) in-memory resolution with
+per-candidate short-circuit.
+
+| Parameter                             | Value                | Notes                                                |
+|---------------------------------------|----------------------|------------------------------------------------------|
+| Candidates (N)                        | N                    | Only locally-orphaned external blobs                 |
+| Step 1: batched prefix scan           | 1 MDT call           | readSecondaryIndex...WithKeys for N paths            |
+| Record keys returned (R_total)        | N * R_avg            | R_avg = avg refs per candidate                       |
+| Step 2: batched record index          | 1 MDT call           | readRecordIndexLocations: sorted forward-scan        |
+| Step 3: in-memory resolution          | R_total checks       | Hash set lookups against cleaned_fg_ids              |
+| Memory                                | O(N * R_avg)         | Record keys + lookup results (in-memory after batch) |
+
+For Example 7 (2,000 candidates, R_avg=3, R_total=6K):
+- Step 1: batched prefix scan returning 6K record keys, ~2-5s.
+- Step 2: batched record index lookup, 6K keys sorted, single HFile scan, ~1-2s.
+- Step 3: in-memory resolution, ~0ms.
+- **Total: ~3-7s.**
+
+For a viral blob (10K refs):
+- Step 1: prefix scan for that blob returns 10K keys, ~2-3s.
+- Step 2: 10K keys in a single sorted HFile scan, ~2-3s.
+- Step 3: in-memory, ~0ms.
+- **Total: ~4-6s** (not 27s -- the previous estimate incorrectly assumed 10K sequential random
+  lookups instead of a single batched forward-scan).
 
 **Without MDT secondary index (fallback table scan):**
 
@@ -1954,7 +1970,7 @@ function planBlobCleanup(table, fileGroupCleanResults, context, config):
     if not external_candidates.isEmpty():
         indexPartition = "secondary_index_blob_external_path"
         if isSecondaryIndexFullyBuilt(table.getMetaClient(), indexPartition):
-            // Primary path: batched prefix scan + short-circuit record index hop
+            // Primary path: two batched I/O passes + in-memory resolution
             candidate_paths = external_candidates.map(r -> r.path).distinct()
 
             // Step 1: Batched prefix scan on secondary index
@@ -1964,7 +1980,15 @@ function planBlobCleanup(table, fileGroupCleanResults, context, config):
                 .collectAsList()
                 .groupBy(pair -> pair.getKey())
 
-            // Step 2: For each path, short-circuit record index hop
+            // Step 2: Batch record index lookup for ALL record keys
+            all_record_keys = path_to_record_keys.values().stream()
+                .flatMap(List::stream).collect(toList())
+            all_locations = mdtMetadata
+                .readRecordIndexLocations(HoodieListData.eager(all_record_keys))
+                .collectAsList()
+                .toMap(pair -> pair.getKey(), pair -> pair.getValue())
+
+            // Step 3: In-memory resolution with per-candidate short-circuit
             for path in candidate_paths:
                 record_keys = path_to_record_keys.getOrDefault(path, [])
                 if record_keys is empty:
@@ -1974,10 +1998,10 @@ function planBlobCleanup(table, fileGroupCleanResults, context, config):
 
                 found_live = false
                 for rk in record_keys:
-                    loc = mdtMetadata.readRecordIndex(rk)
-                    if loc.fileId NOT in cleaned_fg_ids:
+                    loc = all_locations.get(rk)
+                    if loc != null and loc.fileId NOT in cleaned_fg_ids:
                         found_live = true
-                        break                        // short-circuit
+                        break                        // short-circuit (in-memory)
 
                 if not found_live:
                     external_deletes.addAll(
@@ -2104,10 +2128,11 @@ properties (C11, P3) make this both correct and sufficient. No new infrastructur
 triggered.
 
 **Flow 2 (non-path-dispatched external blobs):** First-class cross-FG verification using an MDT
-secondary index on `reference.external_path` with a short-circuit optimization. The lookup is a
-batched prefix scan on the secondary index followed by record index hops, with short-circuit on
-first non-cleaned FG reference found. Cost is O(candidates) amortized in the common case,
-O(candidates * refs_per_candidate) worst case -- not O(table_size). The index uses existing MDT
+secondary index on `reference.external_path`. The lookup is a three-step batched pipeline:
+(1) batched prefix scan on the secondary index, (2) batched record index lookup for all record
+keys in a single sorted HFile forward-scan, (3) in-memory resolution with per-candidate
+short-circuit. Cost is two sequential I/O passes + O(R_total) in-memory checks -- not
+O(table_size). For Example 7 (2K candidates, 6K record keys): ~3-7s. The index uses existing MDT
 secondary index infrastructure: `HoodieIndexDefinition` with nested `sourceFields`,
 `SecondaryIndexRecordGenerationUtils` for maintenance, `readSecondaryIndexDataTableRecordKeysWithKeys`
 for batched lookups. Nested struct field paths (`<blob_col>.reference.external_path`) are verified
