Copilot commented on code in PR #18642:
URL: https://github.com/apache/pinot/pull/18642#discussion_r3365786701


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/materializedview/MaterializedViewTaskExecutor.java:
##########
@@ -173,7 +171,8 @@ public SegmentConversionResult executeTask(PinotTaskConfig pinotTaskConfig)
     String taskMode = configs.getOrDefault(MaterializedViewTask.TASK_MODE_KEY,
         MaterializedViewTask.TASK_MODE_APPEND);
 
-    // DELETE mode: skip query execution, only remove existing MV segments
+    // DELETE mode: skip query execution; remove existing MV segments and rewrite the
+    // runtime PartitionInfo to VALID-empty (see updateMaterializedViewRuntime DELETE branch).
     if (MaterializedViewTask.TASK_MODE_DELETE.equals(taskMode)) {

Review Comment:
   The DELETE-mode comment still references the removed 
updateMaterializedViewRuntime() method. This makes the code harder to follow 
and points readers to a symbol that no longer exists; the comment should 
reference the new MaterializedViewPartitionManager-based postProcess path 
instead.



##########
pinot-materialized-view/src/main/java/org/apache/pinot/materializedview/consistency/MaterializedViewConsistencyManager.java:
##########
@@ -410,55 +405,40 @@ private boolean markPartitionsDirty(String viewTableName, long affectedStartMs,
             + "mark partitions without a bucket size.  Repair the MV table config.",
         viewTableName, bucketMs);
 
-    // Iterate every bucket [partStart, partStart+bucketMs) that overlaps the affected range
-    // [affectedStartMs, affectedEndMs]. Two ranges overlap when start1 <= end2 AND end1 >= start2.
+    // Cap affectedEndMs at watermarkMs.  No partition with partStart > 
watermarkMs can exist
+    // (the writer invariant), so any bucket beyond that cannot be marked 
STALE anyway.  This
+    // protects the bucket iteration below from a caller passing 
Long.MAX_VALUE (full-range
+    // invalidation from notifyMaterializedViewConsistencyManager paths when 
segment
+    // startTime/endTime is unknown), which would otherwise enumerate 
~watermarkMs/bucketMs
+    // candidates — orders of magnitude more than the number of real 
partitions.
+    long cappedEnd = Math.min(affectedEndMs, runtime.getWatermarkMs());
+    if (cappedEnd < affectedStartMs) {
+      LOGGER.debug("Affected range [{}, {}] is past watermarkMs ({}) for MV table: {}; nothing to mark",
+          affectedStartMs, affectedEndMs, runtime.getWatermarkMs(), viewTableName);
+      return Collections.emptyList();
+    }
+
+    // Enumerate every bucket [partStart, partStart+bucketMs) that overlaps the affected range
+    // [affectedStartMs, cappedEnd]. Two ranges overlap when start1 <= end2 AND end1 >= start2.
     // The first overlapping bucket has partStart = floorDiv(affectedStartMs, bucketMs) * bucketMs;
-    // the last has partStart <= affectedEndMs (any bucket whose start is past affectedEndMs
-    // cannot overlap because partStart > affectedEndMs >= affectedStartMs implies the bucket
-    // starts after the affected range ends). floorDiv (instead of /) is used defensively so a
-    // future caller passing negative affectedStartMs would still produce the correct floor.
-    // Only flip existing VALID entries to STALE.  Absent buckets are NOT 
synthesized — under
-    // Design C, a bucket's absence from the partition map already means "MV 
does not cover this
-    // range", so the broker rewrite (PR 2) routes those queries to the base 
table.  Synthesizing
-    // STALE entries for every uncovered bucket below `watermarkMs` would 
explode the znode size
-    // on a full-range invalidation (~watermarkMs / bucketMs entries) without 
affecting routing
-    // correctness — the bucket-iteration loop below stays O(affectedRange / 
bucketMs) but the
-    // persisted map grows only with real partitions.
+    // the last has partStart <= cappedEnd (any bucket whose start is past cappedEnd cannot
+    // overlap because partStart > cappedEnd >= affectedStartMs implies the bucket starts after
+    // the affected range ends). floorDiv (instead of /) is used defensively so a future caller
+    // passing negative affectedStartMs would still produce the correct floor.
+    //
+    // Absent buckets are still enumerated: the manager's per-bucket lenient 
filter no-ops them
+    // under the CAS lock.  Synthesizing STALE entries for absent buckets is 
intentionally NOT
+    // done here — under Design C, a bucket's absence already means "MV does 
not cover this
+    // range" and the broker routes those queries to the base.  Synthesizing 
on every
+    // full-range invalidation would explode the znode size by 
~watermarkMs/bucketMs entries
+    // without affecting routing correctness.
+    List<Long> buckets = new ArrayList<>();
     long partStart = Math.floorDiv(affectedStartMs, bucketMs) * bucketMs;
-    while (partStart <= affectedEndMs) {
-      PartitionInfo info = updatedInfos.get(partStart);
-      if (info != null && info.getState() == PartitionState.VALID) {
-        updatedInfos.put(partStart, info.withState(PartitionState.STALE));
-        anyChanged = true;
-        markedCount++;
-      }
+    while (partStart <= cappedEnd) {
+      buckets.add(partStart);
       partStart += bucketMs;
     }

Review Comment:
   enumerateCandidateBuckets() materializes every bucket start between 
affectedStartMs and watermark (cappedEnd) into a List. For full invalidation 
(affectedEndMs=Long.MAX_VALUE) with a small bucketMs and a large watermark 
horizon, this can allocate an extremely large list and risk high GC pressure or 
OOM. Consider chunking the range (e.g., call _partitionManager.markStale() in 
bounded-size batches) or moving the range-iteration into the manager so you 
don't need to hold all candidate buckets in memory at once.
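
   A minimal sketch of the chunking idea, under stated assumptions: the 
`BucketBatches` class, the `forEachBatch` method, and the `Consumer` sink 
(standing in for a call such as _partitionManager.markStale()) are hypothetical 
names, not part of the PR. The end bound is assumed to be already capped at the 
watermark, as in the diff above, so `partStart += bucketMs` stays far below 
Long.MAX_VALUE.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: walks bucket starts in bounded batches instead of one large list. */
final class BucketBatches {
  private BucketBatches() {
  }

  /**
   * Passes every bucket start overlapping [affectedStartMs, cappedEndMs] to {@code sink}
   * in lists of at most {@code maxBatchSize} entries. Returns the number of starts visited.
   * Assumes cappedEndMs was already capped by the caller (e.g. at the watermark).
   */
  static long forEachBatch(long affectedStartMs, long cappedEndMs, long bucketMs,
      int maxBatchSize, Consumer<List<Long>> sink) {
    if (bucketMs <= 0 || maxBatchSize <= 0) {
      throw new IllegalArgumentException("bucketMs and maxBatchSize must be positive");
    }
    long visited = 0;
    List<Long> batch = new ArrayList<>();
    // Same floor computation as the diff: first bucket start at or below affectedStartMs.
    long partStart = Math.floorDiv(affectedStartMs, bucketMs) * bucketMs;
    while (partStart <= cappedEndMs) {
      batch.add(partStart);
      visited++;
      if (batch.size() == maxBatchSize) {
        // Hand off the full batch and start a fresh list so at most one batch is held here.
        sink.accept(batch);
        batch = new ArrayList<>();
      }
      partStart += bucketMs;
    }
    if (!batch.isEmpty()) {
      sink.accept(batch);
    }
    return visited;
  }
}
```

   With this shape, peak memory is bounded by `maxBatchSize` rather than by 
(cappedEnd - affectedStartMs) / bucketMs, at the cost of one manager call (and 
presumably one CAS write) per batch instead of one per invalidation.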



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/materializedview/MaterializedViewTaskExecutor.java:
##########
@@ -392,8 +391,10 @@ private void revertWindowSegmentReplace(String tableNameWithType, String lineage
   }
 
   /// Handles DELETE mode: removes all existing MV segments for the given time window
-  /// via segment lineage replace (segmentsFrom=[old segments], segmentsTo=[]).
-  /// No query is executed and no new segments are created.
+  /// via segment lineage replace (segmentsFrom=[old segments], segmentsTo=[]).  No query
+  /// is executed and no new MV segments are created; the runtime PartitionInfo is rewritten
+  /// to `VALID + PartitionFingerprint.EMPTY` by [#updateMaterializedViewRuntime] in
+  /// [#postProcess], so the partition stays tracked-but-empty rather than disappearing.

Review Comment:
   This Javadoc still says the runtime PartitionInfo is rewritten by 
updateMaterializedViewRuntime(), but that helper was removed in favor of 
MaterializedViewPartitionManager. Updating this reference avoids confusion and 
keeps the documentation consistent with the current implementation.



##########
pinot-materialized-view/src/main/java/org/apache/pinot/materializedview/metadata/MaterializedViewPartitionManager.java:
##########
@@ -0,0 +1,499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.materializedview.metadata;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.helix.store.HelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.zkclient.exception.ZkException;
+import org.apache.pinot.materializedview.scheduler.MaterializedViewTaskUtils;
+import org.apache.pinot.spi.utils.CommonConstants.MaterializedViewTask;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/// Single source of truth for every per-partition state transition on the
+/// [MaterializedViewRuntimeMetadata] runtime znode.
+///
+/// <h3>Why this class exists</h3>
+///
+/// Before this manager landed, the runtime znode was mutated by three 
separate sites — the
+/// minion executor (APPEND / OVERWRITE / DELETE task commits), the scheduler 
(false-positive
+/// STALE reverts), and the consistency manager (VALID → STALE markings).  
Each site shipped
+/// its own version-checked CAS loop, with subtly different retry budgets, 
backoff jitter,
+/// and exception-classification rules.  Subsequent state-machine changes 
(e.g. extending the
+/// VACANT → STALE synthesize for in-coverage backfill, or revoking the 
VALID-empty state)
+/// would have required coordinated edits at all three sites — exactly the 
failure mode that
+/// caused the earlier `computeWindowFingerprint` duplication scare.
+///
+/// This class consolidates all per-partition mutations behind a state-change 
DSL.  Each
+/// public method maps to exactly one operation in the per-partition state 
machine; the
+/// ZK CAS retry loop, the writer-side `validateForPersist` invariant gate, 
and the watermark
+/// recompute on APPEND all live behind the public API.  Callers express intent
+/// (`appendValid`, `markStale`, ...) and never see the persistence machinery.
+///
+/// <h3>Architecture</h3>
+///
+/// Three layers, deliberately separated:
+///
+///   - **Public state-change DSL** — one method per state-machine op.  Each 
method has a
+///     well-defined precondition and postcondition (see per-method javadoc) 
and selects the
+///     appropriate retry profile.  Callers never construct `PartitionInfo` 
directly.
+///   - **Private CRUD primitives** — `addPartitionInfo` / 
`updatePartitionInfo` /
+///     `removePartitionInfo`.  Each enforces a structural invariant on the 
in-memory map
+///     (e.g. "cannot add an already-existing entry") that fail-loud on 
violation, surfacing
+///     races as observable exceptions instead of silent overwrites.
+///   - **Single CAS engine** — `applyMutation` runs a caller-supplied mutator 
under the
+///     fetch / mutate / version-checked-write loop, with retry classification:
+///       - [MaterializedViewRuntimeMetadataUtils.CasConflictException] → 
re-fetch + retry
+///       - other [ZkException] (transport / session) → log warn + retry
+///       - [IllegalStateException] / [IllegalArgumentException] 
(validateForPersist) →
+///         propagate (fail-fast, no retry)
+///       - any other exception → propagate
+///
+/// <h3>Retry profile</h3>
+///
+/// Two profiles, calibrated to op semantics:
+///
+///   - **Critical** ([#DEFAULT_CRITICAL_MAX_ATTEMPTS] = 128, cluster-tunable 
via
+///     `pinot.materialized.view.executor.runtime.update.max.attempts`) — used 
by every
+///     state-changing op except `revertValid`.  Failure forces a minion task 
retry at the
+///     Helix level, which is far more expensive than a CAS retry, so the 
budget is sized
+///     to absorb realistic contention with `maxTasksPerBatch` parallel 
writers.
+///   - **Revert** ([#REVERT_MAX_ATTEMPTS] = 8) — used by `revertValid` only.  
Failure
+///     means we did not avoid one OVERWRITE task, which the next scheduling 
cycle will
+///     either run for real or revert again.  Spending 128 retries to save one 
task is
+///     wasteful.
+///
+/// <h3>Concurrency</h3>
+///
+/// The manager is thread-safe and stateless above the ZK property store 
handle.  Concurrent
+/// callers contending on the same MV runtime znode are serialized by ZK CAS; 
one thread's
+/// commit invalidates the other's snapshot, the loser re-fetches and re-runs 
its mutator.
+/// Different MVs map to different znodes — no cross-MV contention.
+///
+/// <h3>Scope discipline</h3>
+///
+/// This class manages exactly the partition map and `watermarkMs` field of
+/// [MaterializedViewRuntimeMetadata].  It MUST NOT acquire other znodes
+/// (e.g. [MaterializedViewDefinitionMetadata]), call source-table or 
table-config services,
+/// or grow a method count beyond the public DSL listed below.  See `red 
lines` in the PR
+/// description.
+public final class MaterializedViewPartitionManager {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MaterializedViewPartitionManager.class);
+
+  /// Compile-time default for the critical retry budget.  Overridable per 
cluster (no
+  /// restart) via 
`pinot.materialized.view.executor.runtime.update.max.attempts`.  Sized so
+  /// `maxTasksPerBatch` parallel APPEND completions cannot exhaust the budget 
under realistic
+  /// contention.  Reused by ConsistencyMgr-style mark ops so a STALE marking 
is never
+  /// silently dropped.
+  static final int DEFAULT_CRITICAL_MAX_ATTEMPTS = 128;
+
+  /// Compile-time retry budget for `revertValid` only.  Failure is recovered 
by the next
+  /// scheduling cycle (the partition stays STALE and is retried), so a small 
budget is
+  /// appropriate.  Not cluster-tunable — operationally, raising this would 
not buy
+  /// correctness, only delay.
+  static final int REVERT_MAX_ATTEMPTS = 8;
+
+  /// Backoff envelope for the critical profile.  Total wait at the cap with 
worst-case
+  /// jitter is about `128 * (50 + 150) = 25.6 s`, well within the implicit 
per-task timeout.
+  private static final long CRITICAL_BACKOFF_BASE_MS = 50L;
+  private static final int CRITICAL_BACKOFF_JITTER_MS = 150;
+
+  /// Backoff envelope for the revert profile.  Total wait at the cap is about
+  /// `8 * (5 + 20) = 200 ms`.  Tight CAS-loop livelock is the failure to 
defend against;
+  /// scheduler thread blocking is the failure to avoid.
+  private static final long REVERT_BACKOFF_BASE_MS = 5L;
+  private static final int REVERT_BACKOFF_JITTER_MS = 20;
+
+  private final HelixPropertyStore<ZNRecord> _propertyStore;
+
+  /// Optional live cluster-config reader used to override 
[#DEFAULT_CRITICAL_MAX_ATTEMPTS]
+  /// at runtime.  Null in unit tests; null reader falls back to the 
compile-time default.
+  @Nullable
+  private final Function<String, String> _clusterConfigReader;
+
+  public MaterializedViewPartitionManager(HelixPropertyStore<ZNRecord> 
propertyStore,
+      @Nullable Function<String, String> clusterConfigReader) {
+    Preconditions.checkArgument(propertyStore != null, "propertyStore must not 
be null");
+    _propertyStore = propertyStore;
+    _clusterConfigReader = clusterConfigReader;
+  }
+
+  // 
─────────────────────────────────────────────────────────────────────────────────────
+  //  Public state-change DSL
+  // 
─────────────────────────────────────────────────────────────────────────────────────
+
+  /// APPEND op: VACANT → VALID(fp).  Invoked by the minion executor at 
task-commit time
+  /// after a successful materialization of a window that was previously 
absent from the
+  /// partition map.
+  ///
+  /// Side effects (all in a single CAS write):
+  ///
+  ///   - Inserts a new `PartitionInfo(VALID, fp, now)` at `windowStartMs`.
+  ///   - Advances `watermarkMs` to the highest contiguous VALID upper bound 
starting at
+  ///     the prior watermark, derived via
+  ///     [MaterializedViewTaskUtils#computeContiguousUpperMs] over the 
resulting map.
+  ///     The bucket size used for the walk is `windowEndMs - windowStartMs`.
+  ///
+  /// Watermark advancement is bundled with the bucket transition because 
watermark is a
+  /// derived field of the map state and must stay consistent under concurrent 
writers.
+  /// Splitting them across two CAS writes would create a transient window 
where the map
+  /// has the new bucket but the watermark is stale.
+  ///
+  /// Strict precondition: `windowStartMs` must NOT be present in the map.  A 
violation
+  /// (raced double-dispatch from the scheduler, mid-flight task replay) throws
+  /// [IllegalStateException] inside the CAS loop, which propagates without 
retry — the
+  /// scheduler dispatch invariant guarantees one APPEND per bucket per cycle, 
so this
+  /// firing indicates a real bug, not transient contention.
+  ///
+  /// Retry profile: critical (cluster-tunable budget).
+  public void appendValid(String tableNameWithType, long windowStartMs, long 
windowEndMs,
+      PartitionFingerprint fingerprint) {
+    Preconditions.checkArgument(fingerprint != null, "fingerprint must not be 
null");
+    Preconditions.checkArgument(windowEndMs > windowStartMs,
+        "Invalid window: windowEndMs (%s) <= windowStartMs (%s) for table %s",
+        windowEndMs, windowStartMs, tableNameWithType);
+    long bucketMs = windowEndMs - windowStartMs;
+    applyMutation(tableNameWithType, current -> {
+      Preconditions.checkState(current != null,
+          "appendValid called before MV runtime znode was initialized for 
table: %s "
+              + "(cold-start path skipped?)", tableNameWithType);
+      Map<Long, PartitionInfo> updated = new 
HashMap<>(current.getPartitions());
+      addPartitionInfo(updated, windowStartMs,
+          new PartitionInfo(PartitionState.VALID, fingerprint, 
System.currentTimeMillis()));
+      long newWatermarkMs = MaterializedViewTaskUtils.computeContiguousUpperMs(
+          current.getWatermarkMs(), updated, bucketMs);
+      LOGGER.info("appendValid: table={} bucket={} watermarkMs {}->{} 
(partitions={})",
+          tableNameWithType, windowStartMs, current.getWatermarkMs(), 
newWatermarkMs, updated.size());
+      return new MaterializedViewRuntimeMetadata(
+          current.getMaterializedViewTableNameWithType(), newWatermarkMs, 
updated);
+    }, criticalMaxAttempts(), CRITICAL_BACKOFF_BASE_MS, 
CRITICAL_BACKOFF_JITTER_MS);
+  }
+
+  /// OVERWRITE op: STALE → VALID(fp).  Invoked by the minion executor at 
task-commit time
+  /// after re-materializing a STALE bucket whose source-side fingerprint has 
actually
+  /// changed.  Updates the in-place fingerprint and refreshes 
`lastRefreshTime`; watermark
+  /// is unchanged because OVERWRITE only refreshes existing coverage.
+  ///
+  /// Strict precondition: `windowStartMs` must be present and STALE.  A 
non-STALE entry
+  /// indicates the scheduler dispatch invariant was violated and throws
+  /// [IllegalStateException] without retry.
+  ///
+  /// Retry profile: critical.
+  public void refreshValid(String tableNameWithType, long windowStartMs,
+      PartitionFingerprint fingerprint) {
+    Preconditions.checkArgument(fingerprint != null, "fingerprint must not be 
null");
+    applyMutation(tableNameWithType, current -> {
+      Preconditions.checkState(current != null,
+          "refreshValid called before MV runtime znode was initialized for 
table: %s",
+          tableNameWithType);
+      Map<Long, PartitionInfo> updated = new 
HashMap<>(current.getPartitions());
+      PartitionInfo existing = updated.get(windowStartMs);
+      Preconditions.checkState(existing != null && existing.getState() == 
PartitionState.STALE,
+          "refreshValid expects bucket %s to be STALE for table %s, got: %s",
+          windowStartMs, tableNameWithType, existing);
+      updatePartitionInfo(updated, windowStartMs,
+          new PartitionInfo(PartitionState.VALID, fingerprint, 
System.currentTimeMillis()));
+      LOGGER.info("refreshValid: table={} bucket={} STALE->VALID (fp={})",
+          tableNameWithType, windowStartMs, fingerprint);
+      return new MaterializedViewRuntimeMetadata(
+          current.getMaterializedViewTableNameWithType(), 
current.getWatermarkMs(), updated);
+    }, criticalMaxAttempts(), CRITICAL_BACKOFF_BASE_MS, 
CRITICAL_BACKOFF_JITTER_MS);
+  }
+
+  /// DELETE op: STALE → VALID(EMPTY).  Invoked by the minion executor at 
task-commit time
+  /// after retention-deleting MV segments for a window whose source data is 
now empty.
+  /// Persists the `VALID + PartitionFingerprint.EMPTY` shape rather than 
removing the
+  /// entry — the coverage model treats every processed bucket as "tracked", 
so a later
+  /// base-table backfill flows through the standard VALID → STALE → OVERWRITE 
cycle.
+  ///
+  /// <p><b>Design context (open question):</b> The continued existence of the 
`VALID-empty`
+  /// shape is under review.  An alternative model removes the entry entirely 
on DELETE
+  /// (treating empty buckets as VACANT), which requires extending
+  /// [MaterializedViewTaskUtils#computeContiguousUpperMs] to walk past VACANT 
buckets and
+  /// extending the consistency manager to synthesize STALE entries for 
in-coverage VACANT
+  /// buckets on backfill.  Until that design lands, this method preserves 
today's
+  /// `VALID-empty` semantics so the scheduler's contiguous-VALID watermark 
walk continues
+  /// to work without modification.
+  ///
+  /// Strict precondition: `windowStartMs` must be present and STALE.  
Non-STALE entries
+  /// indicate the scheduler dispatch invariant was violated; fail-loud 
surfaces the bug.
+  ///
+  /// Retry profile: critical.
+  public void clearValid(String tableNameWithType, long windowStartMs) {
+    applyMutation(tableNameWithType, current -> {
+      Preconditions.checkState(current != null,
+          "clearValid called before MV runtime znode was initialized for 
table: %s",
+          tableNameWithType);
+      Map<Long, PartitionInfo> updated = new 
HashMap<>(current.getPartitions());
+      PartitionInfo existing = updated.get(windowStartMs);
+      Preconditions.checkState(existing != null && existing.getState() == 
PartitionState.STALE,
+          "clearValid expects bucket %s to be STALE for table %s, got: %s",
+          windowStartMs, tableNameWithType, existing);
+      updatePartitionInfo(updated, windowStartMs,
+          new PartitionInfo(PartitionState.VALID, PartitionFingerprint.EMPTY,
+              System.currentTimeMillis()));
+      LOGGER.info("clearValid: table={} bucket={} STALE->VALID-empty",
+          tableNameWithType, windowStartMs);
+      return new MaterializedViewRuntimeMetadata(
+          current.getMaterializedViewTableNameWithType(), 
current.getWatermarkMs(), updated);
+    }, criticalMaxAttempts(), CRITICAL_BACKOFF_BASE_MS, 
CRITICAL_BACKOFF_JITTER_MS);
+  }
+
+  /// STALE_REVERT op: STALE → VALID, fingerprint and `lastRefreshTime` 
preserved.  Invoked
+  /// by the scheduler after a precise fingerprint comparison concludes that a 
STALE marking
+  /// was a false positive (the source did not actually change in the affected 
window).  The
+  /// underlying data was not re-materialized, so neither fingerprint nor 
`lastRefreshTime`
+  /// move.
+  ///
+  /// Lenient precondition: if the bucket is no longer STALE on retry (either 
already
+  /// reverted by a concurrent write or the consistency manager re-marked it 
for a new
+  /// reason), the call exits successfully without writing — the desired 
transition is
+  /// already done or no longer applicable.
+  ///
+  /// Retry profile: revert (small budget; failure is recovered by the next 
scheduling
+  /// cycle).
+  public void revertValid(String tableNameWithType, long windowStartMs) {
+    applyMutation(tableNameWithType, current -> {
+      if (current == null) {
+        LOGGER.debug("revertValid: runtime znode missing for table: {}; 
skipping",
+            tableNameWithType);
+        return null;
+      }
+      PartitionInfo existing = current.getPartitions().get(windowStartMs);
+      if (existing == null || existing.getState() != PartitionState.STALE) {
+        LOGGER.info("revertValid: bucket {} for table {} is not STALE (got: 
{}); skipping",
+            windowStartMs, tableNameWithType, existing);
+        return null;
+      }
+      Map<Long, PartitionInfo> updated = new 
HashMap<>(current.getPartitions());
+      updatePartitionInfo(updated, windowStartMs, 
existing.withState(PartitionState.VALID));
+      LOGGER.info("revertValid: table={} bucket={} STALE->VALID 
(false-positive, fp={})",
+          tableNameWithType, windowStartMs, existing.getFingerprint());
+      return new MaterializedViewRuntimeMetadata(
+          current.getMaterializedViewTableNameWithType(), 
current.getWatermarkMs(), updated);
+    }, REVERT_MAX_ATTEMPTS, REVERT_BACKOFF_BASE_MS, REVERT_BACKOFF_JITTER_MS);
+  }
+
+  /// MARK_STALE op (single-bucket): VALID → STALE.  Invoked by the 
consistency manager (and
+  /// future manual REFRESH commands) when base-table data has changed in a 
way that affects
+  /// `windowStartMs`.  Fingerprint and `lastRefreshTime` are preserved so the 
scheduler's
+  /// false-positive revert can compare against the prior snapshot.
+  ///
+  /// Lenient: if the bucket is absent or already STALE, the call is a no-op.  
Today the
+  /// in-coverage VACANT case is a deliberate skip (matching the prior 
consistency manager
+  /// behavior); a future change will synthesize a STALE entry for in-coverage 
VACANT
+  /// buckets to fix the backfill silent-data-loss case.
+  ///

Review Comment:
   PR description states that ConsistencyMgr now covers in-coverage "VACANT → 
STALE" by synthesizing a STALE entry for absent buckets, but the current 
manager implementation explicitly treats absent buckets as a no-op (and even 
calls out synthesis as a future change). If synthesis is required for 
correctness per the design notes, it likely needs to be implemented here (or 
the PR description/design docs updated to reflect the current behavior).



##########
pinot-materialized-view/src/main/java/org/apache/pinot/materializedview/scheduler/MaterializedViewTaskUtils.java:
##########
@@ -172,4 +180,51 @@ public static String buildSegmentName(String tableName, long windowStartMs, long
       String attemptId, int segIdx) {
     return tableName + "_" + windowStartMs + "_" + windowEndMs + "_" + attemptId + "_" + segIdx;
   }
+
+  /// Computes a [PartitionFingerprint] over the source segments overlapping
+  /// `[windowStartMs, windowEndMs)`.  Single source of truth — both the 
scheduler (when
+  /// generating tasks) and the minion executor (when validating fingerprints 
at commit time)
+  /// must call this method, never re-implement the algorithm.
+  ///
+  /// Algorithm (each step is part of the byte-equality contract — changing 
any one of them
+  /// breaks the fingerprint and forces an MV-wide recompute):
+  ///
+  ///   1. **Filter** to segments overlapping the half-open window: `start < 
windowEndMs` AND
+  ///      `end >= windowStartMs`.  Asymmetric `>=` on the lower bound is 
intentional —
+  ///      `SegmentZKMetadata#getEndTimeMs` is inclusive.
+  ///   2. **Sort** the surviving list by segment name.  Listing order from
+  ///      `getSegmentsZKMetadata` is not guaranteed to be stable across 
calls, so the sort
+  ///      makes the hash deterministic.  The comparator is part of the 
algorithm: changing
+  ///      it (e.g. to sort by CRC) is byte-equivalent to changing the hash 
encoding.
+  ///   3. **Hash** the sorted list with `farmHashFingerprint64`, feeding each 
segment as
+  ///      `<segmentName>\0<crc>\n`.  FarmHash64 is non-cryptographic but 
collision-resistant
+  ///      for non-adversarial inputs; it replaces an earlier XOR-CRC scheme 
that exhibited
+  ///      cancellation collisions (swap two segments with equal combined 
contribution →
+  ///      identical fingerprint).
+  ///
+  /// Empty overlap returns [PartitionFingerprint#EMPTY] — by construction, 
since
+  /// `farmHashFingerprint64` over zero input bytes is the constant baked into 
[#EMPTY].
+  /// Callers MUST NOT pre-filter or pre-sort `allSegments`; the helper is 
responsible for
+  /// both.  Pre-sorting with a different comparator would silently break 
byte-equality
+  /// against the executor's commit-time recomputation.
+  public static PartitionFingerprint computeWindowFingerprint(
+      List<SegmentZKMetadata> allSegments, long windowStartMs, long windowEndMs) {
+    List<SegmentZKMetadata> overlapping = new ArrayList<>();
+    for (SegmentZKMetadata seg : allSegments) {
+      long segStartMs = seg.getStartTimeMs();
+      long segEndMs = seg.getEndTimeMs();
+      if (segStartMs < windowEndMs && segEndMs >= windowStartMs) {
+        overlapping.add(seg);
+      }
+    }
+    overlapping.sort(Comparator.comparing(SegmentZKMetadata::getSegmentName));
+    Hasher hasher = Hashing.farmHashFingerprint64().newHasher();
+    for (SegmentZKMetadata seg : overlapping) {
+      hasher.putString(seg.getSegmentName(), StandardCharsets.UTF_8);
+      hasher.putByte((byte) 0);
+      hasher.putLong(seg.getCrc());
+      hasher.putByte((byte) '\n');
+    }
+    return new PartitionFingerprint(overlapping.size(), hasher.hash().asLong());

Review Comment:
   computeWindowFingerprint()'s Javadoc says empty overlap returns 
PartitionFingerprint.EMPTY, but the implementation always allocates a new 
PartitionFingerprint (even for empty overlap). Returning the constant in the 
empty case would make the implementation match the documented contract and 
avoid per-call allocation on empty windows.
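
   A minimal sketch of the early return, using stand-in types so it runs 
without Pinot or Guava on the classpath. `FingerprintSketch`, `Segment`, and 
`Fingerprint` are hypothetical stand-ins for the real classes, and 
`java.util.zip.CRC32` is only a placeholder for `farmHashFingerprint64`. The 
early return is equivalent to the fall-through path only if the constant's hash 
equals the hash over zero input bytes, which the Javadoc states for the real 
code; in the sketch, CRC32 over no bytes is 0, matching the stand-in EMPTY.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.zip.CRC32;

/** Stand-in types only; the real PartitionFingerprint and SegmentZKMetadata live in Pinot. */
final class FingerprintSketch {
  static final class Segment {
    final String name;
    final long startMs;
    final long endMs; // inclusive, mirroring the Javadoc's note on getEndTimeMs
    final long crc;

    Segment(String name, long startMs, long endMs, long crc) {
      this.name = name;
      this.startMs = startMs;
      this.endMs = endMs;
      this.crc = crc;
    }
  }

  static final class Fingerprint {
    final int segmentCount;
    final long hash;

    Fingerprint(int segmentCount, long hash) {
      this.segmentCount = segmentCount;
      this.hash = hash;
    }
  }

  /** Shared constant for "no overlapping segments"; 0 is the placeholder hash of zero bytes. */
  static final Fingerprint EMPTY = new Fingerprint(0, 0L);

  static Fingerprint compute(List<Segment> allSegments, long windowStartMs, long windowEndMs) {
    List<Segment> overlapping = new ArrayList<>();
    for (Segment seg : allSegments) {
      if (seg.startMs < windowEndMs && seg.endMs >= windowStartMs) {
        overlapping.add(seg);
      }
    }
    if (overlapping.isEmpty()) {
      // Return the shared constant instead of allocating a new instance.
      return EMPTY;
    }
    overlapping.sort(Comparator.comparing((Segment seg) -> seg.name));
    CRC32 hasher = new CRC32(); // placeholder hash, NOT the PR's farmHashFingerprint64
    for (Segment seg : overlapping) {
      hasher.update(seg.name.getBytes(StandardCharsets.UTF_8));
      hasher.update(0);
      hasher.update(Long.toString(seg.crc).getBytes(StandardCharsets.UTF_8));
      hasher.update('\n');
    }
    return new Fingerprint(overlapping.size(), hasher.getValue());
  }
}
```

   Callers can then compare against the constant by identity or by value, and 
empty windows no longer allocate.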



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

