hongkunxu opened a new pull request, #18642:
URL: https://github.com/apache/pinot/pull/18642

   ## Title
   
   [MV] Harden materialized-view partition-state engine: watermark cap, DELETE
   backfill, and centralized mutation entry point
   
   ## Description
   
   This PR hardens the materialized-view partition-state engine in three areas:
   it fixes two correctness bugs that let stale data look fresh and let
   backfilled buckets silently fail to re-sync; it consolidates every
   per-partition mutation behind a single CAS engine so future state
   transitions cannot drift across the executor, scheduler, and consistency
   manager; and it centralizes the window-fingerprint algorithm that those
   three components use to agree on "what the source looked like at task time".
   
   ### Design reference
   
   The full state machine — partition states, operations, allowed transitions,
   and how the consistency manager keeps the partition map in sync with
   base-table segment changes — is documented here:
   
   > [MV partition state engine — design notes (Google Doc)](<link>)
   
   It contains four sections that together describe the contract this PR
   enforces:
   
   1. **Partition state definition**: `VACANT` / `VALID` / `STALE` and what
      each one means physically (entry presence, fingerprint, last-refresh
      time).
   2. **Operation definition**: the seven operations
      (`MinionTaskCycle_APPEND` / `_OVERWRITE` / `_CLEAR`,
      `ConsistencyMgr_MARK_STALE`, `Scheduler_REVERT_VALID`,
      `Manual_REFRESH_MV`, `Manual_DROP_MV_PARTITION`),
      their triggers (Auto vs Manual), and which actor invokes each.
   3. **State transition table**: the 3×3 from-state × to-state matrix listing
      every operation that drives each transition.
   4. **Sync with base-table change**: what the consistency manager does when
      it observes a base segment add/update/delete event, and which
      transitions get produced as a result.
   
   This PR is the first implementation pass that fully matches that document;
   the post-merge state machine has a single owner
   (`MaterializedViewPartitionManager`) and one test surface that exercises
   every documented transition.
   
   ### Why introduce `MaterializedViewPartitionManager`
   
   Before this PR the per-partition mutation logic was spread across three
   sites — the minion executor, the controller-side task scheduler, and the
   consistency manager — each shipping its **own** ~50-line CAS-write retry loop
   with its own retry budget, its own jittered backoff, and its own
   hand-rolled `Map<Long, PartitionInfo>` copy/edit code:
   
   | Site | Trigger | Bespoke CAS loop |
   |------|---------|------------------|
   | Executor | `APPEND` / `OVERWRITE` / `DELETE` task commit | `DEFAULT_MAX_RUNTIME_UPDATE_ATTEMPTS` |
   | Scheduler | False-positive `STALE` revert | `MAX_PARTITION_STATE_PERSIST_RETRIES` (8, fixed) |
   | Consistency mgr | Segment-change flush | `MAX_MARK_RETRIES` + `ThreadLocalRandom` backoff |
   
   Concrete problems this caused:
   
   - Three retry/backoff policies that could (and did) drift apart whenever any
     one was tuned. Operators had no single knob to control "MV runtime znode
     contention pressure".
   - In some refactor proposals, watermark advancement on `APPEND` was a
     separate ZK write from the bucket insert; under concurrent writers this
     opens a window where the partition map and `watermarkMs` disagree, which
     in turn poisons the broker's `now - watermarkMs <= stalenessThresholdMs`
     check.
   - Adding a new state to the state machine required touching ~3 files with
     ~3 different idioms. Every new transition was an opportunity to forget a
     precondition check (e.g. `existing.getState() == STALE` before
     `OVERWRITE`).
   - Cross-package callers could synthesize arbitrary `PartitionInfo` objects
     directly, bypassing every invariant the manager would otherwise enforce.
   
   The new design replaces all three loops with one CAS engine
   (`MaterializedViewPartitionManager#applyMutation`), exposes one public method
   per documented operation (`appendValid` / `refreshValid` / `clearValid` /
   `revertValid` / `markStale` / `deletePartition`), and bundles watermark
   advancement into the same atomic write that mutates the bucket. The
   `PartitionInfo` constructor is locked to package access so production code
   outside the metadata package physically cannot bypass the manager — tests
   opt in via a single named factory `PartitionInfo.forTesting(...)` annotated
   `@VisibleForTesting`.
   
   Net effect: one retry budget, one backoff implementation, one set of
   preconditions, one place to add the next state. Future transitions
   (synthesizing `VACANT → STALE` for in-coverage backfill, an explicit
   `VALID-empty` state, etc.) become a one-method addition with predictable
   behavior, not a three-site coordination exercise.
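
To make the "one CAS engine, one method per operation" shape concrete, here is a minimal in-memory sketch. It is not the code in this PR: the class name `PartitionManagerSketch`, the `Snapshot` type, the retry budget of 8, and the backoff range are all assumptions for illustration, and an `AtomicReference` stands in for the version-checked ZK write. Only the method names `appendValid`, `refreshValid`, and `markStale` and the preconditions come from the description above.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

/** Hypothetical in-memory stand-in for a single CAS mutation engine. */
final class PartitionManagerSketch {
  enum State { VALID, STALE } // an absent key models VACANT

  /** Immutable snapshot of the runtime record: partition map plus watermark. */
  static final class Snapshot {
    final Map<Long, State> partitions;
    final long watermarkMs;

    Snapshot(Map<Long, State> partitions, long watermarkMs) {
      this.partitions = Collections.unmodifiableMap(new HashMap<>(partitions));
      this.watermarkMs = watermarkMs;
    }

    Snapshot withBucket(long bucketStartMs, State state, long newWatermarkMs) {
      Map<Long, State> copy = new HashMap<>(partitions);
      copy.put(bucketStartMs, state);
      return new Snapshot(copy, newWatermarkMs);
    }
  }

  private static final int MAX_ATTEMPTS = 8; // assumed retry budget
  private final AtomicReference<Snapshot> store =
      new AtomicReference<>(new Snapshot(new HashMap<>(), 0L));

  /** The only retry loop: read, build a mutated copy, compare-and-set, back off on conflict. */
  private boolean applyMutation(UnaryOperator<Snapshot> mutation) {
    for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
      Snapshot current = store.get();
      Snapshot next = mutation.apply(current); // precondition failures throw here
      if (store.compareAndSet(current, next)) {
        return true;
      }
      try {
        // Jittered backoff that widens with each failed attempt.
        Thread.sleep(ThreadLocalRandom.current().nextLong(1, 10L * attempt + 1));
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return false;
  }

  /** VACANT -> VALID; the watermark advances in the same write as the bucket insert. */
  boolean appendValid(long bucketStartMs, long bucketEndMs) {
    return applyMutation(s -> {
      require(!s.partitions.containsKey(bucketStartMs), "APPEND requires a VACANT bucket");
      return s.withBucket(bucketStartMs, State.VALID, Math.max(s.watermarkMs, bucketEndMs));
    });
  }

  /** STALE -> VALID. */
  boolean refreshValid(long bucketStartMs) {
    return applyMutation(s -> {
      require(s.partitions.get(bucketStartMs) == State.STALE, "OVERWRITE requires STALE");
      return s.withBucket(bucketStartMs, State.VALID, s.watermarkMs);
    });
  }

  /** VALID -> STALE; buckets with no entry are skipped without error. */
  boolean markStale(long bucketStartMs) {
    return applyMutation(s -> s.partitions.get(bucketStartMs) == State.VALID
        ? s.withBucket(bucketStartMs, State.STALE, s.watermarkMs)
        : s);
  }

  State stateOf(long bucketStartMs) {
    return store.get().partitions.get(bucketStartMs);
  }

  long watermarkMs() {
    return store.get().watermarkMs;
  }

  private static void require(boolean ok, String message) {
    if (!ok) {
      throw new IllegalStateException(message);
    }
  }
}
```

Because every operation is a small function handed to `applyMutation`, adding a transition in this sketch means adding one method; the retry budget, backoff, and copy-on-write handling are not repeated.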
   
   ### Bug fixes that landed alongside the refactor
   
   Two separate correctness bugs were fixed as part of this pass — both touch
   the same partition-state machinery the manager now owns, and both now have
   regression tests in the manager's unit-test surface:
   
   1. **Watermark over-advance past latest source data.** The MV scheduler kept
      advancing `watermarkMs` toward `now - bufferMs` even when the base table
      had stopped receiving new data, which made the broker's freshness check
      `now - watermarkMs <= stalenessThresholdMs` return `true` for stale MV
      data. `watermarkMs` is now capped at the latest `endTime` present in the
      source segments, so the staleness check reflects real data freshness
      instead of wall-clock drift.
   
   2. **Backfill into a previously-deleted bucket silently dropped.** When a
      `STALE` bucket's source data was retention-deleted, the executor's
      `DELETE` branch removed the runtime `PartitionInfo` entry entirely. That
      left the bucket absent from the partition map (`VACANT`), which the
      consistency manager's `markStale` pass deliberately skips, so any
      subsequent base-table backfill into that window never propagated to the
      MV. The `DELETE` branch now writes `VALID + PartitionFingerprint.EMPTY`
      instead, so the bucket follows the standard `VALID → STALE → VALID`
      cycle (re-synced by `OVERWRITE`) on backfill.
   
      `PartitionFingerprint.EMPTY` is byte-identical to what
      `computeWindowFingerprint` already produces when the overlapping segment
      list is empty (both feed an empty input through `farmHashFingerprint64`),
      so existing ZK records remain comparable across rolling upgrades — no
      migration required.
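
The watermark cap in fix 1 comes down to a `min` against the latest source `endTime`. The sketch below is a simplified illustration, not the scheduler code from this PR: the class and method names are hypothetical, and the rule that the watermark never moves backwards is an added assumption. The freshness predicate is the one quoted above.

```java
/** Hypothetical sketch of the watermark cap; all names are illustrative. */
final class WatermarkCapSketch {
  private WatermarkCapSketch() {
  }

  /**
   * Advances the watermark toward (now - buffer), but never past the latest
   * endTime present in the source segments.
   */
  static long nextWatermarkMs(long currentWatermarkMs, long nowMs, long bufferMs,
      long latestSourceEndTimeMs) {
    long wallClockTargetMs = nowMs - bufferMs;
    long cappedTargetMs = Math.min(wallClockTargetMs, latestSourceEndTimeMs);
    // Assumption: the watermark is monotonic and never moves backwards.
    return Math.max(currentWatermarkMs, cappedTargetMs);
  }

  /** The broker-side freshness check quoted in the description. */
  static boolean isFresh(long nowMs, long watermarkMs, long stalenessThresholdMs) {
    return nowMs - watermarkMs <= stalenessThresholdMs;
  }
}
```

For example, with `nowMs = 1_000_000`, `bufferMs = 10_000`, and a base table whose latest `endTime` is `400_000`, an uncapped watermark of `990_000` would pass a `60_000` ms staleness threshold, while the capped watermark of `400_000` fails it.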
   
   ### Other clean-ups bundled in
   
   - Two verbatim copies of the
     filter + sort + `farmHashFingerprint64` window-fingerprint algorithm
     (one in the scheduler, one in the executor) are collapsed into
     `MaterializedViewTaskUtils#computeWindowFingerprint`. Drift between the
     two copies would silently break the executor's commit-time fingerprint
     validation and produce tasks that retry forever without advancing the
     watermark — one of the worst classes of MV bugs to triage.
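
As a rough picture of the filter + sort + hash shape, and of why an empty overlap list yields the same value as a dedicated `EMPTY` constant, consider the sketch below. It is not `MaterializedViewTaskUtils#computeWindowFingerprint`: the `Segment` fields, the overlap rule, and the token format are assumptions, and a 64-bit FNV-1a hash is swapped in for `farmHashFingerprint64` so the example needs only the JDK.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of a window fingerprint; not the real Pinot utility. */
final class WindowFingerprintSketch {
  /** Assumed minimal view of a base-table segment. */
  static final class Segment {
    final String name;
    final long startMs;
    final long endMs;
    final long crc;

    Segment(String name, long startMs, long endMs, long crc) {
      this.name = name;
      this.startMs = startMs;
      this.endMs = endMs;
      this.crc = crc;
    }
  }

  /** Stand-in 64-bit FNV-1a hash; the PR description names farmHashFingerprint64 instead. */
  static long hash64(byte[] bytes) {
    long h = 0xcbf29ce484222325L;
    for (byte b : bytes) {
      h ^= (b & 0xff);
      h *= 0x100000001b3L;
    }
    return h;
  }

  /** Fingerprint of "no overlapping segments": the hash of an empty input. */
  static final long EMPTY = hash64(new byte[0]);

  static long computeWindowFingerprint(List<Segment> segments, long windowStartMs,
      long windowEndMs) {
    List<String> tokens = new ArrayList<>();
    for (Segment s : segments) {
      // Filter: keep segments whose [startMs, endMs] overlaps [windowStartMs, windowEndMs).
      if (s.startMs < windowEndMs && s.endMs >= windowStartMs) {
        tokens.add(s.name + ":" + s.crc);
      }
    }
    // Sort so the result does not depend on segment listing order.
    Collections.sort(tokens);
    return hash64(String.join("\n", tokens).getBytes(StandardCharsets.UTF_8));
  }
}
```

With a single shared implementation, the scheduler and executor necessarily compute the same value for the same segment list, which is the property the commit-time validation depends on.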
   
   ### Backward compatibility / rolling upgrade
   
   - ZK format unchanged. The DELETE-branch fix relies on
     `PartitionFingerprint.EMPTY` being byte-identical to the
     empty-overlap fingerprint, so znodes written by the old executor remain
     comparable to znodes written by the new one.
   - No public API or REST surface changes.
   - `PartitionInfo` constructor is now package-private; the only production
     callers were already inside the `metadata` package. Tests use the new
     `PartitionInfo.forTesting(...)` factory.
   
   ### Tests
   
   - New: `MaterializedViewPartitionManagerTest` exercises every public method,
     including CAS retry / version-conflict paths and precondition violations.
   - Updated: scheduler / executor / consistency-manager / rewrite-engine tests
     rebased onto the manager and the `PartitionInfo.forTesting` factory.
   - Manual end-to-end validation against a local Pinot cluster: created
     `airlineStats_mv_hardening_smoke` with `1m` refresh and exercised the
     segment-delete → STALE → executor-DELETE → backfill path; verified that
     the `VALID-empty` write keeps the bucket re-syncing on backfill, and that
     the watermark stops advancing once base-table ingestion stalls.
   
   ### Suggested labels
   
   `bugfix`, `refactor`



