xiangfu0 commented on code in PR #18642:
URL: https://github.com/apache/pinot/pull/18642#discussion_r3366657509


##########
pinot-materialized-view/src/main/java/org/apache/pinot/materializedview/metadata/MaterializedViewPartitionManager.java:
##########
@@ -0,0 +1,499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.materializedview.metadata;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.helix.store.HelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.zkclient.exception.ZkException;
+import org.apache.pinot.materializedview.scheduler.MaterializedViewTaskUtils;
+import org.apache.pinot.spi.utils.CommonConstants.MaterializedViewTask;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/// Single source of truth for every per-partition state transition on the
+/// [MaterializedViewRuntimeMetadata] runtime znode.
+///
+/// <h3>Why this class exists</h3>
+///
+/// Before this manager landed, the runtime znode was mutated by three separate sites — the
+/// minion executor (APPEND / OVERWRITE / DELETE task commits), the scheduler (false-positive
+/// STALE reverts), and the consistency manager (VALID → STALE markings).  Each site shipped
+/// its own version-checked CAS loop, with subtly different retry budgets, backoff jitter,
+/// and exception-classification rules.  Subsequent state-machine changes (e.g. extending the
+/// VACANT → STALE synthesize for in-coverage backfill, or revoking the VALID-empty state)
+/// would have required coordinated edits at all three sites — exactly the failure mode that
+/// caused the earlier `computeWindowFingerprint` duplication scare.
+///
+/// This class consolidates all per-partition mutations behind a state-change DSL.  Each
+/// public method maps to exactly one operation in the per-partition state machine; the
+/// ZK CAS retry loop, the writer-side `validateForPersist` invariant gate, and the watermark
+/// recompute on APPEND all live behind the public API.  Callers express intent
+/// (`appendValid`, `markStale`, ...) and never see the persistence machinery.
+///
+/// <h3>Architecture</h3>
+///
+/// Three layers, deliberately separated:
+///
+///   - **Public state-change DSL** — one method per state-machine op.  Each method has a
+///     well-defined precondition and postcondition (see per-method javadoc) and selects the
+///     appropriate retry profile.  Callers never construct `PartitionInfo` directly.
+///   - **Private CRUD primitives** — `addPartitionInfo` / `updatePartitionInfo` /
+///     `removePartitionInfo`.  Each enforces a structural invariant on the in-memory map
+///     (e.g. "cannot add an already-existing entry") that fail-loud on violation, surfacing
+///     races as observable exceptions instead of silent overwrites.
+///   - **Single CAS engine** — `applyMutation` runs a caller-supplied mutator under the
+///     fetch / mutate / version-checked-write loop, with retry classification:
+///       - [MaterializedViewRuntimeMetadataUtils.CasConflictException] → re-fetch + retry
+///       - other [ZkException] (transport / session) → log warn + retry
+///       - [IllegalStateException] / [IllegalArgumentException] (validateForPersist) →
+///         propagate (fail-fast, no retry)
+///       - any other exception → propagate
+///
+/// <h3>Retry profile</h3>
+///
+/// Two profiles, calibrated to op semantics:
+///
+///   - **Critical** ([#DEFAULT_CRITICAL_MAX_ATTEMPTS] = 128, cluster-tunable via
+///     `pinot.materialized.view.executor.runtime.update.max.attempts`) — used by every
+///     state-changing op except `revertValid`.  Failure forces a minion task retry at the
+///     Helix level, which is far more expensive than a CAS retry, so the budget is sized
+///     to absorb realistic contention with `maxTasksPerBatch` parallel writers.
+///   - **Revert** ([#REVERT_MAX_ATTEMPTS] = 8) — used by `revertValid` only.  Failure
+///     means we did not avoid one OVERWRITE task, which the next scheduling cycle will
+///     either run for real or revert again.  Spending 128 retries to save one task is
+///     wasteful.
+///
+/// <h3>Concurrency</h3>
+///
+/// The manager is thread-safe and stateless above the ZK property store handle.  Concurrent
+/// callers contending on the same MV runtime znode are serialized by ZK CAS; one thread's
+/// commit invalidates the other's snapshot, the loser re-fetches and re-runs its mutator.
+/// Different MVs map to different znodes — no cross-MV contention.
+///
+/// <h3>Scope discipline</h3>
+///
+/// This class manages exactly the partition map and `watermarkMs` field of
+/// [MaterializedViewRuntimeMetadata].  It MUST NOT acquire other znodes
+/// (e.g. [MaterializedViewDefinitionMetadata]), call source-table or table-config services,
+/// or grow a method count beyond the public DSL listed below.  See `red lines` in the PR
+/// description.
+public final class MaterializedViewPartitionManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MaterializedViewPartitionManager.class);
+
+  /// Compile-time default for the critical retry budget.  Overridable per cluster (no
+  /// restart) via `pinot.materialized.view.executor.runtime.update.max.attempts`.  Sized so
+  /// `maxTasksPerBatch` parallel APPEND completions cannot exhaust the budget under realistic
+  /// contention.  Reused by ConsistencyMgr-style mark ops so a STALE marking is never
+  /// silently dropped.
+  static final int DEFAULT_CRITICAL_MAX_ATTEMPTS = 128;
+
+  /// Compile-time retry budget for `revertValid` only.  Failure is recovered by the next
+  /// scheduling cycle (the partition stays STALE and is retried), so a small budget is
+  /// appropriate.  Not cluster-tunable — operationally, raising this would not buy
+  /// correctness, only delay.
+  static final int REVERT_MAX_ATTEMPTS = 8;
+
+  /// Backoff envelope for the critical profile.  Total wait at the cap with worst-case
+  /// jitter is about `128 * (50 + 150) = 25.6 s`, well within the implicit per-task timeout.
+  private static final long CRITICAL_BACKOFF_BASE_MS = 50L;
+  private static final int CRITICAL_BACKOFF_JITTER_MS = 150;
+
+  /// Backoff envelope for the revert profile.  Total wait at the cap is about
+  /// `8 * (5 + 20) = 200 ms`.  Tight CAS-loop livelock is the failure to defend against;
+  /// scheduler thread blocking is the failure to avoid.
+  private static final long REVERT_BACKOFF_BASE_MS = 5L;
+  private static final int REVERT_BACKOFF_JITTER_MS = 20;
+
+  private final HelixPropertyStore<ZNRecord> _propertyStore;
+
+  /// Optional live cluster-config reader used to override [#DEFAULT_CRITICAL_MAX_ATTEMPTS]
+  /// at runtime.  Null in unit tests; null reader falls back to the compile-time default.
+  @Nullable
+  private final Function<String, String> _clusterConfigReader;
+
+  public MaterializedViewPartitionManager(HelixPropertyStore<ZNRecord> propertyStore,
+      @Nullable Function<String, String> clusterConfigReader) {
+    Preconditions.checkArgument(propertyStore != null, "propertyStore must not be null");
+    _propertyStore = propertyStore;
+    _clusterConfigReader = clusterConfigReader;
+  }
+
+  // ─────────────────────────────────────────────────────────────────────────────────────
+  //  Public state-change DSL
+  // ─────────────────────────────────────────────────────────────────────────────────────
+
+  /// APPEND op: VACANT → VALID(fp).  Invoked by the minion executor at task-commit time
+  /// after a successful materialization of a window that was previously absent from the
+  /// partition map.
+  ///
+  /// Side effects (all in a single CAS write):
+  ///
+  ///   - Inserts a new `PartitionInfo(VALID, fp, now)` at `windowStartMs`.
+  ///   - Advances `watermarkMs` to the highest contiguous VALID upper bound starting at
+  ///     the prior watermark, derived via
+  ///     [MaterializedViewTaskUtils#computeContiguousUpperMs] over the resulting map.
+  ///     The bucket size used for the walk is `windowEndMs - windowStartMs`.
+  ///
+  /// Watermark advancement is bundled with the bucket transition because watermark is a
+  /// derived field of the map state and must stay consistent under concurrent writers.
+  /// Splitting them across two CAS writes would create a transient window where the map
+  /// has the new bucket but the watermark is stale.
+  ///
+  /// Strict precondition: `windowStartMs` must NOT be present in the map.  A violation
+  /// (raced double-dispatch from the scheduler, mid-flight task replay) throws
+  /// [IllegalStateException] inside the CAS loop, which propagates without retry — the
+  /// scheduler dispatch invariant guarantees one APPEND per bucket per cycle, so this
+  /// firing indicates a real bug, not transient contention.
+  ///
+  /// Retry profile: critical (cluster-tunable budget).
+  public void appendValid(String tableNameWithType, long windowStartMs, long windowEndMs,
+      PartitionFingerprint fingerprint) {
+    Preconditions.checkArgument(fingerprint != null, "fingerprint must not be null");
+    Preconditions.checkArgument(windowEndMs > windowStartMs,
+        "Invalid window: windowEndMs (%s) <= windowStartMs (%s) for table %s",
+        windowEndMs, windowStartMs, tableNameWithType);
+    long bucketMs = windowEndMs - windowStartMs;
+    applyMutation(tableNameWithType, current -> {
+      Preconditions.checkState(current != null,
+          "appendValid called before MV runtime znode was initialized for table: %s "
+              + "(cold-start path skipped?)", tableNameWithType);
+      Map<Long, PartitionInfo> updated = new HashMap<>(current.getPartitions());
+      addPartitionInfo(updated, windowStartMs,
+          new PartitionInfo(PartitionState.VALID, fingerprint, System.currentTimeMillis()));
+      long newWatermarkMs = MaterializedViewTaskUtils.computeContiguousUpperMs(
+          current.getWatermarkMs(), updated, bucketMs);
+      LOGGER.info("appendValid: table={} bucket={} watermarkMs {}->{} (partitions={})",
+          tableNameWithType, windowStartMs, current.getWatermarkMs(), newWatermarkMs, updated.size());
+      return new MaterializedViewRuntimeMetadata(
+          current.getMaterializedViewTableNameWithType(), newWatermarkMs, updated);
+    }, criticalMaxAttempts(), CRITICAL_BACKOFF_BASE_MS, CRITICAL_BACKOFF_JITTER_MS);
+  }
+
+  /// OVERWRITE op: STALE → VALID(fp).  Invoked by the minion executor at task-commit time
+  /// after re-materializing a STALE bucket whose source-side fingerprint has actually
+  /// changed.  Updates the in-place fingerprint and refreshes `lastRefreshTime`; watermark
+  /// is unchanged because OVERWRITE only refreshes existing coverage.
+  ///
+  /// Strict precondition: `windowStartMs` must be present and STALE.  A non-STALE entry
+  /// indicates the scheduler dispatch invariant was violated and throws
+  /// [IllegalStateException] without retry.
+  ///
+  /// Retry profile: critical.
+  public void refreshValid(String tableNameWithType, long windowStartMs,
+      PartitionFingerprint fingerprint) {
+    Preconditions.checkArgument(fingerprint != null, "fingerprint must not be null");
+    applyMutation(tableNameWithType, current -> {
+      Preconditions.checkState(current != null,
+          "refreshValid called before MV runtime znode was initialized for table: %s",
+          tableNameWithType);
+      Map<Long, PartitionInfo> updated = new HashMap<>(current.getPartitions());
+      PartitionInfo existing = updated.get(windowStartMs);
+      Preconditions.checkState(existing != null && existing.getState() == PartitionState.STALE,
+          "refreshValid expects bucket %s to be STALE for table %s, got: %s",
+          windowStartMs, tableNameWithType, existing);
+      updatePartitionInfo(updated, windowStartMs,
+          new PartitionInfo(PartitionState.VALID, fingerprint, System.currentTimeMillis()));
+      LOGGER.info("refreshValid: table={} bucket={} STALE->VALID (fp={})",
+          tableNameWithType, windowStartMs, fingerprint);
+      return new MaterializedViewRuntimeMetadata(
+          current.getMaterializedViewTableNameWithType(), current.getWatermarkMs(), updated);
+    }, criticalMaxAttempts(), CRITICAL_BACKOFF_BASE_MS, CRITICAL_BACKOFF_JITTER_MS);
+  }
+
+  /// DELETE op: STALE → VALID(EMPTY).  Invoked by the minion executor at task-commit time
+  /// after retention-deleting MV segments for a window whose source data is now empty.
+  /// Persists the `VALID + PartitionFingerprint.EMPTY` shape rather than removing the
+  /// entry — the coverage model treats every processed bucket as "tracked", so a later
+  /// base-table backfill flows through the standard VALID → STALE → OVERWRITE cycle.
+  ///
+  /// <p><b>Design context (open question):</b> The continued existence of the `VALID-empty`
+  /// shape is under review.  An alternative model removes the entry entirely on DELETE
+  /// (treating empty buckets as VACANT), which requires extending
+  /// [MaterializedViewTaskUtils#computeContiguousUpperMs] to walk past VACANT buckets and
+  /// extending the consistency manager to synthesize STALE entries for in-coverage VACANT
+  /// buckets on backfill.  Until that design lands, this method preserves today's
+  /// `VALID-empty` semantics so the scheduler's contiguous-VALID watermark walk continues
+  /// to work without modification.
+  ///
+  /// Strict precondition: `windowStartMs` must be present and STALE.  Non-STALE entries
+  /// indicate the scheduler dispatch invariant was violated; fail-loud surfaces the bug.
+  ///
+  /// Retry profile: critical.
+  public void clearValid(String tableNameWithType, long windowStartMs) {
+    applyMutation(tableNameWithType, current -> {
+      Preconditions.checkState(current != null,
+          "clearValid called before MV runtime znode was initialized for table: %s",
+          tableNameWithType);
+      Map<Long, PartitionInfo> updated = new HashMap<>(current.getPartitions());
+      PartitionInfo existing = updated.get(windowStartMs);
+      Preconditions.checkState(existing != null && existing.getState() == PartitionState.STALE,
+          "clearValid expects bucket %s to be STALE for table %s, got: %s",
+          windowStartMs, tableNameWithType, existing);
+      updatePartitionInfo(updated, windowStartMs,
+          new PartitionInfo(PartitionState.VALID, PartitionFingerprint.EMPTY,
+              System.currentTimeMillis()));
+      LOGGER.info("clearValid: table={} bucket={} STALE->VALID-empty",
+          tableNameWithType, windowStartMs);
+      return new MaterializedViewRuntimeMetadata(
+          current.getMaterializedViewTableNameWithType(), current.getWatermarkMs(), updated);
+    }, criticalMaxAttempts(), CRITICAL_BACKOFF_BASE_MS, CRITICAL_BACKOFF_JITTER_MS);
+  }
+
+  /// STALE_REVERT op: STALE → VALID, fingerprint and `lastRefreshTime` preserved.  Invoked
+  /// by the scheduler after a precise fingerprint comparison concludes that a STALE marking
+  /// was a false positive (the source did not actually change in the affected window).  The
+  /// underlying data was not re-materialized, so neither fingerprint nor `lastRefreshTime`
+  /// move.
+  ///
+  /// Lenient precondition: if the bucket is no longer STALE on retry (either already
+  /// reverted by a concurrent write or the consistency manager re-marked it for a new
+  /// reason), the call exits successfully without writing — the desired transition is
+  /// already done or no longer applicable.
+  ///
+  /// Retry profile: revert (small budget; failure is recovered by the next scheduling
+  /// cycle).
+  public void revertValid(String tableNameWithType, long windowStartMs) {
+    applyMutation(tableNameWithType, current -> {
+      if (current == null) {
+        LOGGER.debug("revertValid: runtime znode missing for table: {}; skipping",
+            tableNameWithType);
+        return null;
+      }
+      PartitionInfo existing = current.getPartitions().get(windowStartMs);
+      if (existing == null || existing.getState() != PartitionState.STALE) {
+        LOGGER.info("revertValid: bucket {} for table {} is not STALE (got: {}); skipping",
+            windowStartMs, tableNameWithType, existing);
+        return null;
+      }
+      Map<Long, PartitionInfo> updated = new HashMap<>(current.getPartitions());
+      updatePartitionInfo(updated, windowStartMs, existing.withState(PartitionState.VALID));
+      LOGGER.info("revertValid: table={} bucket={} STALE->VALID (false-positive, fp={})",
+          tableNameWithType, windowStartMs, existing.getFingerprint());
+      return new MaterializedViewRuntimeMetadata(
+          current.getMaterializedViewTableNameWithType(), current.getWatermarkMs(), updated);
+    }, REVERT_MAX_ATTEMPTS, REVERT_BACKOFF_BASE_MS, REVERT_BACKOFF_JITTER_MS);
+  }
+
+  /// MARK_STALE op (single-bucket): VALID → STALE.  Invoked by the consistency manager (and
+  /// future manual REFRESH commands) when base-table data has changed in a way that affects
+  /// `windowStartMs`.  Fingerprint and `lastRefreshTime` are preserved so the scheduler's
+  /// false-positive revert can compare against the prior snapshot.
+  ///
+  /// Lenient: if the bucket is absent or already STALE, the call is a no-op.  Today the
+  /// in-coverage VACANT case is a deliberate skip (matching the prior consistency manager
+  /// behavior); a future change will synthesize a STALE entry for in-coverage VACANT
+  /// buckets to fix the backfill silent-data-loss case.
+  ///

Review Comment:
   The implemented design uses the `VALID-empty` approach rather than `VACANT -> STALE`
synthesis: `DELETE` leaves the bucket `VALID-empty` (not `VACANT`), so a backfill flows
through the standard `VALID -> STALE -> OVERWRITE` cycle, and the narrow commit-vs-backfill
residual race is closed by a new periodic `VALID-empty` re-evaluation sweep (8e1295a357).
No `VACANT -> STALE` synthesis is required for correctness; absent buckets remain a no-op,
and the broker routes queries for those buckets to the base table. The code matches the
design-notes line that frames `VACANT -> STALE` synthesis as future work. The PR
description's earlier 'can synthesize VACANT -> STALE' wording (in the Bugfix summary and
the operation/transition tables) is now outdated and should be updated to match.

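The single CAS engine described in the class javadoc (fetch, mutate, version-checked write, with retry classification and jittered backoff) can be sketched as follows. This is not the PR's `applyMutation`: `InMemoryVersionedStore`, `Snapshot`, `TransientStoreException` and this `CasConflictException` are hypothetical stand-ins for `HelixPropertyStore<ZNRecord>`, ZK `Stat` versions, `ZkException` and `MaterializedViewRuntimeMetadataUtils.CasConflictException`, and the jitter draw (`nextInt(jitter + 1)`) is an assumption.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.UnaryOperator;

// Hypothetical stand-ins for the ZK pieces named in the quoted javadoc.
final class CasConflictException extends RuntimeException {
  CasConflictException(String message) { super(message); }
}

final class TransientStoreException extends RuntimeException {
  TransientStoreException(String message) { super(message); }
}

record Snapshot<T>(T value, int version) { }

// In-memory versioned store; injectedConflicts simulates other writers committing first.
final class InMemoryVersionedStore<T> {
  private T _value;
  private int _version;
  private int _injectedConflicts;

  InMemoryVersionedStore(T initial, int injectedConflicts) {
    _value = initial;
    _injectedConflicts = injectedConflicts;
  }

  synchronized Snapshot<T> read() {
    return new Snapshot<>(_value, _version);
  }

  synchronized void writeIfVersion(T value, int expectedVersion) {
    if (_injectedConflicts > 0) {
      _injectedConflicts--;
      _version++; // a concurrent writer won the race
      throw new CasConflictException("version moved to " + _version);
    }
    if (expectedVersion != _version) {
      throw new CasConflictException("expected " + expectedVersion + ", found " + _version);
    }
    _value = value;
    _version++;
  }
}

final class CasEngineSketch {
  // Fetch / mutate / version-checked write. Returns false if the mutator returned null
  // (nothing to write). Exceptions thrown by the mutator, such as IllegalStateException
  // from a strict precondition, propagate on the first attempt without retry.
  static <T> boolean applyMutation(InMemoryVersionedStore<T> store, UnaryOperator<T> mutator,
      int maxAttempts, long backoffBaseMs, int backoffJitterMs) {
    if (maxAttempts < 1 || backoffJitterMs < 0) {
      throw new IllegalArgumentException("maxAttempts must be >= 1 and jitter >= 0");
    }
    for (int attempt = 1; ; attempt++) {
      Snapshot<T> snapshot = store.read();
      T updated = mutator.apply(snapshot.value());
      if (updated == null) {
        return false;
      }
      try {
        store.writeIfVersion(updated, snapshot.version());
        return true;
      } catch (CasConflictException | TransientStoreException e) {
        // Conflict: re-fetch and re-run the mutator. Transient error: the quoted design
        // logs a warning, then retries the same way.
        if (attempt >= maxAttempts) {
          throw new RuntimeException("CAS budget exhausted after " + attempt + " attempts", e);
        }
        sleepQuietly(backoffBaseMs + ThreadLocalRandom.current().nextInt(backoffJitterMs + 1));
      }
    }
  }

  private static void sleepQuietly(long ms) {
    try {
      Thread.sleep(ms);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      throw new RuntimeException("interrupted during CAS backoff", ie);
    }
  }

  // Back-of-envelope cap used in the javadoc: attempts * (base + jitter). The loop above
  // sleeps at most attempts - 1 times, so this slightly overestimates the worst case.
  static long worstCaseTotalWaitMs(int attempts, long baseMs, int jitterMs) {
    return attempts * (baseMs + jitterMs);
  }
}
```

In this sketch the two retry profiles differ only in the arguments passed: `(128, 50, 150)` for critical ops and `(8, 5, 20)` for `revertValid`. `worstCaseTotalWaitMs` reproduces the javadoc's `25.6 s` and `200 ms` figures.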

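The `appendValid` javadoc and the `clearValid` design note both depend on a contiguous-VALID watermark walk. The following is a hypothetical re-implementation for illustration only, not the code of `MaterializedViewTaskUtils#computeContiguousUpperMs`. It assumes bucket keys are window starts aligned to the watermark, and that both STALE and absent (VACANT) buckets stop the walk.

```java
import java.util.Map;

// Hypothetical re-implementation for illustration; NOT the actual
// MaterializedViewTaskUtils#computeContiguousUpperMs. Keys are assumed to be window
// starts aligned to the watermark; STALE and absent (VACANT) buckets stop the walk.
final class WatermarkWalkSketch {
  enum State { VALID, STALE }

  static long computeContiguousUpperMs(long watermarkMs, Map<Long, State> partitions,
      long bucketMs) {
    if (bucketMs <= 0) {
      throw new IllegalArgumentException("bucketMs must be positive: " + bucketMs);
    }
    long upperMs = watermarkMs;
    // Advance one bucket at a time while the bucket starting at upperMs is VALID.
    // A VALID-empty bucket is still VALID here, so it does not stop the walk.
    while (partitions.get(upperMs) == State.VALID) {
      upperMs += bucketMs;
    }
    return upperMs;
  }
}
```

With buckets `0`, `10`, `20` (VALID-empty) and `40` all VALID and `bucketMs = 10`, the walk from `0` stops at `30`. Removing bucket `20` instead of keeping it VALID-empty would stop the walk at `20`, which is why the alternative model would need the walk extended past VACANT buckets.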
