This is an automated email from the ASF dual-hosted git repository. vhs pushed a commit to branch rfc-blob-cleaner in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 3b696b33b308ab9030e25ce3801b2d0ef3a17541 Author: voon <[email protected]> AuthorDate: Fri Mar 20 15:54:30 2026 +0800 Update rfc-100 with batch search instead of random sequential search --- rfc/rfc-100/rfc-100-blob-cleaner-design.md | 87 ++++++----- rfc/rfc-100/rfc-100-blob-cleaner.md | 225 ++++++++++++++++------------- 2 files changed, 173 insertions(+), 139 deletions(-) diff --git a/rfc/rfc-100/rfc-100-blob-cleaner-design.md b/rfc/rfc-100/rfc-100-blob-cleaner-design.md index ddb895859ba4..a7f80e14bcad 100644 --- a/rfc/rfc-100/rfc-100-blob-cleaner-design.md +++ b/rfc/rfc-100/rfc-100-blob-cleaner-design.md @@ -307,8 +307,13 @@ path_to_record_keys = mdtMetadata.readSecondaryIndexDataTableRecordKeysWithKeys( HoodieListData.eager(candidate_paths), indexPartitionName) .groupBy(pair -> pair.getKey()) -// Step 2: For each candidate, resolve record keys to file group IDs -// Short-circuit: stop as soon as ONE record key lives in a non-cleaned FG +// Step 2: Batch record index lookup -- ONE call for ALL record keys +// Sorts keys internally, single sequential forward-scan through HFile. 
+all_record_keys = path_to_record_keys.values().flatMap() +all_locations = mdtMetadata.readRecordIndexLocations( + HoodieListData.eager(all_record_keys)) // -> Map<recordKey, (partition, fileId)> + +// Step 3: In-memory resolution with short-circuit per candidate for path in candidate_paths: record_keys = path_to_record_keys.getOrDefault(path, []) @@ -318,24 +323,25 @@ for path in candidate_paths: found_live_reference = false for rk in record_keys: - location = mdtMetadata.readRecordIndex(rk) // -> (partition, fileId) - if location.fileId NOT in cleaned_fg_ids: + location = all_locations.get(rk) + if location != null and location.fileId NOT in cleaned_fg_ids: found_live_reference = true - break // short-circuit: retain + break // short-circuit (in-memory) if not found_live_reference: external_deletes.addAll(candidates for this path) // all refs in cleaned FGs ``` -**Cost model.** The lookup is a two-hop process: (1) prefix scan on the secondary index returning -record keys, (2) record index lookup per record key to resolve to a file group ID. The short-circuit -optimization makes the common case fast: +**Cost model.** The lookup is a three-step process: (1) batched prefix scan on the secondary index, +(2) batched record index lookup for all record keys in a single sorted HFile scan, (3) in-memory +resolution with per-candidate short-circuit. Steps 1 and 2 are each a single I/O pass; step 3 is +pure in-memory hash set lookups (~0ms). -| Scenario | Step 1 | Step 2 per candidate | Total | -|---------------------------|-----------------------|--------------------------|--------------| -| Blob still live (common) | 1 prefix scan | 1-3 record index lookups | O(N) | -| Blob truly orphaned | 1 prefix scan | R_i record index lookups | O(N * R_avg) | -| Blob has 0 refs (deleted) | 1 prefix scan (empty) | 0 | O(N) | +| Step | I/O | Cost | +|-----------------------------|-----------------------------------------|--------------------------| +| 1. 
Prefix scan (batched) | Single MDT call for N candidate paths | ~2-5s for 2K candidates | +| 2. Record index (batched) | Single sorted HFile forward-scan | ~1-2s for 6K record keys | +| 3. In-memory resolution | Hash set checks (cleaned_fg_ids) | ~0ms | **Index definition.** Uses the existing `HoodieIndexDefinition` mechanism with `sourceFields = ["<blob_col>", "reference", "external_path"]`. The nested field path is supported @@ -368,21 +374,25 @@ sequenceDiagram participant SI as MDT Secondary Index participant RI as MDT Record Index - C->>SI: Batched prefix scan<br/>candidate_paths (N paths) - SI-->>C: Map<path, List<recordKey>> + Note over C: Step 1: Batch prefix scan + C->>SI: All candidate paths (N paths, single call) + SI-->>C: Map<path, List<recordKey>> + + Note over C: Step 2: Batch record index lookup + C->>C: Collect all record keys from all candidates + C->>RI: readRecordIndexLocations(all record keys) + Note over RI: Sort keys → single sequential<br/>forward-scan through HFile + RI-->>C: Map<recordKey, (partition, fileId)> + Note over C: Step 3: In-memory resolution loop For each candidate path - alt No record keys returned + alt No record keys for this path Note right of C: Globally orphaned → DELETE else Has record keys - loop For each record key (short-circuit) - C->>RI: Lookup record key - RI-->>C: (partition, fileId) - alt fileId NOT in cleaned_fg_ids - Note right of C: Live reference found<br/>SHORT-CIRCUIT → RETAIN - end - end - alt All record keys in cleaned FGs + C->>C: Check each location.fileId<br/>against cleaned_fg_ids (in-memory) + alt Any fileId NOT in cleaned_fg_ids + Note right of C: Live reference → RETAIN + else All in cleaned FGs Note right of C: Globally orphaned → DELETE end end @@ -657,21 +667,20 @@ cleaning: ### Back-of-Envelope: Example 7 (50K FGs, 2K External Candidates) -| Parameter | Value | Notes | -|-----------------------------------------|----------|-----------------------------------------| -| FGs cleaned this cycle | 
500 | 1% of table | -| Stage 1: reads per FG | ~6 | 3 retained + 3 expired slices | -| Stage 1: total reads | 3,000 | Parallelized across executors, ~20s | -| External blob candidates | 2,000 | Locally orphaned in cleaned FGs | -| Avg refs per candidate | 3 | Typical: video in a few playlists | -| **Common case (most blobs still live)** | | | -| Stage 2: prefix scans (batched) | 2,000 | One MDT call, ~5s | -| Stage 2: record index hops | ~2,000 | Short-circuit: 1 per candidate, ~5s | -| **Total Stage 2** | **~10s** | | -| **Worst case (all blobs orphaned)** | | | -| Stage 2: record index hops | ~6,000 | No short-circuit, ~15s | -| **Total Stage 2** | **~20s** | | -| Comparison: naive full-table scan | 12.5TB | 50K FGs * 5 slices * 50MB = prohibitive | +| Parameter | Value | Notes | +|-----------------------------------------|-----------|-------------------------------------------------| +| FGs cleaned this cycle | 500 | 1% of table | +| Stage 1: reads per FG | ~6 | 3 retained + 3 expired slices | +| Stage 1: total reads | 3,000 | Parallelized across executors, ~20s | +| External blob candidates | 2,000 | Locally orphaned in cleaned FGs | +| Avg refs per candidate | 3 | Typical: video in a few playlists | +| Total record keys | 6,000 | 2,000 * 3 | +| **Stage 2 cost** | | | +| Step 1: batched prefix scan | 1 call | Returns 6K record keys, ~2-5s | +| Step 2: batched record index lookup | 1 call | 6K keys sorted, single HFile scan, ~1-2s | +| Step 3: in-memory resolution | 6K checks | Hash set lookups against cleaned_fg_ids, ~0ms | +| **Total Stage 2** | **~3-7s** | | +| Comparison: naive full-table scan | 12.5TB | 50K FGs * 5 slices * 50MB = prohibitive | ### Memory Budget diff --git a/rfc/rfc-100/rfc-100-blob-cleaner.md b/rfc/rfc-100/rfc-100-blob-cleaner.md index 4e41b596a1aa..1392333a08d1 100644 --- a/rfc/rfc-100/rfc-100-blob-cleaner.md +++ b/rfc/rfc-100/rfc-100-blob-cleaner.md @@ -274,9 +274,21 @@ path_to_record_keys: Map<String, List<String>> = .collectAsList() 
.groupBy(pair -> pair.getKey()) // group by external_path -// Step 2: For each candidate path, resolve record keys to file group IDs -// via the record index. Use short-circuit: stop as soon as ONE record key -// resolves to a non-cleaned file group. +// Step 2: Batch record index lookup for ALL record keys across ALL candidates. +// readRecordIndexLocations sorts keys internally and performs a single +// sequential forward-scan through the record index HFile. This is vastly +// more efficient than per-key random lookups. +all_record_keys = path_to_record_keys.values().stream() + .flatMap(List::stream).collect(toList()) + +all_locations: Map<String, HoodieRecordGlobalLocation> = + mdtMetadata.readRecordIndexLocations( + HoodieListData.eager(all_record_keys)) + .collectAsList() + .toMap(pair -> pair.getKey(), pair -> pair.getValue()) + +// Step 3: In-memory resolution with per-candidate short-circuit. +// All I/O is complete -- this is pure hash set lookups (~0ms). external_deletes = List<BlobRef>() for path in candidate_paths: @@ -287,14 +299,12 @@ for path in candidate_paths: external_deletes.addAll(external_candidates.filter(r -> r.path == path)) continue - // Short-circuit: check record keys one at a time via record index. - // Stop as soon as we find one that lives in a non-cleaned FG. found_live_reference = false for rk in record_keys: - location = mdtMetadata.readRecordIndex(rk) // -> (partition, fileId, instant) - if location.fileId NOT in cleaned_fg_ids: + location = all_locations.get(rk) + if location != null and location.fileId NOT in cleaned_fg_ids: found_live_reference = true - break // short-circuit: retain this blob + break // short-circuit (in-memory) if not found_live_reference: // All references are in cleaned FGs -> globally orphaned @@ -311,64 +321,64 @@ and scans all matching entries in the MDT's HFile/Parquet structure. Each matching entry yields one record key.
Then, the record index is consulted to resolve each record key to a `HoodieRecordGlobalLocation` containing the `(partition, fileId, instantTime)`. -This two-hop resolution means the cost per candidate is NOT O(1) -- it is proportional to the -number of records referencing that candidate's path. +The resolution is a three-step pipeline with two batched I/O passes and one in-memory pass: -**Cost model (corrected).** Let: +**Cost model.** Let: - N = number of distinct candidate paths -- R_i = number of records referencing candidate path i -- R_avg = average references per candidate - -The worst-case cost is: +- R_total = total record keys across all candidates = sum(R_i) = N * R_avg ``` -Step 1 (prefix scans): N prefix scans on MDT secondary index partition. - Cost: O(N) I/O rounds, each scanning R_i entries. - Total entries scanned: sum(R_i) = N * R_avg. - -Step 2 (record index): For each candidate, resolve record keys to file groups. - With short-circuit: stop after first non-cleaned FG hit. - Best case: 1 record index lookup per candidate (common case). - Worst case: R_i lookups if ALL refs are in cleaned FGs. +Step 1 (prefix scans): Batched into a single call to + readSecondaryIndexDataTableRecordKeysWithKeys(). + Cost: 1 I/O pass over the secondary index partition. + Returns R_total record keys grouped by candidate path. + +Step 2 (record index): ALL R_total record keys batched into a single call to + readRecordIndexLocations(). The API sorts keys internally + and performs a single sequential forward-scan through the + record index HFile. No random seeks. + Cost: 1 I/O pass (sequential scan). + +Step 3 (in-memory): For each candidate, check its record keys' resolved + locations against cleaned_fg_ids (hash set). + Short-circuit: stop at first non-cleaned FG hit. + Cost: ~0ms (pure in-memory). ``` -The **short-circuit optimization** is critical. 
For the common case where a shared blob is still -referenced by at least one non-cleaned FG, Step 2 terminates after checking a small number of -record keys -- typically 1-3 lookups. The worst case (all R_i references are in cleaned FGs) is -rare: it means every single FG that references this blob is being cleaned simultaneously. - -| Scenario | Step 1 cost | Step 2 cost per candidate | Total | -|-----------------------------------|------------------|---------------------------|------------------| -| Blob still live (common) | 1 prefix scan | 1-3 record index lookups | O(N) | -| Blob truly orphaned (all refs cleaned) | 1 prefix scan | R_i record index lookups | O(N * R_avg) | -| Blob has 0 refs (already deleted) | 1 prefix scan (empty) | 0 | O(N) | +| Step | I/O pattern | Cost | +|------|--------------------------|----------------------------------------------| +| 1 | 1 batched MDT call | O(N) prefix scans, returns R_total entries | +| 2 | 1 batched MDT call | Sorted forward-scan for R_total keys | +| 3 | In-memory hash set | O(R_total) worst case, short-circuit typical | -**Batching.** All candidate paths are batched into a single call to -`readSecondaryIndexDataTableRecordKeysWithKeys(HoodieData<String>, partitionName)`. This allows the -MDT reader to optimize I/O across multiple prefix scans (e.g., coalesce reads from the same HFile -block). The API already accepts `HoodieData<String>` for batched input. +**Batching.** Both Step 1 and Step 2 use batch APIs that accept `HoodieData<String>`. Step 1 uses +`readSecondaryIndexDataTableRecordKeysWithKeys()`. Step 2 uses `readRecordIndexLocations()`. Each +performs a single sorted scan through the respective HFile partition, coalescing reads from +adjacent blocks. No per-key random I/O. 
**Back-of-envelope for Example 7 (revised):** -| Parameter | Value | Notes | -|---------------------------------|--------------|----------------------------------------------| -| Candidate paths (N) | 2,000 | Locally orphaned in 500 cleaned FGs | -| Avg refs per candidate (R_avg) | 3 | Typical: video in a few playlists | -| Popular blob refs (worst case) | 10,000 | A viral video in many playlists | -| **Scenario: most blobs still live** | | | -| Step 1: prefix scans | 2,000 | Batched into one MDT call, ~5s | -| Step 2: record index lookups | ~2,000 | 1 per candidate (short-circuit), ~5s | -| **Total** | **~10s** | Dominated by MDT I/O | -| **Scenario: all blobs orphaned**| | | -| Step 1: prefix scans | 2,000 | Returns 6,000 record keys total, ~5s | -| Step 2: record index lookups | 6,000 | All checked (no short-circuit), ~15s | -| **Total** | **~20s** | Acceptable | -| **Worst case: one viral blob** | | | -| Step 1: prefix scan for 1 blob | 1 | Returns 10,000 record keys, ~2s | -| Step 2: record index lookups | 10,000 | If all in cleaned FGs (unlikely), ~25s | -| **Total for that blob** | **~27s** | One blob dominates; others are O(1) | - -Even in the worst case, this is far cheaper than the fallback table scan (12.5TB for Example 7). 
+| Parameter | Value | Notes | +|------------------------------------|--------------|--------------------------------------------------| +| Candidate paths (N) | 2,000 | Locally orphaned in 500 cleaned FGs | +| Avg refs per candidate (R_avg) | 3 | Typical: video in a few playlists | +| Total record keys (R_total) | 6,000 | N * R_avg | +| Popular blob refs (viral example) | 10,000 | A viral video in many playlists | +| **Step 1: batched prefix scan** | 1 MDT call | Returns 6K record keys (or 10K+ with viral), ~2-5s | +| **Step 2: batched record index** | 1 MDT call | 6K keys sorted, single HFile scan, ~1-2s | +| **Step 3: in-memory resolution** | 6K checks | Hash set lookups, ~0ms | +| **Total Stage 2** | **~3-7s** | Two I/O passes + in-memory | +| **Viral blob (10K refs)** | | | +| Step 1: prefix scan | 1 MDT call | Returns 10K record keys, ~2-3s | +| Step 2: record index | 1 MDT call | 10K keys sorted, single HFile scan, ~2-3s | +| Step 3: in-memory resolution | 10K checks | ~0ms | +| **Total for viral blob** | **~4-6s** | Sequential scan, not 10K random seeks | + +Both steps use batch APIs (`readSecondaryIndexDataTableRecordKeysWithKeys` and +`readRecordIndexLocations`) that sort keys internally and perform sequential forward-scans +through HFile blocks. No per-key random I/O. + +Even in the viral blob case, this is far cheaper than the fallback table scan (12.5TB for Example 7). **Index definition.** The index is defined as a secondary index on the blob reference's `external_path` field using the existing `HoodieIndexDefinition` mechanism: @@ -1243,10 +1253,10 @@ does not delete data files. The blob cleaner's inputs are unaffected by archival ### C13: Non-path-dispatched blobs require cross-FG verification at scale **Satisfied.** Stage 2 uses an MDT secondary index with batched prefix scans and short-circuit -record index hops. 
The cost is proportional to candidates and their reference fan-out -(O(candidates * refs_per_candidate) worst case, O(candidates) amortized with short-circuit), not -the total table size. For the Example 7 scenario (10M videos, 50K FGs, 2,000 candidates), the cost -is ~10s (common case) to ~20s (all orphaned) -- not 12.5TB of data reads. The fallback table scan +record index hops. The cost is two sequential I/O passes (batched prefix scan + batched record index +forward-scan) + in-memory resolution, proportional to candidates and their record key count, not +the total table size. For the Example 7 scenario (10M videos, 50K FGs, 2,000 candidates, 6K record +keys), the cost is ~3-7s -- not 12.5TB of data reads. The fallback table scan has a circuit breaker to prevent degenerate cases. See Section 3.2.1 for the detailed cost model. --- @@ -1426,13 +1436,12 @@ recovery. No coordination challenges. The refactoring is confined to: For Flow 2 workloads where cross-FG sharing is the common case (C13), the MDT secondary index `secondary_index_blob_external_path` maps each external blob path to the records that reference it. -Verification uses a batched prefix scan on the secondary index, then a short-circuiting record -index hop per candidate. The lookup is NOT a simple point lookup -- it is a prefix scan with fan-out -proportional to references per candidate. The short-circuit optimization makes the common case fast: -stop checking a candidate as soon as one non-cleaned FG reference is found. +Verification is a three-step batched pipeline: (1) batched prefix scan on the secondary index +returning all record keys, (2) batched record index lookup for all record keys in a single sorted +HFile forward-scan, (3) in-memory resolution with per-candidate short-circuit. -**Cost:** O(candidates * refs_per_candidate) worst case for prefix scans. O(candidates) amortized -for record index hops when most blobs are still live (short-circuit fires after 1-3 hops). 
See +**Cost:** Two I/O passes (one prefix scan, one record index scan) + O(R_total) in-memory checks. +The I/O is sequential (sorted forward-scans through HFiles), not random per-key lookups. See Section 3.2.1 for the detailed cost model and analysis. **Scaling analysis for Example 7 (media company, revised):** @@ -1444,13 +1453,12 @@ Section 3.2.1 for the detailed cost model and analysis. | FGs cleaned this cycle | 500 | 1% of table | | External blob candidates | 2,000 | Locally orphaned in cleaned FGs | | Avg refs per candidate | 3 | Typical: video in a few playlists | -| **Common case (most still live)** | | | -| Prefix scans (batched) | 2,000 | One MDT call, returns ~6K record keys, ~5s | -| Record index hops | ~2,000 | Short-circuit: 1 per candidate, ~5s | -| **Total Stage 2 time** | **~10s** | | -| **Worst case (all orphaned)** | | | -| Record index hops | ~6,000 | No short-circuit possible, ~15s | -| **Total Stage 2 time** | **~20s** | | +| Total record keys (R_total) | 6,000 | N * R_avg | +| **Stage 2 (batched pipeline)** | | | +| Step 1: batched prefix scan | 1 MDT call | Returns 6K record keys, ~2-5s | +| Step 2: batched record index | 1 MDT call | 6K keys sorted, single HFile scan, ~1-2s | +| Step 3: in-memory resolution | 6K checks | Hash set lookups, ~0ms | +| **Total Stage 2 time** | **~3-7s** | | | Comparison: naive table scan | 12.5TB read | 50K FGs * 5 slices * 50MB = prohibitive | The index maintenance cost is borne by the existing MDT write pipeline via @@ -1799,24 +1807,32 @@ the size of one set (expired_refs only). **With MDT secondary index (primary path):** -The lookup is a two-hop process: (1) prefix scan on secondary index returning record keys, then -(2) record index lookup per record key to resolve to a file group ID. The short-circuit -optimization stops Step 2 as soon as one non-cleaned FG reference is found per candidate. 
- -| Parameter | Value | Notes | -|-------------------------------------|----------------------|--------------------------------------------| -| Candidates (N) | N | Only locally-orphaned external blobs | -| Step 1: prefix scans | N (batched) | One call to readSecondaryIndex...WithKeys | -| Record keys returned | N * R_avg | R_avg = avg refs per candidate | -| Step 2: record index hops (common) | ~N | Short-circuit: 1-3 per candidate | -| Step 2: record index hops (worst) | N * R_avg | All refs in cleaned FGs (rare) | -| Latency per MDT I/O (cloud) | 10-50ms | S3/GCS HFile read | -| Memory | O(N * R_avg) | Record keys + lookup results | - -For Example 7 (2,000 candidates, R_avg=3): -- Common case (most still live): ~10s (5s prefix scans + 5s record index with short-circuit). -- Worst case (all orphaned): ~20s (5s prefix scans + 15s for 6K record index lookups). -- One viral blob (10K refs, all in cleaned FGs): ~27s for that blob alone. +The lookup is a three-step pipeline with two batched I/O passes and one in-memory pass: +(1) batched prefix scan on secondary index returning all record keys, (2) batched record index +lookup for all record keys in a single sorted HFile forward-scan, (3) in-memory resolution with +per-candidate short-circuit. 
+ +| Parameter | Value | Notes | +|---------------------------------------|----------------------|------------------------------------------------------| +| Candidates (N) | N | Only locally-orphaned external blobs | +| Step 1: batched prefix scan | 1 MDT call | readSecondaryIndex...WithKeys for N paths | +| Record keys returned (R_total) | N * R_avg | R_avg = avg refs per candidate | +| Step 2: batched record index | 1 MDT call | readRecordIndexLocations: sorted forward-scan | +| Step 3: in-memory resolution | R_total checks | Hash set lookups against cleaned_fg_ids | +| Memory | O(N * R_avg) | Record keys + lookup results (in-memory after batch) | + +For Example 7 (2,000 candidates, R_avg=3, R_total=6K): +- Step 1: batched prefix scan returning 6K record keys, ~2-5s. +- Step 2: batched record index lookup, 6K keys sorted, single HFile scan, ~1-2s. +- Step 3: in-memory resolution, ~0ms. +- **Total: ~3-7s.** + +For a viral blob (10K refs): +- Step 1: prefix scan for that blob returns 10K keys, ~2-3s. +- Step 2: 10K keys in a single sorted HFile scan, ~2-3s. +- Step 3: in-memory, ~0ms. +- **Total: ~4-6s** (not 27s -- the previous estimate incorrectly assumed 10K sequential random + lookups instead of a single batched forward-scan). 
**Without MDT secondary index (fallback table scan):** @@ -1954,7 +1970,7 @@ function planBlobCleanup(table, fileGroupCleanResults, context, config): if not external_candidates.isEmpty(): indexPartition = "secondary_index_blob_external_path" if isSecondaryIndexFullyBuilt(table.getMetaClient(), indexPartition): - // Primary path: batched prefix scan + short-circuit record index hop + // Primary path: two batched I/O passes + in-memory resolution candidate_paths = external_candidates.map(r -> r.path).distinct() // Step 1: Batched prefix scan on secondary index @@ -1964,7 +1980,15 @@ function planBlobCleanup(table, fileGroupCleanResults, context, config): .collectAsList() .groupBy(pair -> pair.getKey()) - // Step 2: For each path, short-circuit record index hop + // Step 2: Batch record index lookup for ALL record keys + all_record_keys = path_to_record_keys.values().stream() + .flatMap(List::stream).collect(toList()) + all_locations = mdtMetadata + .readRecordIndexLocations(HoodieListData.eager(all_record_keys)) + .collectAsList() + .toMap(pair -> pair.getKey(), pair -> pair.getValue()) + + // Step 3: In-memory resolution with per-candidate short-circuit for path in candidate_paths: record_keys = path_to_record_keys.getOrDefault(path, []) if record_keys is empty: @@ -1974,10 +1998,10 @@ function planBlobCleanup(table, fileGroupCleanResults, context, config): found_live = false for rk in record_keys: - loc = mdtMetadata.readRecordIndex(rk) - if loc.fileId NOT in cleaned_fg_ids: + loc = all_locations.get(rk) + if loc != null and loc.fileId NOT in cleaned_fg_ids: found_live = true - break // short-circuit + break // short-circuit (in-memory) if not found_live: external_deletes.addAll( @@ -2104,10 +2128,11 @@ properties (C11, P3) make this both correct and sufficient. No new infrastructur triggered. 
**Flow 2 (non-path-dispatched external blobs):** First-class cross-FG verification using an MDT -secondary index on `reference.external_path` with a short-circuit optimization. The lookup is a -batched prefix scan on the secondary index followed by record index hops, with short-circuit on -first non-cleaned FG reference found. Cost is O(candidates) amortized in the common case, -O(candidates * refs_per_candidate) worst case -- not O(table_size). The index uses existing MDT +secondary index on `reference.external_path`. The lookup is a three-step batched pipeline: +(1) batched prefix scan on the secondary index, (2) batched record index lookup for all record +keys in a single sorted HFile forward-scan, (3) in-memory resolution with per-candidate +short-circuit. Cost is two sequential I/O passes + O(R_total) in-memory checks -- not +O(table_size). For Example 7 (2K candidates, 6K record keys): ~3-7s. The index uses existing MDT secondary index infrastructure: `HoodieIndexDefinition` with nested `sourceFields`, `SecondaryIndexRecordGenerationUtils` for maintenance, `readSecondaryIndexDataTableRecordKeysWithKeys` for batched lookups. Nested struct field paths (`<blob_col>.reference.external_path`) are verified

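For reference, the three-step pipeline this patch introduces (batched prefix scan, batched record index lookup, in-memory resolution with per-candidate short-circuit) can be sketched end-to-end as a minimal, self-contained simulation. The dict-backed `secondary_index` and `record_index` below are illustrative stand-ins for the MDT calls `readSecondaryIndexDataTableRecordKeysWithKeys` and `readRecordIndexLocations`; the helper name `resolve_external_deletes` is hypothetical, not a Hudi API:

```python
# Toy stand-in for the MDT secondary index: external blob path -> record keys.
secondary_index = {
    "s3://blobs/a.mp4": ["rk1", "rk2"],
    "s3://blobs/b.mp4": ["rk3"],
    "s3://blobs/c.mp4": [],  # zero references: already deleted everywhere
}

# Toy stand-in for the MDT record index: record key -> (partition, fileId).
record_index = {
    "rk1": ("p0", "fg-live"),
    "rk2": ("p0", "fg-cleaned-1"),
    "rk3": ("p1", "fg-cleaned-2"),
}

cleaned_fg_ids = {"fg-cleaned-1", "fg-cleaned-2"}


def resolve_external_deletes(candidate_paths, secondary_index, record_index,
                             cleaned_fg_ids):
    # Step 1: one batched prefix scan -> Map<path, List<recordKey>>.
    path_to_record_keys = {p: secondary_index.get(p, []) for p in candidate_paths}

    # Step 2: one batched record index lookup for ALL record keys.
    # Sorting mimics the API's internal sort before its single forward-scan.
    all_record_keys = sorted(rk for rks in path_to_record_keys.values() for rk in rks)
    all_locations = {rk: record_index[rk] for rk in all_record_keys if rk in record_index}

    # Step 3: pure in-memory resolution; all I/O is already complete.
    external_deletes = []
    for path in candidate_paths:
        record_keys = path_to_record_keys[path]
        if not record_keys:
            external_deletes.append(path)  # globally orphaned: no refs at all
            continue
        found_live = False
        for rk in record_keys:
            loc = all_locations.get(rk)
            if loc is not None and loc[1] not in cleaned_fg_ids:
                found_live = True
                break  # short-circuit (in-memory): one live ref retains the blob
        if not found_live:
            external_deletes.append(path)  # every ref lives in a cleaned FG
    return external_deletes


deletes = resolve_external_deletes(list(secondary_index), secondary_index,
                                   record_index, cleaned_fg_ids)
print(deletes)  # -> ['s3://blobs/b.mp4', 's3://blobs/c.mp4']
```

`a.mp4` is retained because `rk1` resolves to a non-cleaned file group (the short-circuit fires on the first key checked); `b.mp4` and `c.mp4` are globally orphaned. Note that the simulation performs two "I/O" dictionary builds and one in-memory loop, matching the cost model's two batched MDT passes plus hash-set resolution.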