xiangfu0 commented on code in PR #18642: URL: https://github.com/apache/pinot/pull/18642#discussion_r3366657509
########## pinot-materialized-view/src/main/java/org/apache/pinot/materializedview/metadata/MaterializedViewPartitionManager.java: ########## @@ -0,0 +1,499 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.materializedview.metadata; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Function; +import javax.annotation.Nullable; +import org.apache.helix.store.HelixPropertyStore; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.helix.zookeeper.zkclient.exception.ZkException; +import org.apache.pinot.materializedview.scheduler.MaterializedViewTaskUtils; +import org.apache.pinot.spi.utils.CommonConstants.MaterializedViewTask; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/// Single source of truth for every per-partition state transition on the +/// [MaterializedViewRuntimeMetadata] runtime znode. 
+/// +/// <h3>Why this class exists</h3> +/// +/// Before this manager landed, the runtime znode was mutated by three separate sites — the +/// minion executor (APPEND / OVERWRITE / DELETE task commits), the scheduler (false-positive +/// STALE reverts), and the consistency manager (VALID → STALE markings). Each site shipped +/// its own version-checked CAS loop, with subtly different retry budgets, backoff jitter, +/// and exception-classification rules. Subsequent state-machine changes (e.g. extending the +/// VACANT → STALE synthesize for in-coverage backfill, or revoking the VALID-empty state) +/// would have required coordinated edits at all three sites — exactly the failure mode that +/// caused the earlier `computeWindowFingerprint` duplication scare. +/// +/// This class consolidates all per-partition mutations behind a state-change DSL. Each +/// public method maps to exactly one operation in the per-partition state machine; the +/// ZK CAS retry loop, the writer-side `validateForPersist` invariant gate, and the watermark +/// recompute on APPEND all live behind the public API. Callers express intent +/// (`appendValid`, `markStale`, ...) and never see the persistence machinery. +/// +/// <h3>Architecture</h3> +/// +/// Three layers, deliberately separated: +/// +/// - **Public state-change DSL** — one method per state-machine op. Each method has a +/// well-defined precondition and postcondition (see per-method javadoc) and selects the +/// appropriate retry profile. Callers never construct `PartitionInfo` directly. +/// - **Private CRUD primitives** — `addPartitionInfo` / `updatePartitionInfo` / +/// `removePartitionInfo`. Each enforces a structural invariant on the in-memory map +/// (e.g. "cannot add an already-existing entry") that fail-loud on violation, surfacing +/// races as observable exceptions instead of silent overwrites. 
+/// - **Single CAS engine** — `applyMutation` runs a caller-supplied mutator under the +/// fetch / mutate / version-checked-write loop, with retry classification: +/// - [MaterializedViewRuntimeMetadataUtils.CasConflictException] → re-fetch + retry +/// - other [ZkException] (transport / session) → log warn + retry +/// - [IllegalStateException] / [IllegalArgumentException] (validateForPersist) → +/// propagate (fail-fast, no retry) +/// - any other exception → propagate +/// +/// <h3>Retry profile</h3> +/// +/// Two profiles, calibrated to op semantics: +/// +/// - **Critical** ([#DEFAULT_CRITICAL_MAX_ATTEMPTS] = 128, cluster-tunable via +/// `pinot.materialized.view.executor.runtime.update.max.attempts`) — used by every +/// state-changing op except `revertValid`. Failure forces a minion task retry at the +/// Helix level, which is far more expensive than a CAS retry, so the budget is sized +/// to absorb realistic contention with `maxTasksPerBatch` parallel writers. +/// - **Revert** ([#REVERT_MAX_ATTEMPTS] = 8) — used by `revertValid` only. Failure +/// means we did not avoid one OVERWRITE task, which the next scheduling cycle will +/// either run for real or revert again. Spending 128 retries to save one task is +/// wasteful. +/// +/// <h3>Concurrency</h3> +/// +/// The manager is thread-safe and stateless above the ZK property store handle. Concurrent +/// callers contending on the same MV runtime znode are serialized by ZK CAS; one thread's +/// commit invalidates the other's snapshot, the loser re-fetches and re-runs its mutator. +/// Different MVs map to different znodes — no cross-MV contention. +/// +/// <h3>Scope discipline</h3> +/// +/// This class manages exactly the partition map and `watermarkMs` field of +/// [MaterializedViewRuntimeMetadata]. It MUST NOT acquire other znodes +/// (e.g. [MaterializedViewDefinitionMetadata]), call source-table or table-config services, +/// or grow a method count beyond the public DSL listed below. 
See `red lines` in the PR +/// description. +public final class MaterializedViewPartitionManager { + private static final Logger LOGGER = LoggerFactory.getLogger(MaterializedViewPartitionManager.class); + + /// Compile-time default for the critical retry budget. Overridable per cluster (no + /// restart) via `pinot.materialized.view.executor.runtime.update.max.attempts`. Sized so + /// `maxTasksPerBatch` parallel APPEND completions cannot exhaust the budget under realistic + /// contention. Reused by ConsistencyMgr-style mark ops so a STALE marking is never + /// silently dropped. + static final int DEFAULT_CRITICAL_MAX_ATTEMPTS = 128; + + /// Compile-time retry budget for `revertValid` only. Failure is recovered by the next + /// scheduling cycle (the partition stays STALE and is retried), so a small budget is + /// appropriate. Not cluster-tunable — operationally, raising this would not buy + /// correctness, only delay. + static final int REVERT_MAX_ATTEMPTS = 8; + + /// Backoff envelope for the critical profile. Total wait at the cap with worst-case + /// jitter is about `128 * (50 + 150) = 25.6 s`, well within the implicit per-task timeout. + private static final long CRITICAL_BACKOFF_BASE_MS = 50L; + private static final int CRITICAL_BACKOFF_JITTER_MS = 150; + + /// Backoff envelope for the revert profile. Total wait at the cap is about + /// `8 * (5 + 20) = 200 ms`. Tight CAS-loop livelock is the failure to defend against; + /// scheduler thread blocking is the failure to avoid. + private static final long REVERT_BACKOFF_BASE_MS = 5L; + private static final int REVERT_BACKOFF_JITTER_MS = 20; + + private final HelixPropertyStore<ZNRecord> _propertyStore; + + /// Optional live cluster-config reader used to override [#DEFAULT_CRITICAL_MAX_ATTEMPTS] + /// at runtime. Null in unit tests; null reader falls back to the compile-time default. 
+ @Nullable + private final Function<String, String> _clusterConfigReader; + + public MaterializedViewPartitionManager(HelixPropertyStore<ZNRecord> propertyStore, + @Nullable Function<String, String> clusterConfigReader) { + Preconditions.checkArgument(propertyStore != null, "propertyStore must not be null"); + _propertyStore = propertyStore; + _clusterConfigReader = clusterConfigReader; + } + + // ───────────────────────────────────────────────────────────────────────────────────── + // Public state-change DSL + // ───────────────────────────────────────────────────────────────────────────────────── + + /// APPEND op: VACANT → VALID(fp). Invoked by the minion executor at task-commit time + /// after a successful materialization of a window that was previously absent from the + /// partition map. + /// + /// Side effects (all in a single CAS write): + /// + /// - Inserts a new `PartitionInfo(VALID, fp, now)` at `windowStartMs`. + /// - Advances `watermarkMs` to the highest contiguous VALID upper bound starting at + /// the prior watermark, derived via + /// [MaterializedViewTaskUtils#computeContiguousUpperMs] over the resulting map. + /// The bucket size used for the walk is `windowEndMs - windowStartMs`. + /// + /// Watermark advancement is bundled with the bucket transition because watermark is a + /// derived field of the map state and must stay consistent under concurrent writers. + /// Splitting them across two CAS writes would create a transient window where the map + /// has the new bucket but the watermark is stale. + /// + /// Strict precondition: `windowStartMs` must NOT be present in the map. A violation + /// (raced double-dispatch from the scheduler, mid-flight task replay) throws + /// [IllegalStateException] inside the CAS loop, which propagates without retry — the + /// scheduler dispatch invariant guarantees one APPEND per bucket per cycle, so this + /// firing indicates a real bug, not transient contention. 
+ /// + /// Retry profile: critical (cluster-tunable budget). + public void appendValid(String tableNameWithType, long windowStartMs, long windowEndMs, + PartitionFingerprint fingerprint) { + Preconditions.checkArgument(fingerprint != null, "fingerprint must not be null"); + Preconditions.checkArgument(windowEndMs > windowStartMs, + "Invalid window: windowEndMs (%s) <= windowStartMs (%s) for table %s", + windowEndMs, windowStartMs, tableNameWithType); + long bucketMs = windowEndMs - windowStartMs; + applyMutation(tableNameWithType, current -> { + Preconditions.checkState(current != null, + "appendValid called before MV runtime znode was initialized for table: %s " + + "(cold-start path skipped?)", tableNameWithType); + Map<Long, PartitionInfo> updated = new HashMap<>(current.getPartitions()); + addPartitionInfo(updated, windowStartMs, + new PartitionInfo(PartitionState.VALID, fingerprint, System.currentTimeMillis())); + long newWatermarkMs = MaterializedViewTaskUtils.computeContiguousUpperMs( + current.getWatermarkMs(), updated, bucketMs); + LOGGER.info("appendValid: table={} bucket={} watermarkMs {}->{} (partitions={})", + tableNameWithType, windowStartMs, current.getWatermarkMs(), newWatermarkMs, updated.size()); + return new MaterializedViewRuntimeMetadata( + current.getMaterializedViewTableNameWithType(), newWatermarkMs, updated); + }, criticalMaxAttempts(), CRITICAL_BACKOFF_BASE_MS, CRITICAL_BACKOFF_JITTER_MS); + } + + /// OVERWRITE op: STALE → VALID(fp). Invoked by the minion executor at task-commit time + /// after re-materializing a STALE bucket whose source-side fingerprint has actually + /// changed. Updates the in-place fingerprint and refreshes `lastRefreshTime`; watermark + /// is unchanged because OVERWRITE only refreshes existing coverage. + /// + /// Strict precondition: `windowStartMs` must be present and STALE. A non-STALE entry + /// indicates the scheduler dispatch invariant was violated and throws + /// [IllegalStateException] without retry. 
+ /// + /// Retry profile: critical. + public void refreshValid(String tableNameWithType, long windowStartMs, + PartitionFingerprint fingerprint) { + Preconditions.checkArgument(fingerprint != null, "fingerprint must not be null"); + applyMutation(tableNameWithType, current -> { + Preconditions.checkState(current != null, + "refreshValid called before MV runtime znode was initialized for table: %s", + tableNameWithType); + Map<Long, PartitionInfo> updated = new HashMap<>(current.getPartitions()); + PartitionInfo existing = updated.get(windowStartMs); + Preconditions.checkState(existing != null && existing.getState() == PartitionState.STALE, + "refreshValid expects bucket %s to be STALE for table %s, got: %s", + windowStartMs, tableNameWithType, existing); + updatePartitionInfo(updated, windowStartMs, + new PartitionInfo(PartitionState.VALID, fingerprint, System.currentTimeMillis())); + LOGGER.info("refreshValid: table={} bucket={} STALE->VALID (fp={})", + tableNameWithType, windowStartMs, fingerprint); + return new MaterializedViewRuntimeMetadata( + current.getMaterializedViewTableNameWithType(), current.getWatermarkMs(), updated); + }, criticalMaxAttempts(), CRITICAL_BACKOFF_BASE_MS, CRITICAL_BACKOFF_JITTER_MS); + } + + /// DELETE op: STALE → VALID(EMPTY). Invoked by the minion executor at task-commit time + /// after retention-deleting MV segments for a window whose source data is now empty. + /// Persists the `VALID + PartitionFingerprint.EMPTY` shape rather than removing the + /// entry — the coverage model treats every processed bucket as "tracked", so a later + /// base-table backfill flows through the standard VALID → STALE → OVERWRITE cycle. + /// + /// <p><b>Design context (open question):</b> The continued existence of the `VALID-empty` + /// shape is under review. 
An alternative model removes the entry entirely on DELETE + /// (treating empty buckets as VACANT), which requires extending + /// [MaterializedViewTaskUtils#computeContiguousUpperMs] to walk past VACANT buckets and + /// extending the consistency manager to synthesize STALE entries for in-coverage VACANT + /// buckets on backfill. Until that design lands, this method preserves today's + /// `VALID-empty` semantics so the scheduler's contiguous-VALID watermark walk continues + /// to work without modification. + /// + /// Strict precondition: `windowStartMs` must be present and STALE. Non-STALE entries + /// indicate the scheduler dispatch invariant was violated; fail-loud surfaces the bug. + /// + /// Retry profile: critical. + public void clearValid(String tableNameWithType, long windowStartMs) { + applyMutation(tableNameWithType, current -> { + Preconditions.checkState(current != null, + "clearValid called before MV runtime znode was initialized for table: %s", + tableNameWithType); + Map<Long, PartitionInfo> updated = new HashMap<>(current.getPartitions()); + PartitionInfo existing = updated.get(windowStartMs); + Preconditions.checkState(existing != null && existing.getState() == PartitionState.STALE, + "clearValid expects bucket %s to be STALE for table %s, got: %s", + windowStartMs, tableNameWithType, existing); + updatePartitionInfo(updated, windowStartMs, + new PartitionInfo(PartitionState.VALID, PartitionFingerprint.EMPTY, + System.currentTimeMillis())); + LOGGER.info("clearValid: table={} bucket={} STALE->VALID-empty", + tableNameWithType, windowStartMs); + return new MaterializedViewRuntimeMetadata( + current.getMaterializedViewTableNameWithType(), current.getWatermarkMs(), updated); + }, criticalMaxAttempts(), CRITICAL_BACKOFF_BASE_MS, CRITICAL_BACKOFF_JITTER_MS); + } + + /// STALE_REVERT op: STALE → VALID, fingerprint and `lastRefreshTime` preserved. 
Invoked + /// by the scheduler after a precise fingerprint comparison concludes that a STALE marking + /// was a false positive (the source did not actually change in the affected window). The + /// underlying data was not re-materialized, so neither fingerprint nor `lastRefreshTime` + /// move. + /// + /// Lenient precondition: if the bucket is no longer STALE on retry (either already + /// reverted by a concurrent write or the consistency manager re-marked it for a new + /// reason), the call exits successfully without writing — the desired transition is + /// already done or no longer applicable. + /// + /// Retry profile: revert (small budget; failure is recovered by the next scheduling + /// cycle). + public void revertValid(String tableNameWithType, long windowStartMs) { + applyMutation(tableNameWithType, current -> { + if (current == null) { + LOGGER.debug("revertValid: runtime znode missing for table: {}; skipping", + tableNameWithType); + return null; + } + PartitionInfo existing = current.getPartitions().get(windowStartMs); + if (existing == null || existing.getState() != PartitionState.STALE) { + LOGGER.info("revertValid: bucket {} for table {} is not STALE (got: {}); skipping", + windowStartMs, tableNameWithType, existing); + return null; + } + Map<Long, PartitionInfo> updated = new HashMap<>(current.getPartitions()); + updatePartitionInfo(updated, windowStartMs, existing.withState(PartitionState.VALID)); + LOGGER.info("revertValid: table={} bucket={} STALE->VALID (false-positive, fp={})", + tableNameWithType, windowStartMs, existing.getFingerprint()); + return new MaterializedViewRuntimeMetadata( + current.getMaterializedViewTableNameWithType(), current.getWatermarkMs(), updated); + }, REVERT_MAX_ATTEMPTS, REVERT_BACKOFF_BASE_MS, REVERT_BACKOFF_JITTER_MS); + } + + /// MARK_STALE op (single-bucket): VALID → STALE. 
Invoked by the consistency manager (and
+  /// future manual REFRESH commands) when base-table data has changed in a way that affects
+  /// `windowStartMs`. Fingerprint and `lastRefreshTime` are preserved so the scheduler's
+  /// false-positive revert can compare against the prior snapshot.
+  ///
+  /// Lenient: if the bucket is absent or already STALE, the call is a no-op. Today the
+  /// in-coverage VACANT case is a deliberate skip (matching the prior consistency manager
+  /// behavior); a future change will synthesize a STALE entry for in-coverage VACANT
+  /// buckets to fix the backfill silent-data-loss case.
+  ///

Review Comment:
   The implemented design uses the `VALID-empty` approach rather than `VACANT -> STALE` synthesis: `DELETE` leaves the bucket `VALID-empty` (not `VACANT`), so a backfill flows through the standard `VALID -> STALE -> OVERWRITE` cycle, and the narrow commit-vs-backfill residual is closed by a new periodic `VALID-empty` re-evaluation sweep (8e1295a357). No `VACANT -> STALE` synthesis is required for correctness; absent buckets remain a no-op and the broker routes those queries to the base table.

   The code matches the design-notes line that frames `VACANT -> STALE` synthesis as future work; the PR description's earlier 'can synthesize VACANT -> STALE' wording (Bugfix summary + operation/transition tables) is now outdated and should be updated to match.
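
   For readers following the thread, the transitions discussed above can be sketched in isolation. This is not the PR's code, only a minimal illustration under stated assumptions: `ValidEmptySketch`, `State`, and `Bucket` are hypothetical simplified stand-ins, the string `"EMPTY"` stands in for `PartitionFingerprint.EMPTY`, and the methods mutate a plain in-memory map with no ZK CAS loop, retry budget, or `lastRefreshTime` handling.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the per-bucket transitions described in the comment.
final class ValidEmptySketch {
  enum State { VALID, STALE }

  // Stand-in for PartitionInfo: only state and fingerprint are modeled here.
  static final class Bucket {
    final State state;
    final String fingerprint;

    Bucket(State state, String fingerprint) {
      this.state = state;
      this.fingerprint = fingerprint;
    }

    @Override
    public String toString() {
      return state + "(" + fingerprint + ")";
    }
  }

  // Assumed placeholder for PartitionFingerprint.EMPTY.
  static final String EMPTY = "EMPTY";

  // DELETE: STALE -> VALID(EMPTY). The entry is kept, so the bucket stays tracked.
  static void clearValid(Map<Long, Bucket> partitions, long windowStartMs) {
    Bucket existing = partitions.get(windowStartMs);
    if (existing == null || existing.state != State.STALE) {
      throw new IllegalStateException(
          "expected STALE bucket at " + windowStartMs + ", got: " + existing);
    }
    partitions.put(windowStartMs, new Bucket(State.VALID, EMPTY));
  }

  // MARK_STALE: VALID -> STALE with the fingerprint preserved.
  // Absent or already-STALE buckets are a no-op; returns whether anything changed.
  static boolean markStale(Map<Long, Bucket> partitions, long windowStartMs) {
    Bucket existing = partitions.get(windowStartMs);
    if (existing == null || existing.state == State.STALE) {
      return false;
    }
    partitions.put(windowStartMs, new Bucket(State.STALE, existing.fingerprint));
    return true;
  }

  public static void main(String[] args) {
    Map<Long, Bucket> partitions = new HashMap<>();
    partitions.put(0L, new Bucket(State.STALE, "fp-1"));

    // DELETE leaves a tracked VALID-empty entry instead of removing it.
    clearValid(partitions, 0L);
    System.out.println("after DELETE: " + partitions.get(0L));

    // A later backfill on the tracked bucket re-enters the VALID -> STALE cycle.
    System.out.println("markStale(tracked): " + markStale(partitions, 0L));

    // A bucket that was never tracked stays absent; marking it is a no-op.
    System.out.println("markStale(absent): " + markStale(partitions, 60_000L));
  }
}
```

   The point of the sketch is the asymmetry: because `clearValid` keeps the entry, `markStale` has something to flip on backfill, whereas an absent bucket is skipped and would need the separate synthesis step that the design notes defer.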
