Copilot commented on code in PR #18642:
URL: https://github.com/apache/pinot/pull/18642#discussion_r3365786701
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/materializedview/MaterializedViewTaskExecutor.java:
##########
@@ -173,7 +171,8 @@ public SegmentConversionResult executeTask(PinotTaskConfig pinotTaskConfig)
String taskMode = configs.getOrDefault(MaterializedViewTask.TASK_MODE_KEY,
MaterializedViewTask.TASK_MODE_APPEND);
- // DELETE mode: skip query execution, only remove existing MV segments
+ // DELETE mode: skip query execution; remove existing MV segments and rewrite the
+ // runtime PartitionInfo to VALID-empty (see updateMaterializedViewRuntime DELETE branch).
if (MaterializedViewTask.TASK_MODE_DELETE.equals(taskMode)) {
Review Comment:
The DELETE-mode comment still references the removed
updateMaterializedViewRuntime() method. This makes the code harder to follow
and points readers to a symbol that no longer exists; the comment should
reference the new MaterializedViewPartitionManager-based postProcess path
instead.
##########
pinot-materialized-view/src/main/java/org/apache/pinot/materializedview/consistency/MaterializedViewConsistencyManager.java:
##########
@@ -410,55 +405,40 @@ private boolean markPartitionsDirty(String viewTableName, long affectedStartMs,
+ "mark partitions without a bucket size. Repair the MV table config.",
viewTableName, bucketMs);
- // Iterate every bucket [partStart, partStart+bucketMs) that overlaps the affected range
- // [affectedStartMs, affectedEndMs]. Two ranges overlap when start1 <= end2 AND end1 >= start2.
+ // Cap affectedEndMs at watermarkMs. No partition with partStart > watermarkMs can exist
+ // (the writer invariant), so any bucket beyond that cannot be marked STALE anyway. This
+ // protects the bucket iteration below from a caller passing Long.MAX_VALUE (full-range
+ // invalidation from notifyMaterializedViewConsistencyManager paths when segment
+ // startTime/endTime is unknown), which would otherwise enumerate ~watermarkMs/bucketMs
+ // candidates — orders of magnitude more than the number of real partitions.
+ long cappedEnd = Math.min(affectedEndMs, runtime.getWatermarkMs());
+ if (cappedEnd < affectedStartMs) {
+ LOGGER.debug("Affected range [{}, {}] is past watermarkMs ({}) for MV table: {}; nothing to mark",
+ affectedStartMs, affectedEndMs, runtime.getWatermarkMs(), viewTableName);
+ return Collections.emptyList();
+ }
+
+ // Enumerate every bucket [partStart, partStart+bucketMs) that overlaps the affected range
+ // [affectedStartMs, cappedEnd]. Two ranges overlap when start1 <= end2 AND end1 >= start2.
// The first overlapping bucket has partStart = floorDiv(affectedStartMs, bucketMs) * bucketMs;
- // the last has partStart <= affectedEndMs (any bucket whose start is past affectedEndMs
- // cannot overlap because partStart > affectedEndMs >= affectedStartMs implies the bucket
- // starts after the affected range ends). floorDiv (instead of /) is used defensively so a
- // future caller passing negative affectedStartMs would still produce the correct floor.
- // Only flip existing VALID entries to STALE. Absent buckets are NOT synthesized — under
- // Design C, a bucket's absence from the partition map already means "MV does not cover this
- // range", so the broker rewrite (PR 2) routes those queries to the base table. Synthesizing
- // STALE entries for every uncovered bucket below `watermarkMs` would explode the znode size
- // on a full-range invalidation (~watermarkMs / bucketMs entries) without affecting routing
- // correctness — the bucket-iteration loop below stays O(affectedRange / bucketMs) but the
- // persisted map grows only with real partitions.
+ // the last has partStart <= cappedEnd (any bucket whose start is past cappedEnd cannot
+ // overlap because partStart > cappedEnd >= affectedStartMs implies the bucket starts after
+ // the affected range ends). floorDiv (instead of /) is used defensively so a future caller
+ // passing negative affectedStartMs would still produce the correct floor.
+ //
+ // Absent buckets are still enumerated: the manager's per-bucket lenient filter no-ops them
+ // under the CAS lock. Synthesizing STALE entries for absent buckets is intentionally NOT
+ // done here — under Design C, a bucket's absence already means "MV does not cover this
+ // range" and the broker routes those queries to the base. Synthesizing on every
+ // full-range invalidation would explode the znode size by ~watermarkMs/bucketMs entries
+ // without affecting routing correctness.
+ List<Long> buckets = new ArrayList<>();
long partStart = Math.floorDiv(affectedStartMs, bucketMs) * bucketMs;
- while (partStart <= affectedEndMs) {
- PartitionInfo info = updatedInfos.get(partStart);
- if (info != null && info.getState() == PartitionState.VALID) {
- updatedInfos.put(partStart, info.withState(PartitionState.STALE));
- anyChanged = true;
- markedCount++;
- }
+ while (partStart <= cappedEnd) {
+ buckets.add(partStart);
partStart += bucketMs;
}
Review Comment:
enumerateCandidateBuckets() materializes every bucket start between
affectedStartMs and watermark (cappedEnd) into a List. For full invalidation
(affectedEndMs=Long.MAX_VALUE) with a small bucketMs and a large watermark
horizon, this can allocate an extremely large list and risk high GC pressure or
OOM. Consider chunking the range (e.g., call _partitionManager.markStale() in
bounded-size batches) or moving the range-iteration into the manager so you
don't need to hold all candidate buckets in memory at once.
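A minimal sketch of the batched variant suggested above, under stated assumptions: `BucketBatcher`, `forEachBucketBatch`, and `maxBatchSize` are hypothetical names that do not appear in the PR, and the `sink` callback stands in for whatever call hands a bounded batch to the manager (for example `_partitionManager.markStale()`, whose actual signature is not shown in this diff). The point is only that the walk never holds more than `maxBatchSize` bucket starts at once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper, not part of the PR: walks bucket starts in bounded batches.
final class BucketBatcher {
  private BucketBatcher() {
  }

  /**
   * Visits every bucket start in [floor(affectedStartMs), cappedEndMs] and passes
   * them to {@code sink} in lists of at most {@code maxBatchSize} entries.
   * Returns the total number of bucket starts visited.
   */
  static long forEachBucketBatch(long affectedStartMs, long cappedEndMs, long bucketMs,
      int maxBatchSize, Consumer<List<Long>> sink) {
    if (bucketMs <= 0 || maxBatchSize <= 0) {
      throw new IllegalArgumentException("bucketMs and maxBatchSize must be positive");
    }
    long visited = 0;
    List<Long> batch = new ArrayList<>();
    // Same floor computation as the diff: correct for negative start values too.
    long partStart = Math.floorDiv(affectedStartMs, bucketMs) * bucketMs;
    while (partStart <= cappedEndMs) {
      batch.add(partStart);
      visited++;
      if (batch.size() == maxBatchSize) {
        sink.accept(batch);
        batch = new ArrayList<>();
      }
      // Guard against long overflow when cappedEndMs is close to Long.MAX_VALUE.
      if (partStart > Long.MAX_VALUE - bucketMs) {
        break;
      }
      partStart += bucketMs;
    }
    if (!batch.isEmpty()) {
      sink.accept(batch);
    }
    return visited;
  }
}
```

Each batch would map to one bounded manager call, so peak memory scales with `maxBatchSize` rather than with `watermarkMs / bucketMs`. The trade-off is that a full-range invalidation becomes several CAS writes instead of one, so it is no longer atomic across batches.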
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/materializedview/MaterializedViewTaskExecutor.java:
##########
@@ -392,8 +391,10 @@ private void revertWindowSegmentReplace(String tableNameWithType, String lineage
}
/// Handles DELETE mode: removes all existing MV segments for the given time window
- /// via segment lineage replace (segmentsFrom=[old segments], segmentsTo=[]).
- /// No query is executed and no new segments are created.
+ /// via segment lineage replace (segmentsFrom=[old segments], segmentsTo=[]). No query
+ /// is executed and no new MV segments are created; the runtime PartitionInfo is rewritten
+ /// to `VALID + PartitionFingerprint.EMPTY` by [#updateMaterializedViewRuntime] in
+ /// [#postProcess], so the partition stays tracked-but-empty rather than disappearing.
Review Comment:
This Javadoc still says the runtime PartitionInfo is rewritten by
updateMaterializedViewRuntime(), but that helper was removed in favor of
MaterializedViewPartitionManager. Updating this reference avoids confusion and
keeps the documentation consistent with the current implementation.
##########
pinot-materialized-view/src/main/java/org/apache/pinot/materializedview/metadata/MaterializedViewPartitionManager.java:
##########
@@ -0,0 +1,499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.materializedview.metadata;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.helix.store.HelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.zkclient.exception.ZkException;
+import org.apache.pinot.materializedview.scheduler.MaterializedViewTaskUtils;
+import org.apache.pinot.spi.utils.CommonConstants.MaterializedViewTask;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/// Single source of truth for every per-partition state transition on the
+/// [MaterializedViewRuntimeMetadata] runtime znode.
+///
+/// <h3>Why this class exists</h3>
+///
+/// Before this manager landed, the runtime znode was mutated by three separate sites — the
+/// minion executor (APPEND / OVERWRITE / DELETE task commits), the scheduler (false-positive
+/// STALE reverts), and the consistency manager (VALID → STALE markings). Each site shipped
+/// its own version-checked CAS loop, with subtly different retry budgets, backoff jitter,
+/// and exception-classification rules. Subsequent state-machine changes (e.g. extending the
+/// VACANT → STALE synthesize for in-coverage backfill, or revoking the VALID-empty state)
+/// would have required coordinated edits at all three sites — exactly the failure mode that
+/// caused the earlier `computeWindowFingerprint` duplication scare.
+///
+/// This class consolidates all per-partition mutations behind a state-change DSL. Each
+/// public method maps to exactly one operation in the per-partition state machine; the
+/// ZK CAS retry loop, the writer-side `validateForPersist` invariant gate, and the watermark
+/// recompute on APPEND all live behind the public API. Callers express intent
+/// (`appendValid`, `markStale`, ...) and never see the persistence machinery.
+///
+/// <h3>Architecture</h3>
+///
+/// Three layers, deliberately separated:
+///
+/// - **Public state-change DSL** — one method per state-machine op. Each method has a
+/// well-defined precondition and postcondition (see per-method javadoc) and selects the
+/// appropriate retry profile. Callers never construct `PartitionInfo` directly.
+/// - **Private CRUD primitives** — `addPartitionInfo` / `updatePartitionInfo` /
+/// `removePartitionInfo`. Each enforces a structural invariant on the in-memory map
+/// (e.g. "cannot add an already-existing entry") that fail-loud on violation, surfacing
+/// races as observable exceptions instead of silent overwrites.
+/// - **Single CAS engine** — `applyMutation` runs a caller-supplied mutator under the
+/// fetch / mutate / version-checked-write loop, with retry classification:
+/// - [MaterializedViewRuntimeMetadataUtils.CasConflictException] → re-fetch + retry
+/// - other [ZkException] (transport / session) → log warn + retry
+/// - [IllegalStateException] / [IllegalArgumentException] (validateForPersist) →
+/// propagate (fail-fast, no retry)
+/// - any other exception → propagate
+///
+/// <h3>Retry profile</h3>
+///
+/// Two profiles, calibrated to op semantics:
+///
+/// - **Critical** ([#DEFAULT_CRITICAL_MAX_ATTEMPTS] = 128, cluster-tunable via
+/// `pinot.materialized.view.executor.runtime.update.max.attempts`) — used by every
+/// state-changing op except `revertValid`. Failure forces a minion task retry at the
+/// Helix level, which is far more expensive than a CAS retry, so the budget is sized
+/// to absorb realistic contention with `maxTasksPerBatch` parallel writers.
+/// - **Revert** ([#REVERT_MAX_ATTEMPTS] = 8) — used by `revertValid` only. Failure
+/// means we did not avoid one OVERWRITE task, which the next scheduling cycle will
+/// either run for real or revert again. Spending 128 retries to save one task is
+/// wasteful.
+///
+/// <h3>Concurrency</h3>
+///
+/// The manager is thread-safe and stateless above the ZK property store handle. Concurrent
+/// callers contending on the same MV runtime znode are serialized by ZK CAS; one thread's
+/// commit invalidates the other's snapshot, the loser re-fetches and re-runs its mutator.
+/// Different MVs map to different znodes — no cross-MV contention.
+///
+/// <h3>Scope discipline</h3>
+///
+/// This class manages exactly the partition map and `watermarkMs` field of
+/// [MaterializedViewRuntimeMetadata]. It MUST NOT acquire other znodes
+/// (e.g. [MaterializedViewDefinitionMetadata]), call source-table or table-config services,
+/// or grow a method count beyond the public DSL listed below. See `red lines` in the PR
+/// description.
+public final class MaterializedViewPartitionManager {
+ private static final Logger LOGGER = LoggerFactory.getLogger(MaterializedViewPartitionManager.class);
+
+ /// Compile-time default for the critical retry budget. Overridable per cluster (no
+ /// restart) via `pinot.materialized.view.executor.runtime.update.max.attempts`. Sized so
+ /// `maxTasksPerBatch` parallel APPEND completions cannot exhaust the budget under realistic
+ /// contention. Reused by ConsistencyMgr-style mark ops so a STALE marking is never
+ /// silently dropped.
+ static final int DEFAULT_CRITICAL_MAX_ATTEMPTS = 128;
+
+ /// Compile-time retry budget for `revertValid` only. Failure is recovered by the next
+ /// scheduling cycle (the partition stays STALE and is retried), so a small budget is
+ /// appropriate. Not cluster-tunable — operationally, raising this would not buy
+ /// correctness, only delay.
+ static final int REVERT_MAX_ATTEMPTS = 8;
+
+ /// Backoff envelope for the critical profile. Total wait at the cap with worst-case
+ /// jitter is about `128 * (50 + 150) = 25.6 s`, well within the implicit per-task timeout.
+ private static final long CRITICAL_BACKOFF_BASE_MS = 50L;
+ private static final int CRITICAL_BACKOFF_JITTER_MS = 150;
+
+ /// Backoff envelope for the revert profile. Total wait at the cap is about
+ /// `8 * (5 + 20) = 200 ms`. Tight CAS-loop livelock is the failure to defend against;
+ /// scheduler thread blocking is the failure to avoid.
+ private static final long REVERT_BACKOFF_BASE_MS = 5L;
+ private static final int REVERT_BACKOFF_JITTER_MS = 20;
+
+ private final HelixPropertyStore<ZNRecord> _propertyStore;
+
+ /// Optional live cluster-config reader used to override [#DEFAULT_CRITICAL_MAX_ATTEMPTS]
+ /// at runtime. Null in unit tests; null reader falls back to the compile-time default.
+ @Nullable
+ private final Function<String, String> _clusterConfigReader;
+
+ public MaterializedViewPartitionManager(HelixPropertyStore<ZNRecord> propertyStore,
+ @Nullable Function<String, String> clusterConfigReader) {
+ Preconditions.checkArgument(propertyStore != null, "propertyStore must not be null");
+ _propertyStore = propertyStore;
+ _clusterConfigReader = clusterConfigReader;
+ }
+
+ // ─────────────────────────────────────────────────────────────────────────────────────
+ // Public state-change DSL
+ // ─────────────────────────────────────────────────────────────────────────────────────
+
+ /// APPEND op: VACANT → VALID(fp). Invoked by the minion executor at task-commit time
+ /// after a successful materialization of a window that was previously absent from the
+ /// partition map.
+ ///
+ /// Side effects (all in a single CAS write):
+ ///
+ /// - Inserts a new `PartitionInfo(VALID, fp, now)` at `windowStartMs`.
+ /// - Advances `watermarkMs` to the highest contiguous VALID upper bound starting at
+ /// the prior watermark, derived via
+ /// [MaterializedViewTaskUtils#computeContiguousUpperMs] over the resulting map.
+ /// The bucket size used for the walk is `windowEndMs - windowStartMs`.
+ ///
+ /// Watermark advancement is bundled with the bucket transition because watermark is a
+ /// derived field of the map state and must stay consistent under concurrent writers.
+ /// Splitting them across two CAS writes would create a transient window where the map
+ /// has the new bucket but the watermark is stale.
+ ///
+ /// Strict precondition: `windowStartMs` must NOT be present in the map. A violation
+ /// (raced double-dispatch from the scheduler, mid-flight task replay) throws
+ /// [IllegalStateException] inside the CAS loop, which propagates without retry — the
+ /// scheduler dispatch invariant guarantees one APPEND per bucket per cycle, so this
+ /// firing indicates a real bug, not transient contention.
+ ///
+ /// Retry profile: critical (cluster-tunable budget).
+ public void appendValid(String tableNameWithType, long windowStartMs, long windowEndMs,
+ PartitionFingerprint fingerprint) {
+ Preconditions.checkArgument(fingerprint != null, "fingerprint must not be null");
+ Preconditions.checkArgument(windowEndMs > windowStartMs,
+ "Invalid window: windowEndMs (%s) <= windowStartMs (%s) for table %s",
+ windowEndMs, windowStartMs, tableNameWithType);
+ long bucketMs = windowEndMs - windowStartMs;
+ applyMutation(tableNameWithType, current -> {
+ Preconditions.checkState(current != null,
+ "appendValid called before MV runtime znode was initialized for table: %s "
+ + "(cold-start path skipped?)", tableNameWithType);
+ Map<Long, PartitionInfo> updated = new HashMap<>(current.getPartitions());
+ addPartitionInfo(updated, windowStartMs,
+ new PartitionInfo(PartitionState.VALID, fingerprint, System.currentTimeMillis()));
+ long newWatermarkMs = MaterializedViewTaskUtils.computeContiguousUpperMs(
+ current.getWatermarkMs(), updated, bucketMs);
+ LOGGER.info("appendValid: table={} bucket={} watermarkMs {}->{} (partitions={})",
+ tableNameWithType, windowStartMs, current.getWatermarkMs(), newWatermarkMs, updated.size());
+ return new MaterializedViewRuntimeMetadata(
+ current.getMaterializedViewTableNameWithType(), newWatermarkMs, updated);
+ }, criticalMaxAttempts(), CRITICAL_BACKOFF_BASE_MS, CRITICAL_BACKOFF_JITTER_MS);
+ }
+
+ /// OVERWRITE op: STALE → VALID(fp). Invoked by the minion executor at task-commit time
+ /// after re-materializing a STALE bucket whose source-side fingerprint has actually
+ /// changed. Updates the in-place fingerprint and refreshes `lastRefreshTime`; watermark
+ /// is unchanged because OVERWRITE only refreshes existing coverage.
+ ///
+ /// Strict precondition: `windowStartMs` must be present and STALE. A non-STALE entry
+ /// indicates the scheduler dispatch invariant was violated and throws
+ /// [IllegalStateException] without retry.
+ ///
+ /// Retry profile: critical.
+ public void refreshValid(String tableNameWithType, long windowStartMs,
+ PartitionFingerprint fingerprint) {
+ Preconditions.checkArgument(fingerprint != null, "fingerprint must not be null");
+ applyMutation(tableNameWithType, current -> {
+ Preconditions.checkState(current != null,
+ "refreshValid called before MV runtime znode was initialized for table: %s",
+ tableNameWithType);
+ Map<Long, PartitionInfo> updated = new HashMap<>(current.getPartitions());
+ PartitionInfo existing = updated.get(windowStartMs);
+ Preconditions.checkState(existing != null && existing.getState() == PartitionState.STALE,
+ "refreshValid expects bucket %s to be STALE for table %s, got: %s",
+ windowStartMs, tableNameWithType, existing);
+ updatePartitionInfo(updated, windowStartMs,
+ new PartitionInfo(PartitionState.VALID, fingerprint, System.currentTimeMillis()));
+ LOGGER.info("refreshValid: table={} bucket={} STALE->VALID (fp={})",
+ tableNameWithType, windowStartMs, fingerprint);
+ return new MaterializedViewRuntimeMetadata(
+ current.getMaterializedViewTableNameWithType(), current.getWatermarkMs(), updated);
+ }, criticalMaxAttempts(), CRITICAL_BACKOFF_BASE_MS, CRITICAL_BACKOFF_JITTER_MS);
+ }
+
+ /// DELETE op: STALE → VALID(EMPTY). Invoked by the minion executor at task-commit time
+ /// after retention-deleting MV segments for a window whose source data is now empty.
+ /// Persists the `VALID + PartitionFingerprint.EMPTY` shape rather than removing the
+ /// entry — the coverage model treats every processed bucket as "tracked", so a later
+ /// base-table backfill flows through the standard VALID → STALE → OVERWRITE cycle.
+ ///
+ /// <p><b>Design context (open question):</b> The continued existence of the `VALID-empty`
+ /// shape is under review. An alternative model removes the entry entirely on DELETE
+ /// (treating empty buckets as VACANT), which requires extending
+ /// [MaterializedViewTaskUtils#computeContiguousUpperMs] to walk past VACANT buckets and
+ /// extending the consistency manager to synthesize STALE entries for in-coverage VACANT
+ /// buckets on backfill. Until that design lands, this method preserves today's
+ /// `VALID-empty` semantics so the scheduler's contiguous-VALID watermark walk continues
+ /// to work without modification.
+ ///
+ /// Strict precondition: `windowStartMs` must be present and STALE. Non-STALE entries
+ /// indicate the scheduler dispatch invariant was violated; fail-loud surfaces the bug.
+ ///
+ /// Retry profile: critical.
+ public void clearValid(String tableNameWithType, long windowStartMs) {
+ applyMutation(tableNameWithType, current -> {
+ Preconditions.checkState(current != null,
+ "clearValid called before MV runtime znode was initialized for table: %s",
+ tableNameWithType);
+ Map<Long, PartitionInfo> updated = new HashMap<>(current.getPartitions());
+ PartitionInfo existing = updated.get(windowStartMs);
+ Preconditions.checkState(existing != null && existing.getState() == PartitionState.STALE,
+ "clearValid expects bucket %s to be STALE for table %s, got: %s",
+ windowStartMs, tableNameWithType, existing);
+ updatePartitionInfo(updated, windowStartMs,
+ new PartitionInfo(PartitionState.VALID, PartitionFingerprint.EMPTY,
+ System.currentTimeMillis()));
+ LOGGER.info("clearValid: table={} bucket={} STALE->VALID-empty",
+ tableNameWithType, windowStartMs);
+ return new MaterializedViewRuntimeMetadata(
+ current.getMaterializedViewTableNameWithType(), current.getWatermarkMs(), updated);
+ }, criticalMaxAttempts(), CRITICAL_BACKOFF_BASE_MS, CRITICAL_BACKOFF_JITTER_MS);
+ }
+
+ /// STALE_REVERT op: STALE → VALID, fingerprint and `lastRefreshTime` preserved. Invoked
+ /// by the scheduler after a precise fingerprint comparison concludes that a STALE marking
+ /// was a false positive (the source did not actually change in the affected window). The
+ /// underlying data was not re-materialized, so neither fingerprint nor `lastRefreshTime`
+ /// move.
+ ///
+ /// Lenient precondition: if the bucket is no longer STALE on retry (either already
+ /// reverted by a concurrent write or the consistency manager re-marked it for a new
+ /// reason), the call exits successfully without writing — the desired transition is
+ /// already done or no longer applicable.
+ ///
+ /// Retry profile: revert (small budget; failure is recovered by the next scheduling
+ /// cycle).
+ public void revertValid(String tableNameWithType, long windowStartMs) {
+ applyMutation(tableNameWithType, current -> {
+ if (current == null) {
+ LOGGER.debug("revertValid: runtime znode missing for table: {}; skipping",
+ tableNameWithType);
+ return null;
+ }
+ PartitionInfo existing = current.getPartitions().get(windowStartMs);
+ if (existing == null || existing.getState() != PartitionState.STALE) {
+ LOGGER.info("revertValid: bucket {} for table {} is not STALE (got: {}); skipping",
+ windowStartMs, tableNameWithType, existing);
+ return null;
+ }
+ Map<Long, PartitionInfo> updated = new HashMap<>(current.getPartitions());
+ updatePartitionInfo(updated, windowStartMs, existing.withState(PartitionState.VALID));
+ LOGGER.info("revertValid: table={} bucket={} STALE->VALID (false-positive, fp={})",
+ tableNameWithType, windowStartMs, existing.getFingerprint());
+ return new MaterializedViewRuntimeMetadata(
+ current.getMaterializedViewTableNameWithType(), current.getWatermarkMs(), updated);
+ }, REVERT_MAX_ATTEMPTS, REVERT_BACKOFF_BASE_MS, REVERT_BACKOFF_JITTER_MS);
+ }
+
+ /// MARK_STALE op (single-bucket): VALID → STALE. Invoked by the consistency manager (and
+ /// future manual REFRESH commands) when base-table data has changed in a way that affects
+ /// `windowStartMs`. Fingerprint and `lastRefreshTime` are preserved so the scheduler's
+ /// false-positive revert can compare against the prior snapshot.
+ ///
+ /// Lenient: if the bucket is absent or already STALE, the call is a no-op. Today the
+ /// in-coverage VACANT case is a deliberate skip (matching the prior consistency manager
+ /// behavior); a future change will synthesize a STALE entry for in-coverage VACANT
+ /// buckets to fix the backfill silent-data-loss case.
+ ///
Review Comment:
PR description states that ConsistencyMgr now covers in-coverage "VACANT →
STALE" by synthesizing a STALE entry for absent buckets, but the current
manager implementation explicitly treats absent buckets as a no-op (and even
calls out synthesis as a future change). If synthesis is required for
correctness per the design notes, it likely needs to be implemented here (or
the PR description/design docs updated to reflect the current behavior).
##########
pinot-materialized-view/src/main/java/org/apache/pinot/materializedview/scheduler/MaterializedViewTaskUtils.java:
##########
@@ -172,4 +180,51 @@ public static String buildSegmentName(String tableName, long windowStartMs, long
String attemptId, int segIdx) {
return tableName + "_" + windowStartMs + "_" + windowEndMs + "_" + attemptId + "_" + segIdx;
}
+
+ /// Computes a [PartitionFingerprint] over the source segments overlapping
+ /// `[windowStartMs, windowEndMs)`. Single source of truth — both the scheduler (when
+ /// generating tasks) and the minion executor (when validating fingerprints at commit time)
+ /// must call this method, never re-implement the algorithm.
+ ///
+ /// Algorithm (each step is part of the byte-equality contract — changing any one of them
+ /// breaks the fingerprint and forces an MV-wide recompute):
+ ///
+ /// 1. **Filter** to segments overlapping the half-open window: `start < windowEndMs` AND
+ /// `end >= windowStartMs`. Asymmetric `>=` on the lower bound is intentional —
+ /// `SegmentZKMetadata#getEndTimeMs` is inclusive.
+ /// 2. **Sort** the surviving list by segment name. Listing order from
+ /// `getSegmentsZKMetadata` is not guaranteed to be stable across calls, so the sort
+ /// makes the hash deterministic. The comparator is part of the algorithm: changing
+ /// it (e.g. to sort by CRC) is byte-equivalent to changing the hash encoding.
+ /// 3. **Hash** the sorted list with `farmHashFingerprint64`, feeding each segment as
+ /// `<segmentName>\0<crc>\n`. FarmHash64 is non-cryptographic but collision-resistant
+ /// for non-adversarial inputs; it replaces an earlier XOR-CRC scheme that exhibited
+ /// cancellation collisions (swap two segments with equal combined contribution →
+ /// identical fingerprint).
+ ///
+ /// Empty overlap returns [PartitionFingerprint#EMPTY] — by construction, since
+ /// `farmHashFingerprint64` over zero input bytes is the constant baked into [#EMPTY].
+ /// Callers MUST NOT pre-filter or pre-sort `allSegments`; the helper is responsible for
+ /// both. Pre-sorting with a different comparator would silently break byte-equality
+ /// against the executor's commit-time recomputation.
+ public static PartitionFingerprint computeWindowFingerprint(
+ List<SegmentZKMetadata> allSegments, long windowStartMs, long windowEndMs) {
+ List<SegmentZKMetadata> overlapping = new ArrayList<>();
+ for (SegmentZKMetadata seg : allSegments) {
+ long segStartMs = seg.getStartTimeMs();
+ long segEndMs = seg.getEndTimeMs();
+ if (segStartMs < windowEndMs && segEndMs >= windowStartMs) {
+ overlapping.add(seg);
+ }
+ }
+ overlapping.sort(Comparator.comparing(SegmentZKMetadata::getSegmentName));
+ Hasher hasher = Hashing.farmHashFingerprint64().newHasher();
+ for (SegmentZKMetadata seg : overlapping) {
+ hasher.putString(seg.getSegmentName(), StandardCharsets.UTF_8);
+ hasher.putByte((byte) 0);
+ hasher.putLong(seg.getCrc());
+ hasher.putByte((byte) '\n');
+ }
+ return new PartitionFingerprint(overlapping.size(), hasher.hash().asLong());
Review Comment:
computeWindowFingerprint()'s Javadoc says empty overlap returns
PartitionFingerprint.EMPTY, but the implementation always allocates a new
PartitionFingerprint (even for empty overlap). Returning the constant in the
empty case would make the implementation match the documented contract and
avoid per-call allocation on empty windows.
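A rough sketch of the early return, written against stand-in types so that it compiles on its own. `EmptyFingerprintSketch`, `Fingerprint`, and `Segment` are hypothetical substitutes for the PR's `PartitionFingerprint` and `SegmentZKMetadata`, and `java.util.zip.CRC32` replaces Guava's `farmHashFingerprint64` purely to avoid a dependency; it is not the PR's hash. The early return is only safe if `EMPTY` really carries the hash of zero input bytes, which is what the Javadoc in the diff claims for the real constant.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.zip.CRC32;

// Hypothetical stand-ins; the real types live in the Pinot code base.
final class EmptyFingerprintSketch {
  static final class Fingerprint {
    // CRC32 over zero bytes is 0, so EMPTY matches what the hash loop would yield.
    static final Fingerprint EMPTY = new Fingerprint(0, 0L);
    final int segmentCount;
    final long hash;

    Fingerprint(int segmentCount, long hash) {
      this.segmentCount = segmentCount;
      this.hash = hash;
    }
  }

  static final class Segment {
    final String name;
    final long startMs;
    final long endMs; // inclusive, mirroring the diff's filter
    final long crc;

    Segment(String name, long startMs, long endMs, long crc) {
      this.name = name;
      this.startMs = startMs;
      this.endMs = endMs;
      this.crc = crc;
    }
  }

  static Fingerprint computeWindowFingerprint(List<Segment> all, long windowStartMs,
      long windowEndMs) {
    List<Segment> overlapping = new ArrayList<>();
    for (Segment seg : all) {
      if (seg.startMs < windowEndMs && seg.endMs >= windowStartMs) {
        overlapping.add(seg);
      }
    }
    if (overlapping.isEmpty()) {
      // Return the shared constant instead of allocating a new instance.
      return Fingerprint.EMPTY;
    }
    overlapping.sort(Comparator.comparing((Segment seg) -> seg.name));
    CRC32 hasher = new CRC32(); // stand-in hash, NOT farmHashFingerprint64
    for (Segment seg : overlapping) {
      hasher.update(seg.name.getBytes(StandardCharsets.UTF_8));
      hasher.update(0);
      hasher.update(Long.toString(seg.crc).getBytes(StandardCharsets.UTF_8));
      hasher.update('\n');
    }
    return new Fingerprint(overlapping.size(), hasher.getValue());
  }
}
```

In the actual method the equivalent change would presumably be a single `overlapping.isEmpty()` check ahead of the sort, returning `PartitionFingerprint.EMPTY`.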
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]