jineshparakh opened a new pull request, #18460:
URL: https://github.com/apache/pinot/pull/18460
## Summary
For dedup tables with tier configurations, the segment-assignment path can
incorrectly place a new CONSUMING segment on cold-tier servers, breaking the
dedup invariant (all segments for a partition on the same hot-tier servers).
This PR fixes that by positively identifying cold-tier servers via ZK and
skipping cold-tier segments when looking up the "existing assignment" reference
for a partition.
This fix prevents new CONSUMING segments from being placed on cold-tier
servers. It does NOT retroactively repair segments that the bug already
misplaced. Tables that hit the bug before this fix was deployed will need
operator-driven recovery; the specific recovery procedure is out of scope for
this PR.
Affects only `MultiTierStrictRealtimeSegmentAssignment` (dedup tables with
tier configs). `SingleTierStrictRealtimeSegmentAssignment` (upsert) and plain
`RealtimeSegmentAssignment` are unaffected.
---
## The Bug
### Symptom
When a dedup table has a tier configuration (e.g. cold tier) and
`SegmentRelocator` has moved older COMPLETED segments to cold-tier servers,
subsequent new CONSUMING segments for the same partition could be placed on
cold-tier servers instead of hot-tier (consuming) servers.
This breaks the dedup invariant that **all segments for a partition must be
on the same hot-tier servers**, since dedup metadata is co-located with the
CONSUMING-tagged servers.
### Root Cause
In `BaseStrictRealtimeSegmentAssignment.getExistingAssignment(partitionId,
currentAssignment)`:
```java
for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
  if (isOfflineSegment(entry.getValue())) {
    continue;
  }
  LLCSegmentName llcSegmentName = LLCSegmentName.of(entry.getKey());
  if (llcSegmentName == null) {
    uploadedSegments.add(entry.getKey());
    continue;
  }
  if (llcSegmentName.getPartitionGroupId() == partitionId) {
    return entry.getValue().keySet(); // <-- first match returned
  }
}
```
**The fundamental issue**: the method returns the assignment of the first
matching segment it encounters, without verifying that the segment is a valid
hot-tier reference. Cold-tier assignments are not a valid source of truth for
the dedup invariant: they describe where the segment was *relocated to*, not
where new CONSUMING segments for that partition should land.
**Why the bug triggers reliably in practice**: `currentAssignment` is a
`TreeMap` keyed by segment name (LLC segment names have the form
`table__partition__seqNum__timestamp`). Iteration is in lexicographic order,
which coincides with numeric order whenever the seqNums being compared have the
same number of digits (e.g. 0–9, or 874–876 in the example below). For a given
partition, older segments (lower seqNum) are generally encountered first, and
older completed segments are exactly the ones `SegmentRelocator` moves to the
cold tier first. The first match is therefore very likely to be a cold-tier
segment whenever any segment for the partition has been relocated.
### Example
```
Partition 3 in the idealState (a TreeMap, iterated in lexicographic order of segment name):

  testTable__3__874__T → {coldTier_server_12: ONLINE, coldTier_server_13: ONLINE}
      ↑ older COMPLETED segment, relocated to cold tier by SegmentRelocator
  testTable__3__875__T → {hot_server_10: ONLINE, hot_server_12: ONLINE, ...}
      ↑ newer COMPLETED segment, still on hot tier
  testTable__3__876__T → {hot_server_10: CONSUMING, hot_server_12: CONSUMING, ...}
      ↑ actively consuming on hot tier

assignSegment(testTable__3__877__T, ...):
  Pre-fix:  __874 is the first match for partition 3
            → returns {coldTier_server_12, coldTier_server_13}
            → __877 (new CONSUMING) is assigned to cold-tier servers → BUG
  Post-fix: __874's servers are all in the tier-instance set → filter skips it
            → __875 is the next match → returns hot-tier servers
            → __877 is assigned to hot-tier servers ✓
```
---
## The Fix
### Chosen Approach: ZK-based positive tier identification
Add a `protected Set<String> getTierInstances()` template method to
`BaseStrictRealtimeSegmentAssignment`. Inside `getExistingAssignment`, skip any
segment whose servers are **entirely** in this set:
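```java
// Skip segments whose servers ALL belong to a configured tier (e.g. cold tier):
// such an assignment says where the segment was relocated to, not where new
// CONSUMING segments for the partition should go.
if (!tierInstances.isEmpty() && tierInstances.containsAll(entry.getValue().keySet())) {
  continue;
}
```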
- Default implementation: returns `Set.of()` — no filtering, identical
behavior for upsert.
- Override in `MultiTierStrictRealtimeSegmentAssignment`: fetches each
tier's `InstancePartitions` from ZK and aggregates all tier-server identities
into a single set.
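Here is a minimal sketch of the filtered lookup. It makes three simplifying assumptions:

- The ideal state is modeled as a `TreeMap<String, Map<String, String>>` (segment → server → state).
- The tier-server set is passed in instead of being fetched from ZK.
- Uploaded-segment bookkeeping is left out.

`ExistingAssignmentSketch`, `partitionOf` and this `isOfflineSegment` are hypothetical stand-ins, not the Pinot implementation.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class ExistingAssignmentSketch {
  // Hypothetical: partition id from "table__partition__seqNum__timestamp", or -1 for
  // names that are not LLC-style (e.g. uploaded segments), which then never match.
  static int partitionOf(String segmentName) {
    String[] parts = segmentName.split("__");
    if (parts.length != 4) {
      return -1;
    }
    try {
      return Integer.parseInt(parts[1]);
    } catch (NumberFormatException e) {
      return -1;
    }
  }

  // Hypothetical: a segment counts as offline when every server reports OFFLINE.
  static boolean isOfflineSegment(Map<String, String> instanceStates) {
    return instanceStates.values().stream().allMatch("OFFLINE"::equals);
  }

  // Returns the servers of the first usable segment for the partition, or null if none.
  static Set<String> getExistingAssignment(int partitionId,
      TreeMap<String, Map<String, String>> currentAssignment, Set<String> tierInstances) {
    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
      Map<String, String> instanceStates = entry.getValue();
      if (isOfflineSegment(instanceStates)) {
        continue;
      }
      // The fix: skip segments that live entirely on tier (e.g. cold-tier) servers.
      if (!tierInstances.isEmpty() && tierInstances.containsAll(instanceStates.keySet())) {
        continue;
      }
      if (partitionOf(entry.getKey()) == partitionId) {
        return instanceStates.keySet();
      }
    }
    return null; // caller falls back to the computed assignment
  }
}
```

Run against the partition-3 example above:

- An empty tier set reproduces the pre-fix result, `{coldTier_server_12, coldTier_server_13}`.
- Passing the cold-tier servers yields the `__875` hot-tier servers.
- Because the check is `containsAll`, a segment with even one non-tier server is not skipped.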
### Why this works for every scenario
| Scenario | Behavior |
|---|---|
| Normal: all segments on hot-tier | Tier filter never matches → first hot segment returned ✓ |
| Cold tier exists, mix of cold + hot segments | Cold ones skipped → first hot returned ✓ |
| Full cascade: all segments on cold-tier (bug already fired) | All skipped → returns null → falls back to computed hot-tier assignment → **self-healing** ✓ |
| IP change, no cold tier | Old consuming servers NOT in tier pool → not skipped → dedup invariant preserved ✓ |
| IP change + cold tier (mixed) | Cold skipped, old hot returned → dedup invariant preserved ✓ |
| Upsert (`SingleTier`) | Inherits empty default → filter never fires → behavior identical to pre-change ✓ |
| Tier config but no tier IPs in ZK yet | `fetchInstancePartitions` returns null → empty tier set → no filtering. Correct because if tier IPs don't exist, no segment could have been moved to that tier ✓ |
### Trade-off: ZK read per `assignSegment`
`getTierInstances` issues one ZK property store read per configured tier on
every `assignSegment` call. Mitigations:
- `ZkHelixPropertyStore` caches reads locally; only cache-miss/expiry hits
the actual ZK ensemble
- `assignSegment` is not on the query hot path (called once per
CONSUMING-segment commit)
- Pinot's controller is already heavily ZK-dependent for the same code path
If ZK is unavailable, the fetch will throw and propagate up. This is
**intentional** — silently falling back to an empty set would re-introduce the
bug during ZK flakiness. Documented in code.
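The aggregation and failure semantics described in this section can be sketched as follows. This is an illustration built on assumptions, not the PR's code:

- `TierInstancesSketch` is hypothetical.
- Tier names stand in for the table's tier configs.
- `fetchTierServers` is a hypothetical callback standing in for the per-tier `InstancePartitions` read from the ZK property store. It returns `null` when nothing has been persisted.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

final class TierInstancesSketch {
  /**
   * Aggregates the servers of every configured tier into one set.
   *
   * @param tierNames        configured tier names, or null when the table has no tier config
   * @param fetchTierServers hypothetical stand-in for reading one tier's instance partitions
   *                         from ZK; returns null if nothing is persisted for that tier yet
   */
  static Set<String> getTierInstances(List<String> tierNames,
      Function<String, Set<String>> fetchTierServers) {
    if (tierNames == null || tierNames.isEmpty()) {
      return Set.of(); // no tier config: no ZK reads and no filtering
    }
    Set<String> tierInstances = new HashSet<>();
    for (String tierName : tierNames) {
      // One read per tier per call. Exceptions are deliberately not caught: returning an
      // empty set during a ZK outage would silently re-enable the bug.
      Set<String> servers = fetchTierServers.apply(tierName);
      if (servers != null) {
        tierInstances.addAll(servers);
      }
    }
    return tierInstances;
  }
}
```

The sketch behaves as follows:

- A `null` tier list returns the empty set without invoking the callback.
- A tier with nothing persisted contributes no servers.
- An exception from the callback (for example, during a ZK outage) escapes to the caller instead of degrading to an empty set.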
---
## How we guarantee new CONSUMING segments go to hot servers
Tracing the post-fix `assignSegment` for the typical bug scenario:
1. **Compute candidate from instance partitions**:
   `assignConsumingSegment(partitionId, consumingInstancePartitions)`
   returns hot-tier servers by construction.
2. **Look up existing assignment** via
   `getExistingAssignment(partitionId, currentAssignment)`:
   - Fetch the tier server set from ZK once
   - Iterate `currentAssignment` (TreeMap, alphabetical order by segment name)
   - For each segment:
     - Skip if all servers are OFFLINE
     - **Skip if all servers are in the tier-server set** (the new filter)
     - If the partition matches → return its keySet (which contains only
       hot-tier servers, since cold-tier segments were skipped)
   - If no match is found → return `null`
3. **Reconcile**:
   - If existing == null → use computed hot-tier assignment (case: first
     segment for partition, or full cascade)
   - If existing != null and matches computed → use computed hot-tier
   - If existing != null and differs from computed → use existing (still
     hot-tier, because cold-tier was filtered)

In every branch, the final `instancesAssigned` contains only hot-tier
servers.
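The reconcile step can be sketched as below. It assumes `existing` is the server set from the lookup (or `null`) and `computed` is the candidate list from `assignConsumingSegment`. `ReconcileSketch` and its `isSameAssignment` (size check, then content check) are hypothetical simplifications, not the Pinot methods.

```java
import java.util.List;
import java.util.Set;

final class ReconcileSketch {
  // Hypothetical simplification: compare sizes first, then contents.
  static boolean isSameAssignment(Set<String> existing, List<String> computed) {
    return existing.size() == computed.size() && existing.containsAll(computed);
  }

  // Step 3: keep the partition's existing servers when they differ from the computed
  // candidate, so every segment of the partition stays on the same servers.
  static List<String> reconcile(Set<String> existing, List<String> computed) {
    if (existing == null || isSameAssignment(existing, computed)) {
      return computed;
    }
    return List.copyOf(existing);
  }
}
```

The filter only changes what `existing` can be. Before the fix, a three-server cold-tier set passed the size check against three hot servers but failed the content check. `reconcile` therefore returned the cold-tier servers.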
---
## Tested Scenarios
All tests in `StrictRealtimeSegmentAssignmentTest`:
### Existing tests (regression coverage)
- `testAssignSegment(upsert)` — confirms upsert path is unchanged
- `testAssignSegment(dedup)` — confirms dedup path without tier config is
unchanged
- `testAssignSegmentWithOfflineSegment(upsert / dedup)` — OFFLINE segments
skipped
- `testRebalanceDedupTableWithTiers` — rebalance path unchanged
- `testRebalanceUpsertTableWithTiers` — upsert rejects tiers
- `testAssignSegmentToCompletedServers(upsert / dedup)` — precondition checks
- `testRebalanceTableToCompletedServers(upsert / dedup)` — precondition
checks
### New tests (bug coverage)
| Test | Scenario | What it verifies |
|---|---|---|
| `testAssignSegmentIgnoresColdTierSegments` | Single cold segment for p0, no other p0 segment, same IPs | Cold skipped → null → computed hot-tier used |
| `testAssignSegmentAllColdTierFallsBackToComputed` | Single cold segment for p0, new IPs | Cold skipped → null → new computed hot-tier used |
| `testAssignSegmentPrefersSamePartitionOnConsumingTier` | Cold seg_0 + hot seg_4 for p0, new IPs | Cold skipped, hot returned → dedup invariant preserved (uses old hot servers, not new) |
| `testAssignSegmentAfterCommitWithColdTier` (parameterized × 2 cold-tier replications) | **Production-realistic flow**: previous CONSUMING transitioned to ONLINE before `assignSegment`, with cold-tier segment present | Mirrors what `PinotLLCRealtimeSegmentManager` does at commit time |
| `testCascadeColdTierCorruptionSelfHeals` (parameterized × 2 cold-tier replications) | **All p0 segments on cold-tier (bug already fired)** | Self-healing: all cold skipped → null → falls back to computed hot-tier → **breaks the cascade** |
| `testIPChangeWithTierConfigPreservesDedupInvariant` | Instance-partition change, no cold-tier segments | Tier filter does NOT interfere with the normal IP-change path |
### Cold-tier replication coverage
Cold tier may legitimately have a different replication factor than hot
tier. Two of the most critical tests are parameterized via
`@DataProvider("coldTierReplications")` to run with:
- **2 cold-tier servers** (cold replication `<` `NUM_REPLICAS=3`)
- **3 cold-tier servers** (cold replication `==` `NUM_REPLICAS=3`)
The 3-server case is the more rigorous one. Without the fix,
`isSameAssignment` would pass the size check (3 == 3) but fail the content
check, and the new CONSUMING segment would then be assigned to the cold-tier
servers, which is the exact production bug. The 2-server case fails earlier, on
the size mismatch. Running both therefore covers both ways the comparison can
reject the cold-tier reference.
## Alternative Approaches Considered & Discarded
### Approach A: "Highest sequence number" heuristic
Return the assignment of the segment with the highest sequence number for
the partition, on the assumption that "highest seq = most recently committed =
always on hot-tier" (since `SegmentRelocator` moves oldest segments first).
**Discarded because**: Doesn't break the cascade. If a previous occurrence
of the bug already placed the latest segment on cold tier, the highest-seq
segment IS on cold tier, and the heuristic perpetuates the misplacement:
```
Cascade-corrupted state:
seg__0__0 → cold-tier (moved by SegmentRelocator)
seg__0__1 → cold-tier (placed there by previous bug occurrence)
seg__0__2 → cold-tier (placed there by previous bug occurrence)
assignSegment(seg__0__3) with "highest seq" heuristic:
→ returns seg__0__2's cold-tier assignment
→ seg__0__3 assigned to cold-tier
→ cascade continues forever ✗
```
The fix needs to **stop the bleeding**: once deployed, even tables that were
corrupted by the bug should route *new* segments back to the hot tier. (Note:
this does NOT retroactively repair already-misplaced segments; those remain on
the cold tier until an operator runs a table rebalance.)
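The cascade argument against Approach A can be checked directly on the corrupted state above. This sketch is hypothetical. `HighestSeqHeuristic.highestSeqReference` implements the discarded heuristic with a numeric seqNum comparison. `entirelyOnTier` mirrors the chosen filter's skip condition.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class HighestSeqHeuristic {
  // Hypothetical parsers for names like "seg__0__2" (table__partition__seqNum[__timestamp]).
  static int partitionOf(String segmentName) {
    return Integer.parseInt(segmentName.split("__")[1]);
  }

  static int seqOf(String segmentName) {
    return Integer.parseInt(segmentName.split("__")[2]);
  }

  // Approach A (discarded): use the assignment of the partition's highest-seqNum segment.
  static Set<String> highestSeqReference(int partitionId,
      TreeMap<String, Map<String, String>> assignment) {
    String best = null;
    for (String name : assignment.keySet()) {
      if (partitionOf(name) == partitionId && (best == null || seqOf(name) > seqOf(best))) {
        best = name;
      }
    }
    return best == null ? null : assignment.get(best).keySet();
  }

  // Chosen approach's skip condition: every server of the segment is a tier server.
  static boolean entirelyOnTier(Set<String> servers, Set<String> tierInstances) {
    return !tierInstances.isEmpty() && tierInstances.containsAll(servers);
  }
}
```

On the cascade-corrupted state, the heuristic keeps returning the cold-tier servers of `seg__0__2`. The tier filter skips all three segments, so the lookup finds no reference and the caller falls back to the computed hot-tier assignment.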
### Approach B: Override `assignSegment` entirely in the subclass (an alternative implementation of the same fix)
Duplicate all of `assignSegment`'s orchestration (preconditions, partition
ID, candidate calculation, mismatch comparison, metrics, logging) in
`MultiTierStrictRealtimeSegmentAssignment` so the base class needs no changes.
**Discarded because**:
- Duplicated orchestration logic
- Requires making `getPartitionId`, `isSameAssignment`,
`getExistingAssignment` protected (widening their visibility)
- Risk of silent divergence when base logic evolves (new metrics, new log
fields, new edge case handling)
### Approach C: Skip segments with a sequence number lower than the highest hot-tier segment
Iterate to find the highest-seq segment on hot-tier; use that as the
reference.
**Discarded because**: Same cascade problem as Approach A. In the corrupted
state, there is no hot-tier segment for the partition, and we'd return some
arbitrary cold-tier segment. Adding "if no hot-tier segment, return null" makes
it equivalent to the chosen approach for the cascade case.
---
## Backwards Compatibility
| Path | Behavior |
|---|---|
| Plain realtime tables (`RealtimeSegmentAssignment`) | Unaffected: not in the changed class hierarchy |
| Upsert tables (`SingleTierStrictRealtimeSegmentAssignment`) | Identical to pre-change (default `getTierInstances` returns empty set; filter never fires) |
| Dedup tables without tier config (`MultiTierStrictRealtimeSegmentAssignment`, no tiers) | Identical to pre-change (`getTierConfigsList()` returns null → empty set returned without ZK calls) |
| Dedup tables with tier config but no tier IPs persisted in ZK yet | Identical to pre-change (no segments could have been moved to non-existent tiers, so empty tier set is correct) |
| Dedup tables with tier config + tier IPs in ZK | New behavior: cold-tier segments excluded from existing-assignment lookup (the fix) |