hongkunxu opened a new pull request, #18642:
URL: https://github.com/apache/pinot/pull/18642
## Title
[MV] Harden materialized-view partition-state engine: watermark cap, DELETE backfill, and centralized mutation entry point
## Description
This PR hardens the materialized-view partition-state engine in three areas:

- It fixes two correctness bugs. One let stale data look fresh. The other let backfilled buckets silently fail to re-sync.
- It consolidates every per-partition mutation behind a single CAS engine, so future state transitions cannot drift across the executor, scheduler, and consistency manager.
- It centralizes the window-fingerprint algorithm that those three components use to agree on "what the source looked like at task time".
### Design reference
The full state machine — partition states, operations, allowed transitions,
and how the consistency manager keeps the partition map in sync with
base-table segment changes — is documented here:
> [MV partition state engine — design notes (Google Doc)](<link>)
It contains four sections that together describe the contract this PR
enforces:
1. **Partition state definition**: `VACANT` / `VALID` / `STALE`, and what each one means physically (entry presence, fingerprint, last-refresh time).
2. **Operation definition**: the seven operations (`MinionTaskCycle_APPEND` / `_OVERWRITE` / `_CLEAR`, `ConsistencyMgr_MARK_STALE`, `Scheduler_REVERT_VALID`, `Manual_REFRESH_MV`, `Manual_DROP_MV_PARTITION`), their triggers (Auto vs Manual), and which actor invokes each.
3. **State transition table**: the 3×3 from-state × to-state matrix, listing every operation that drives each transition.
4. **Sync with base-table change**: what the consistency manager does when it observes a base segment add/update/delete event, and which transitions result.
This PR is the first implementation pass that fully matches that document. After the merge, the state machine has a single owner (`MaterializedViewPartitionManager`) and one test surface that exercises every documented transition.
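The state machine described above can be sketched as a from-state guard per operation. This is one possible reading of the transitions this description mentions, not Pinot code. The enum names are shortened and hypothetical. `Manual_REFRESH_MV` is left out because the description does not say which transition it drives.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch: the states come from the design doc, but the allowed
// from-states below are inferred from this PR description, not copied from Pinot.
enum PartitionState { VACANT, VALID, STALE }

enum PartitionOp {
  MINION_APPEND(EnumSet.of(PartitionState.VACANT), PartitionState.VALID),      // new bucket
  MINION_OVERWRITE(EnumSet.of(PartitionState.STALE), PartitionState.VALID),    // re-sync stale bucket
  MINION_CLEAR(EnumSet.of(PartitionState.STALE), PartitionState.VALID),        // VALID + EMPTY fingerprint
  MARK_STALE(EnumSet.of(PartitionState.VALID), PartitionState.STALE),          // base segment changed
  REVERT_VALID(EnumSet.of(PartitionState.STALE), PartitionState.VALID),        // false-positive STALE
  DROP_PARTITION(EnumSet.of(PartitionState.VALID, PartitionState.STALE), PartitionState.VACANT);

  private final Set<PartitionState> allowedFrom;
  private final PartitionState to;

  PartitionOp(Set<PartitionState> allowedFrom, PartitionState to) {
    this.allowedFrom = allowedFrom;
    this.to = to;
  }

  // Returns the resulting state, or rejects the call if the precondition fails.
  PartitionState apply(PartitionState current) {
    if (!allowedFrom.contains(current)) {
      throw new IllegalStateException(name() + " is not allowed from " + current);
    }
    return to;
  }
}
```

With a single owner, a precondition such as "`OVERWRITE` only from `STALE`" sits in one place and does not have to be repeated at each call site.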
### Why introduce `MaterializedViewPartitionManager`
Before this PR the per-partition mutation logic was spread across three
sites — the minion executor, the controller-side task scheduler, and the
consistency manager — each shipping its **own** ~50-line CAS-write retry loop
with its own retry budget, its own jittered backoff, and its own
hand-rolled `Map<Long, PartitionInfo>` copy/edit code:
| Site | Trigger | Bespoke CAS loop |
|------|---------|------------------|
| Executor | `APPEND` / `OVERWRITE` / `DELETE` task commit | `DEFAULT_MAX_RUNTIME_UPDATE_ATTEMPTS` |
| Scheduler | False-positive `STALE` revert | `MAX_PARTITION_STATE_PERSIST_RETRIES` (8, fixed) |
| Consistency mgr | Segment-change flush | `MAX_MARK_RETRIES` + `ThreadLocalRandom` backoff |
Concrete problems this caused:
- Three retry/backoff policies that could (and did) drift apart whenever any
one was tuned. Operators had no single knob to control "MV runtime znode
contention pressure".
- Some refactor proposals made watermark advancement on `APPEND` a ZK write separate from the bucket insert. Under concurrent writers, this opens a window in which the partition map and `watermarkMs` disagree. That disagreement in turn poisons the broker's `now - watermarkMs <= stalenessThresholdMs` check.
- Adding a new state to the state machine required touching roughly three files, each written in its own idiom. Every new transition was a chance to forget a precondition check (e.g. `existing.getState() == STALE` before `OVERWRITE`).
- Cross-package callers could synthesize arbitrary `PartitionInfo` objects
directly, bypassing every invariant the manager would otherwise enforce.
The new design replaces all three loops with one CAS engine
(`MaterializedViewPartitionManager#applyMutation`), exposes one public method
per documented operation (`appendValid` / `refreshValid` / `clearValid` /
`revertValid` / `markStale` / `deletePartition`), and bundles watermark
advancement into the same atomic write that mutates the bucket. The
`PartitionInfo` constructor is locked to package access so production code
outside the metadata package physically cannot bypass the manager — tests
opt in via a single named factory `PartitionInfo.forTesting(...)` annotated
`@VisibleForTesting`.
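The single CAS engine described above can be illustrated with an in-memory model. The model assumes a ZooKeeper-style versioned write: the write succeeds only if the version is unchanged since the read. `VersionedStore`, `RuntimeState`, and `PartitionManagerSketch` are hypothetical names. The real manager's signatures, retry budget, and backoff constants are not shown in this PR text.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for a ZK znode: every successful write bumps the version.
final class VersionedStore<T> {
  static final class Versioned<V> {
    final V value;
    final int version;
    Versioned(V value, int version) { this.value = value; this.version = version; }
  }

  private T value;
  private int version;

  VersionedStore(T initial) { this.value = initial; }

  synchronized Versioned<T> read() { return new Versioned<>(value, version); }

  // Mirrors a versioned setData: fails if another writer got in after our read.
  synchronized boolean compareAndSet(int expectedVersion, T newValue) {
    if (version != expectedVersion) return false;
    value = newValue;
    version++;
    return true;
  }
}

// Hypothetical, simplified runtime znode: bucket states plus the watermark, written together.
final class RuntimeState {
  final Map<Long, String> partitions; // bucketStartMs -> "VALID" / "STALE"
  final long watermarkMs;

  RuntimeState(Map<Long, String> partitions, long watermarkMs) {
    this.partitions = Collections.unmodifiableMap(new HashMap<>(partitions));
    this.watermarkMs = watermarkMs;
  }
}

final class PartitionManagerSketch {
  private final VersionedStore<RuntimeState> store;
  private final int maxAttempts;    // the one retry budget
  private final long baseBackoffMs; // the one backoff policy

  PartitionManagerSketch(VersionedStore<RuntimeState> store, int maxAttempts, long baseBackoffMs) {
    this.store = store;
    this.maxAttempts = maxAttempts;
    this.baseBackoffMs = baseBackoffMs;
  }

  // Shared read-modify-CAS loop; each operation supplies only its mutation and preconditions.
  RuntimeState applyMutation(UnaryOperator<RuntimeState> mutation) {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      VersionedStore.Versioned<RuntimeState> current = store.read();
      RuntimeState next = mutation.apply(current.value); // throws on a violated precondition
      if (store.compareAndSet(current.version, next)) {
        return next;
      }
      try { // lost the race: jittered backoff that grows with the attempt count, then re-read
        Thread.sleep(ThreadLocalRandom.current().nextLong(baseBackoffMs * attempt + 1));
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("interrupted during CAS backoff", e);
      }
    }
    throw new IllegalStateException("CAS failed after " + maxAttempts + " attempts");
  }

  // APPEND: insert a VALID bucket and advance the watermark in the same atomic write.
  RuntimeState appendValid(long bucketStartMs, long newWatermarkMs) {
    return applyMutation(rt -> {
      if (rt.partitions.containsKey(bucketStartMs)) {
        throw new IllegalStateException("bucket " + bucketStartMs + " is not VACANT");
      }
      Map<Long, String> copy = new HashMap<>(rt.partitions);
      copy.put(bucketStartMs, "VALID");
      return new RuntimeState(copy, Math.max(rt.watermarkMs, newWatermarkMs));
    });
  }
}
```

The bucket insert and the watermark advance are a single `compareAndSet`. A reader therefore sees both changes or neither, which closes the window described in the list of problems above.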
Net effect: one retry budget, one backoff implementation, one set of
preconditions, one place to add the next state. Future transitions
(`VACANT → STALE` synthesize for in-coverage backfill, an explicit
`VALID-empty` state, etc.) become a one-method addition with predictable
behavior, not a three-site coordination exercise.
### Bug fixes that landed alongside the refactor
Two separate correctness bugs were fixed as part of this pass — both touch
the same partition-state machinery the manager now owns, and both now have
regression tests in the manager's unit-test surface:
1. **Watermark over-advance past latest source data.** The MV scheduler kept
advancing `watermarkMs` toward `now - bufferMs` even when the base table
had stopped receiving new data, which made the broker's freshness check
`now - watermarkMs <= stalenessThresholdMs` return `true` for stale MV
data. `watermarkMs` is now capped at the latest `endTime` present in the
source segments, so the staleness check reflects real data freshness
instead of wall-clock drift.
2. **Backfill into a previously-deleted bucket silently dropped.** When a
`STALE` bucket's source data was retention-deleted, the executor's
`DELETE` branch removed the runtime `PartitionInfo` entry entirely. That
left the bucket absent (`VACANT`), which the consistency manager's
`markStale` pass deliberately skips, so any subsequent base-table
backfill into that window never propagated to the MV. The `DELETE`
branch now writes `VALID + PartitionFingerprint.EMPTY` instead, so the
bucket follows the standard `VALID → STALE → OVERWRITE` cycle on
backfill.
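The watermark cap in fix 1 comes down to taking a `min` with the latest source `endTime`. The sketch below is hypothetical. `WatermarkCap`, `nextWatermarkMs`, and the plain list of segment end times are illustrative. It also assumes two things the PR text does not state: the watermark never moves backward, and it holds still when no source segments exist.

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical helper illustrating the capped watermark and the broker's freshness check.
final class WatermarkCap {
  static long nextWatermarkMs(long currentWatermarkMs, long nowMs, long bufferMs,
      List<Long> sourceSegmentEndTimesMs) {
    long wallClockTarget = nowMs - bufferMs;
    OptionalLong latestEndMs = sourceSegmentEndTimesMs.stream().mapToLong(Long::longValue).max();
    // Cap at the newest data that actually exists in the base table (assumption: hold if none).
    long target = latestEndMs.isPresent()
        ? Math.min(wallClockTarget, latestEndMs.getAsLong())
        : currentWatermarkMs;
    // Assumption: the watermark only moves forward.
    return Math.max(currentWatermarkMs, target);
  }

  // The broker-side check quoted in this PR.
  static boolean isFresh(long nowMs, long watermarkMs, long stalenessThresholdMs) {
    return nowMs - watermarkMs <= stalenessThresholdMs;
  }
}
```

Take `now = 10000`, `bufferMs = 1000`, and a latest source `endTime` of `5000`. The uncapped watermark would be `9000` and pass a `2000` ms staleness threshold. The capped watermark is `5000` and correctly fails it.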
`PartitionFingerprint.EMPTY` is byte-identical to what
`computeWindowFingerprint` already produces when the overlapping segment
list is empty (both feed an empty input through `farmHashFingerprint64`),
so existing ZK records remain comparable across rolling upgrades — no
migration required.
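The before/after behavior of fix 2 can be modeled in a few lines. `DeleteBranchSketch` and its methods are hypothetical. `EMPTY_FINGERPRINT` is a placeholder value: the PR only says the real `PartitionFingerprint.EMPTY` equals the empty-overlap fingerprint. The sketch assumes, as the description states, that the `markStale` pass flips only buckets that have a `VALID` entry.

```java
import java.util.Map;

// Hypothetical model of the executor DELETE branch and the consistency manager's markStale pass.
final class DeleteBranchSketch {
  enum State { VALID, STALE }

  // Placeholder; stands in for PartitionFingerprint.EMPTY, whose real value is not shown here.
  static final long EMPTY_FINGERPRINT = 0L;

  static final class Entry {
    final State state;
    final long fingerprint;
    Entry(State state, long fingerprint) { this.state = state; this.fingerprint = fingerprint; }
  }

  // Old behavior: drop the entry, leaving the bucket absent (VACANT).
  static void deleteOld(Map<Long, Entry> partitions, long bucketStartMs) {
    partitions.remove(bucketStartMs);
  }

  // New behavior: keep the bucket as VALID with the empty-window fingerprint.
  static void deleteNew(Map<Long, Entry> partitions, long bucketStartMs) {
    partitions.put(bucketStartMs, new Entry(State.VALID, EMPTY_FINGERPRINT));
  }

  // On a base-segment change overlapping the bucket: only VALID entries become STALE;
  // absent buckets are skipped, which is why the old DELETE lost backfills.
  static void markStale(Map<Long, Entry> partitions, long bucketStartMs) {
    Entry e = partitions.get(bucketStartMs);
    if (e != null && e.state == State.VALID) {
      partitions.put(bucketStartMs, new Entry(State.STALE, e.fingerprint));
    }
  }
}
```

Under the new branch, a backfill triggers `markStale`, the bucket becomes `STALE`, and the next task cycle can `OVERWRITE` it. Under the old branch, the same backfill is a no-op.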
### Other clean-ups bundled in
- Two verbatim copies of the
filter + sort + `farmHashFingerprint64` window-fingerprint algorithm
(one in the scheduler, one in the executor) are collapsed into
`MaterializedViewTaskUtils#computeWindowFingerprint`. Drift between the
two copies would silently break the executor's commit-time fingerprint
validation and produce tasks that retry forever without advancing the
watermark — one of the worst classes of MV bugs to triage.
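The filter + sort + hash shape of the window fingerprint can be sketched as follows. This is not `MaterializedViewTaskUtils#computeWindowFingerprint`. Several parts are assumptions:

- FNV-1a 64 replaces Guava's `farmHashFingerprint64`, so the sketch needs only the JDK.
- The `Segment` fields (`name`, `startMs`, `endMs`, `crc`) are invented for illustration.
- The window is treated as half-open `[windowStartMs, windowEndMs)`.

The sketch shows the two properties the PR relies on: the result is independent of input order, and an empty overlap hashes to the same value as an `EMPTY` constant derived from empty input.

```java
import java.nio.charset.StandardCharsets;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

final class WindowFingerprintSketch {
  // Hypothetical segment metadata; real Pinot segment metadata differs.
  static final class Segment {
    final String name;
    final long startMs;
    final long endMs;
    final long crc;
    Segment(String name, long startMs, long endMs, long crc) {
      this.name = name; this.startMs = startMs; this.endMs = endMs; this.crc = crc;
    }
  }

  // FNV-1a 64-bit, used here as a stand-in for farmHashFingerprint64.
  private static final long FNV_OFFSET = 0xcbf29ce484222325L;
  private static final long FNV_PRIME = 0x100000001b3L;

  static long fnv1a(long h, byte[] bytes) {
    for (byte b : bytes) {
      h ^= (b & 0xff);
      h *= FNV_PRIME;
    }
    return h;
  }

  // Fingerprint of hashing nothing: what an empty overlap produces.
  static final long EMPTY = fnv1a(FNV_OFFSET, new byte[0]);

  static long computeWindowFingerprint(List<Segment> segments, long windowStartMs, long windowEndMs) {
    List<Segment> overlapping = segments.stream()
        .filter(s -> s.startMs < windowEndMs && s.endMs >= windowStartMs) // overlaps the window
        .sorted(Comparator.comparing((Segment s) -> s.name))              // order-independent
        .collect(Collectors.toList());
    long h = FNV_OFFSET;
    for (Segment s : overlapping) {
      h = fnv1a(h, (s.name + ":" + s.crc + ";").getBytes(StandardCharsets.UTF_8));
    }
    return h;
  }
}
```

When the scheduler and executor both call one helper like this, they cannot disagree on filtering, ordering, or encoding. Any such disagreement would make the commit-time fingerprint check fail on every retry.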
### Backward compatibility / rolling upgrade
- ZK format unchanged. The DELETE-branch fix relies on
`PartitionFingerprint.EMPTY` being byte-identical to the
empty-overlap fingerprint, so znodes written by the old executor remain
comparable to znodes written by the new one.
- No public API or REST surface changes.
- `PartitionInfo` constructor is now package-private; the only production
callers were already inside the `metadata` package. Tests use the new
`PartitionInfo.forTesting(...)` factory.
### Tests
- New: `MaterializedViewPartitionManagerTest` exercises every public method,
including CAS retry / version-conflict paths and precondition violations.
- Updated: scheduler / executor / consistency-manager / rewrite-engine tests
rebased onto the manager and the `PartitionInfo.forTesting` factory.
- Manual end-to-end validation against a local Pinot cluster: created
`airlineStats_mv_hardening_smoke` with `1m` refresh and exercised the
segment-delete → STALE → executor-DELETE → backfill path; verified that
the `VALID-empty` write keeps the bucket re-syncing on backfill, and that
the watermark stops advancing once base-table ingestion stalls.
### Suggested labels
`bugfix`, `refactor`