xiangfu0 commented on code in PR #18642:
URL: https://github.com/apache/pinot/pull/18642#discussion_r3366657158


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/materializedview/MaterializedViewTaskExecutor.java:
##########
@@ -449,22 +450,40 @@ public void postProcess(PinotTaskConfig pinotTaskConfig) {
     long windowStartMs = Long.parseLong(configs.get(MaterializedViewTask.WINDOW_START_MS_KEY));
     long windowEndMs = Long.parseLong(configs.get(MaterializedViewTask.WINDOW_END_MS_KEY));
 
-    updateMaterializedViewRuntime(configs, tableName, taskMode, windowStartMs, windowEndMs);
-  }
+    MaterializedViewPartitionManager partitionManager = new MaterializedViewPartitionManager(
+        MINION_CONTEXT.getHelixPropertyStore(),
+        MaterializedViewTaskExecutor::readMinionClusterConfig);
 
-  /// Updates [MaterializedViewRuntimeMetadata] in a single CAS write, combining:
-  ///
-  ///   - partitions: set VALID with new fingerprint (APPEND/OVERWRITE) or remove (DELETE)
-  ///   - watermarkMs: advance on APPEND only (drives both scheduler dispatch and the
-  ///       broker's SPLIT_REWRITE boundary)
-  ///
-  // Compile-time default for the CAS retry budget when racing to update MaterializedViewRuntimeMetadata.
-  // Up to maxTasksPerBatch executors can contend per batch completion; each retry re-fetches the
-  // latest version with jittered backoff (Thread.sleep below). 128 is well above any realistic
-  // maxTasksPerBatch and stays low enough that genuinely pathological contention still surfaces as
-  // a task failure (caught by Helix and retried at the task level). Overridable per cluster via
-  // `MaterializedViewTask.CLUSTER_CONFIG_KEY_MAX_RUNTIME_UPDATE_ATTEMPTS` (no minion restart).
-  private static final int DEFAULT_MAX_RUNTIME_UPDATE_ATTEMPTS = 128;
+    if (MaterializedViewTask.TASK_MODE_DELETE.equals(taskMode)) {
+      // DELETE writes `VALID + PartitionFingerprint.EMPTY` rather than removing the entry —
+      // see `MaterializedViewPartitionManager#clearValid` for the design rationale (no
+      // fingerprint validation needed; the source contained zero overlapping segments by
+      // construction at scheduler dispatch time).
+      // construction at scheduler dispatch time).
+      partitionManager.clearValid(tableName, windowStartMs);

Review Comment:
   Addressed in a71f721743: DELETE now re-checks source emptiness inside 
`clearValid`'s CAS loop (a `Supplier` re-reads the source on every attempt) and 
leaves the bucket STALE for OVERWRITE when a backfill is detected, instead of 
writing `VALID+EMPTY`. The narrow residual cross-ZK window is backstopped by a 
periodic `VALID-empty` re-evaluation sweep in 
`MaterializedViewConsistencyManager` (8e1295a357).
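
Below is a minimal Java sketch of the retry shape described above, for readers following the thread. It is not the PR's code, and every name in it is hypothetical. `SketchStore` stands in for a versioned ZK znode, with an `AtomicReference` compare-and-set playing the role of a version-checked write. A `BooleanSupplier` stands in for the source-emptiness `Supplier`. Explicitly writing STALE when a backfill is seen is an assumption about how the bucket ends up STALE; it is not a description of `clearValid` itself.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;

// Hypothetical stand-ins for illustration only; these are not Pinot classes.
enum SketchState { VALID, STALE }

record SketchPartition(SketchState state) {}

// An immutable snapshot plus version, mimicking a ZK read of (data, Stat.version).
record Versioned(int version, Map<Long, SketchPartition> partitions) {}

final class SketchStore {
  private final AtomicReference<Versioned> ref =
      new AtomicReference<>(new Versioned(0, Map.of()));

  Versioned read() {
    return ref.get();
  }

  // Conditional write: succeeds only if nothing was written since `expected` was read.
  boolean writeIfVersion(Versioned expected, Map<Long, SketchPartition> next) {
    return ref.compareAndSet(expected, new Versioned(expected.version() + 1, Map.copyOf(next)));
  }
}

final class ClearValidSketch {
  /// Hypothetical helper. Returns true if the bucket was committed VALID-empty, false if a
  /// backfill was seen and the bucket was written STALE instead (an assumption of this sketch).
  static boolean clearValid(SketchStore store, long windowStartMs,
      BooleanSupplier sourceIsEmpty, int maxAttempts) {
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      Versioned current = store.read();
      // Re-check the source on EVERY attempt, after the snapshot read, so a backfill
      // that lands while we are retrying is seen before VALID-empty is committed.
      boolean empty = sourceIsEmpty.getAsBoolean();
      Map<Long, SketchPartition> next = new HashMap<>(current.partitions());
      next.put(windowStartMs,
          new SketchPartition(empty ? SketchState.VALID : SketchState.STALE));
      if (store.writeIfVersion(current, next)) {
        return empty;
      }
      // Lost the race to another writer: loop to re-read the snapshot and re-check the source.
    }
    // Exhausting the retry budget surfaces as a task failure.
    throw new IllegalStateException("CAS retry budget exhausted for bucket " + windowStartMs);
  }
}
```

The key choice is reading the source inside the loop, after the snapshot read. A lost CAS race is exactly the moment when a concurrent backfill may have become visible, so reusing an answer computed before the loop could commit VALID-empty over real data. A production loop would probably also back off between attempts, which the sketch omits.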



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/materializedview/MaterializedViewTaskExecutor.java:
##########
@@ -173,7 +171,8 @@ public SegmentConversionResult executeTask(PinotTaskConfig pinotTaskConfig)
     String taskMode = configs.getOrDefault(MaterializedViewTask.TASK_MODE_KEY,
         MaterializedViewTask.TASK_MODE_APPEND);
 
-    // DELETE mode: skip query execution, only remove existing MV segments
+    // DELETE mode: skip query execution; remove existing MV segments and rewrite the
+    // runtime PartitionInfo to VALID-empty (see updateMaterializedViewRuntime DELETE branch).
     if (MaterializedViewTask.TASK_MODE_DELETE.equals(taskMode)) {

Review Comment:
   Fixed in a71f721743 — the DELETE-mode comments now reference 
`executeDeleteTask` / `postProcess` (the 
`MaterializedViewPartitionManager`-based path); `updateMaterializedViewRuntime` 
is no longer referenced in the production code.



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/materializedview/MaterializedViewTaskExecutor.java:
##########
@@ -392,8 +391,10 @@ private void revertWindowSegmentReplace(String tableNameWithType, String lineage
   }
 
   /// Handles DELETE mode: removes all existing MV segments for the given time window
-  /// via segment lineage replace (segmentsFrom=[old segments], segmentsTo=[]).
-  /// No query is executed and no new segments are created.
+  /// via segment lineage replace (segmentsFrom=[old segments], segmentsTo=[]).  No query
+  /// is executed and no new MV segments are created; the runtime PartitionInfo is rewritten
+  /// to `VALID + PartitionFingerprint.EMPTY` by [#updateMaterializedViewRuntime] in
+  /// [#postProcess], so the partition stays tracked-but-empty rather than disappearing.

Review Comment:
   Fixed in a71f721743 — `executeDeleteTask`'s javadoc now points to 
`postProcess` instead of the removed `updateMaterializedViewRuntime`.



##########
pinot-materialized-view/src/main/java/org/apache/pinot/materializedview/scheduler/MaterializedViewTaskUtils.java:
##########
@@ -172,4 +180,51 @@ public static String buildSegmentName(String tableName, long windowStartMs, long
       String attemptId, int segIdx) {
     return tableName + "_" + windowStartMs + "_" + windowEndMs + "_" + attemptId + "_" + segIdx;
   }
+
+  /// Computes a [PartitionFingerprint] over the source segments overlapping
+  /// `[windowStartMs, windowEndMs)`.  Single source of truth — both the scheduler (when
+  /// generating tasks) and the minion executor (when validating fingerprints at commit time)
+  /// must call this method, never re-implement the algorithm.
+  ///
+  /// Algorithm (each step is part of the byte-equality contract — changing any one of them
+  /// breaks the fingerprint and forces an MV-wide recompute):
+  ///
+  ///   1. **Filter** to segments overlapping the half-open window: `start < windowEndMs` AND
+  ///      `end >= windowStartMs`.  Asymmetric `>=` on the lower bound is intentional —
+  ///      `SegmentZKMetadata#getEndTimeMs` is inclusive.
+  ///   2. **Sort** the surviving list by segment name.  Listing order from
+  ///      `getSegmentsZKMetadata` is not guaranteed to be stable across calls, so the sort
+  ///      makes the hash deterministic.  The comparator is part of the algorithm: changing
+  ///      it (e.g. to sort by CRC) is byte-equivalent to changing the hash encoding.
+  ///   3. **Hash** the sorted list with `farmHashFingerprint64`, feeding each segment as
+  ///      `<segmentName>\0<crc>\n`.  FarmHash64 is non-cryptographic but collision-resistant
+  ///      for non-adversarial inputs; it replaces an earlier XOR-CRC scheme that exhibited
+  ///      cancellation collisions (swap two segments with equal combined contribution →
+  ///      identical fingerprint).
+  ///
+  /// Empty overlap returns [PartitionFingerprint#EMPTY] — by construction, since
+  /// `farmHashFingerprint64` over zero input bytes is the constant baked into [#EMPTY].
+  /// Callers MUST NOT pre-filter or pre-sort `allSegments`; the helper is responsible for
+  /// both.  Pre-sorting with a different comparator would silently break byte-equality
+  /// against the executor's commit-time recomputation.
+  public static PartitionFingerprint computeWindowFingerprint(
+      List<SegmentZKMetadata> allSegments, long windowStartMs, long windowEndMs) {
+    List<SegmentZKMetadata> overlapping = new ArrayList<>();
+    for (SegmentZKMetadata seg : allSegments) {
+      long segStartMs = seg.getStartTimeMs();
+      long segEndMs = seg.getEndTimeMs();
+      if (segStartMs < windowEndMs && segEndMs >= windowStartMs) {
+        overlapping.add(seg);
+      }
+    }
+    overlapping.sort(Comparator.comparing(SegmentZKMetadata::getSegmentName));
+    Hasher hasher = Hashing.farmHashFingerprint64().newHasher();
+    for (SegmentZKMetadata seg : overlapping) {
+      hasher.putString(seg.getSegmentName(), StandardCharsets.UTF_8);
+      hasher.putByte((byte) 0);
+      hasher.putLong(seg.getCrc());
+      hasher.putByte((byte) '\n');
+    }
+    return new PartitionFingerprint(overlapping.size(), hasher.hash().asLong());

Review Comment:
   Fixed in 8e1295a357 — `computeWindowFingerprint` now returns 
`PartitionFingerprint.EMPTY` for empty overlap, matching the documented 
contract and avoiding the per-call allocation. The value is byte-identical to 
the prior hash-of-empty-input result, and 
`testWindowFingerprintForEmptyOverlapMatchesEmptyConstant` pins the contract.
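
For reference, the filter/sort/hash contract and the `EMPTY` short-circuit can be sketched without Guava. This is an illustration, not Pinot's code. `SegMeta` and `Fingerprint` are hypothetical stand-ins for `SegmentZKMetadata` and `PartitionFingerprint`. 64-bit FNV-1a is swapped in for `farmHashFingerprint64`, so the hash values will not match Pinot's. As with any deterministic hash, FNV-1a over zero bytes yields a fixed constant (its offset basis). That is why returning a precomputed `EMPTY` is byte-identical to hashing an empty overlap.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for SegmentZKMetadata: name, start/end time in ms (end inclusive), CRC.
record SegMeta(String name, long startMs, long endMs, long crc) {}

// Hypothetical stand-in for PartitionFingerprint: overlap count plus a 64-bit hash.
record Fingerprint(int segmentCount, long hash) {
  static final long FNV_OFFSET_BASIS = 0xcbf29ce484222325L;
  // FNV-1a over zero bytes returns its offset basis, so this equals "hash of empty input".
  static final Fingerprint EMPTY = new Fingerprint(0, FNV_OFFSET_BASIS);
}

final class WindowFingerprintSketch {
  private static final long FNV_PRIME = 0x100000001b3L;

  // One FNV-1a step: a stand-in for farmHashFingerprint64, NOT equivalent to it.
  private static long mix(long h, byte b) {
    return (h ^ (b & 0xffL)) * FNV_PRIME;
  }

  static Fingerprint compute(List<SegMeta> allSegments, long windowStartMs, long windowEndMs) {
    // 1. Filter: half-open window, but segment end times are inclusive, hence `>=`.
    List<SegMeta> overlapping = new ArrayList<>();
    for (SegMeta seg : allSegments) {
      if (seg.startMs() < windowEndMs && seg.endMs() >= windowStartMs) {
        overlapping.add(seg);
      }
    }
    if (overlapping.isEmpty()) {
      return Fingerprint.EMPTY; // same value hashing would produce, without a new object
    }
    // 2. Sort by name so the listing order cannot change the hash.
    overlapping.sort(Comparator.comparing(SegMeta::name));
    // 3. Hash each segment as <name>\0<crc>\n, with the CRC fed as 8 little-endian bytes.
    long h = Fingerprint.FNV_OFFSET_BASIS;
    for (SegMeta seg : overlapping) {
      for (byte b : seg.name().getBytes(StandardCharsets.UTF_8)) {
        h = mix(h, b);
      }
      h = mix(h, (byte) 0);
      for (int i = 0; i < Long.BYTES; i++) {
        h = mix(h, (byte) (seg.crc() >>> (8 * i)));
      }
      h = mix(h, (byte) '\n');
    }
    return new Fingerprint(overlapping.size(), h);
  }
}
```

Order independence comes from the sort, not from the hash. The hash itself is order-sensitive, which is what lets it distinguish inputs that an XOR-based combiner would conflate.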



##########
pinot-materialized-view/src/main/java/org/apache/pinot/materializedview/consistency/MaterializedViewConsistencyManager.java:
##########
@@ -410,55 +405,40 @@ private boolean markPartitionsDirty(String viewTableName, long affectedStartMs,
             + "mark partitions without a bucket size.  Repair the MV table config.",
         viewTableName, bucketMs);
 
-    // Iterate every bucket [partStart, partStart+bucketMs) that overlaps the affected range
-    // [affectedStartMs, affectedEndMs]. Two ranges overlap when start1 <= end2 AND end1 >= start2.
+    // Cap affectedEndMs at watermarkMs.  No partition with partStart > watermarkMs can exist
+    // (the writer invariant), so any bucket beyond that cannot be marked STALE anyway.  This
+    // protects the bucket iteration below from a caller passing Long.MAX_VALUE (full-range
+    // invalidation from notifyMaterializedViewConsistencyManager paths when segment
+    // startTime/endTime is unknown), which would otherwise enumerate ~watermarkMs/bucketMs
+    // candidates — orders of magnitude more than the number of real partitions.
+    long cappedEnd = Math.min(affectedEndMs, runtime.getWatermarkMs());
+    if (cappedEnd < affectedStartMs) {
+      LOGGER.debug("Affected range [{}, {}] is past watermarkMs ({}) for MV table: {}; nothing to mark",
+          affectedStartMs, affectedEndMs, runtime.getWatermarkMs(), viewTableName);
+      return Collections.emptyList();
+    }
+
+    // Enumerate every bucket [partStart, partStart+bucketMs) that overlaps the affected range
+    // [affectedStartMs, cappedEnd]. Two ranges overlap when start1 <= end2 AND end1 >= start2.
     // The first overlapping bucket has partStart = floorDiv(affectedStartMs, bucketMs) * bucketMs;
-    // the last has partStart <= affectedEndMs (any bucket whose start is past affectedEndMs
-    // cannot overlap because partStart > affectedEndMs >= affectedStartMs implies the bucket
-    // starts after the affected range ends). floorDiv (instead of /) is used defensively so a
-    // future caller passing negative affectedStartMs would still produce the correct floor.
-    // Only flip existing VALID entries to STALE.  Absent buckets are NOT synthesized — under
-    // Design C, a bucket's absence from the partition map already means "MV does not cover this
-    // range", so the broker rewrite (PR 2) routes those queries to the base table.  Synthesizing
-    // STALE entries for every uncovered bucket below `watermarkMs` would explode the znode size
-    // on a full-range invalidation (~watermarkMs / bucketMs entries) without affecting routing
-    // correctness — the bucket-iteration loop below stays O(affectedRange / bucketMs) but the
-    // persisted map grows only with real partitions.
+    // the last has partStart <= cappedEnd (any bucket whose start is past cappedEnd cannot
+    // overlap because partStart > cappedEnd >= affectedStartMs implies the bucket starts after
+    // the affected range ends). floorDiv (instead of /) is used defensively so a future caller
+    // passing negative affectedStartMs would still produce the correct floor.
+    //
+    // Absent buckets are still enumerated: the manager's per-bucket lenient filter no-ops them
+    // under the CAS lock.  Synthesizing STALE entries for absent buckets is intentionally NOT
+    // done here — under Design C, a bucket's absence already means "MV does not cover this
+    // range" and the broker routes those queries to the base.  Synthesizing on every
+    // full-range invalidation would explode the znode size by ~watermarkMs/bucketMs entries
+    // without affecting routing correctness.
+    List<Long> buckets = new ArrayList<>();
     long partStart = Math.floorDiv(affectedStartMs, bucketMs) * bucketMs;
-    while (partStart <= affectedEndMs) {
-      PartitionInfo info = updatedInfos.get(partStart);
-      if (info != null && info.getState() == PartitionState.VALID) {
-        updatedInfos.put(partStart, info.withState(PartitionState.STALE));
-        anyChanged = true;
-        markedCount++;
-      }
+    while (partStart <= cappedEnd) {
+      buckets.add(partStart);
       partStart += bucketMs;
     }

Review Comment:
   Fixed in 8e1295a357 — `enumerateCandidateBuckets` now iterates the existing 
partition keys in range instead of every bucket slot, bounding the candidate 
list to the partition count (a full-range invalidation with a small `bucketMs` 
and a large watermark no longer allocates ~`watermarkMs/bucketMs` entries). 
Functionally identical, since `markStale` already no-ops absent buckets under 
its CAS lock.
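
Below is a minimal sketch of the bounded enumeration. It assumes the partition map can be viewed as a `NavigableMap` keyed by bucket-aligned `partStart`; the real map type and this `enumerateCandidateBuckets` signature are both assumptions. The steps are: cap at the watermark, floor the start to a bucket boundary, then return the keys of a `subMap` range view instead of stepping through every bucket slot.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;

final class CandidateBucketsSketch {
  /// Hypothetical helper: returns the partStart keys of existing partitions whose bucket
  /// [partStart, partStart + bucketMs) overlaps [affectedStartMs, min(affectedEndMs, watermarkMs)].
  /// Assumes every key is a bucket-aligned partStart.
  static List<Long> enumerateCandidateBuckets(NavigableMap<Long, ?> partitions,
      long affectedStartMs, long affectedEndMs, long watermarkMs, long bucketMs) {
    // No partition starts past the watermark (writer invariant), so cap the range there.
    long cappedEnd = Math.min(affectedEndMs, watermarkMs);
    if (cappedEnd < affectedStartMs) {
      return List.of();
    }
    // First bucket that can overlap; floorDiv keeps the floor correct for negative input.
    long firstBucket = Math.floorDiv(affectedStartMs, bucketMs) * bucketMs;
    // Range view over real partitions only: cost scales with partition count, not range / bucketMs.
    return new ArrayList<>(partitions.subMap(firstBucket, true, cappedEnd, true).keySet());
  }
}
```

Since only existing keys are returned, a downstream step that already skips absent buckets (as the comment says `markStale` does) sees the same effective set as the slot-by-slot loop. Only the empty iterations go away.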



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

