xiangfu0 commented on code in PR #18642:
URL: https://github.com/apache/pinot/pull/18642#discussion_r3366657158
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/materializedview/MaterializedViewTaskExecutor.java:
##########
@@ -449,22 +450,40 @@ public void postProcess(PinotTaskConfig pinotTaskConfig) {
long windowStartMs = Long.parseLong(configs.get(MaterializedViewTask.WINDOW_START_MS_KEY));
long windowEndMs = Long.parseLong(configs.get(MaterializedViewTask.WINDOW_END_MS_KEY));
- updateMaterializedViewRuntime(configs, tableName, taskMode, windowStartMs, windowEndMs);
- }
+ MaterializedViewPartitionManager partitionManager = new MaterializedViewPartitionManager(
+ MINION_CONTEXT.getHelixPropertyStore(),
+ MaterializedViewTaskExecutor::readMinionClusterConfig);
- /// Updates [MaterializedViewRuntimeMetadata] in a single CAS write, combining:
- ///
- /// - partitions: set VALID with new fingerprint (APPEND/OVERWRITE) or remove (DELETE)
- /// - watermarkMs: advance on APPEND only (drives both scheduler dispatch and the
- /// broker's SPLIT_REWRITE boundary)
- ///
- // Compile-time default for the CAS retry budget when racing to update MaterializedViewRuntimeMetadata.
- // Up to maxTasksPerBatch executors can contend per batch completion; each retry re-fetches the
- // latest version with jittered backoff (Thread.sleep below). 128 is well above any realistic
- // maxTasksPerBatch and stays low enough that genuinely pathological contention still surfaces as
- // a task failure (caught by Helix and retried at the task level). Overridable per cluster via
- // `MaterializedViewTask.CLUSTER_CONFIG_KEY_MAX_RUNTIME_UPDATE_ATTEMPTS` (no minion restart).
- private static final int DEFAULT_MAX_RUNTIME_UPDATE_ATTEMPTS = 128;
+ if (MaterializedViewTask.TASK_MODE_DELETE.equals(taskMode)) {
+ // DELETE writes `VALID + PartitionFingerprint.EMPTY` rather than removing the entry —
+ // see `MaterializedViewPartitionManager#clearValid` for the design rationale (no
+ // fingerprint validation needed; the source contained zero overlapping segments by
+ // construction at scheduler dispatch time).
+ partitionManager.clearValid(tableName, windowStartMs);
Review Comment:
Addressed in a71f721743: DELETE now re-checks source emptiness inside
`clearValid`'s CAS loop (a `Supplier` re-reads the source on every attempt) and
leaves the bucket STALE for OVERWRITE when a backfill is detected, instead of
writing `VALID+EMPTY`. The narrow residual cross-ZK window is backstopped by a
periodic `VALID-empty` re-evaluation sweep in
`MaterializedViewConsistencyManager` (8e1295a357).
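For readers following the thread, the re-check-inside-the-CAS-loop idea can be sketched roughly as below. This is only an illustration under stated assumptions, not the code in a71f721743: `ClearValidSketch`, its `State` enum, the `BooleanSupplier` parameter, and the `AtomicReference` standing in for the versioned ZK write are all hypothetical names chosen for the example.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;

/** Hypothetical stand-in for the DELETE-path CAS loop; not Pinot's actual code. */
final class ClearValidSketch {
  enum State { STALE, VALID_EMPTY }

  /**
   * Tries to move the bucket to VALID_EMPTY. The emptiness check runs again on every
   * attempt, so a backfill that lands between two attempts is seen before the write.
   * If the source is no longer empty, the bucket is kept STALE instead.
   */
  static State clearValid(AtomicReference<State> bucket, BooleanSupplier sourceIsEmpty, int maxAttempts) {
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      State current = bucket.get();
      // Re-read the source on each attempt (assumption: the supplier is cheap enough to call here).
      State next = sourceIsEmpty.getAsBoolean() ? State.VALID_EMPTY : State.STALE;
      // AtomicReference.compareAndSet plays the role of the version-checked metadata write.
      if (bucket.compareAndSet(current, next)) {
        return next;
      }
    }
    throw new IllegalStateException("CAS retry budget exhausted after " + maxAttempts + " attempts");
  }

  public static void main(String[] args) {
    AtomicReference<State> bucket = new AtomicReference<>(State.STALE);
    System.out.println(clearValid(bucket, () -> true, 3));
    bucket.set(State.STALE);
    System.out.println(clearValid(bucket, () -> false, 3));
  }
}
```

The point of the sketch is only the ordering: the emptiness check sits inside the retry loop rather than before it, so each write decision is based on a fresh read.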
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/materializedview/MaterializedViewTaskExecutor.java:
##########
@@ -173,7 +171,8 @@ public SegmentConversionResult executeTask(PinotTaskConfig pinotTaskConfig)
String taskMode = configs.getOrDefault(MaterializedViewTask.TASK_MODE_KEY, MaterializedViewTask.TASK_MODE_APPEND);
- // DELETE mode: skip query execution, only remove existing MV segments
+ // DELETE mode: skip query execution; remove existing MV segments and rewrite the
+ // runtime PartitionInfo to VALID-empty (see updateMaterializedViewRuntime DELETE branch).
if (MaterializedViewTask.TASK_MODE_DELETE.equals(taskMode)) {
Review Comment:
Fixed in a71f721743 — the DELETE-mode comments now reference
`executeDeleteTask` / `postProcess` (the
`MaterializedViewPartitionManager`-based path); `updateMaterializedViewRuntime`
is no longer referenced in the production code.
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/materializedview/MaterializedViewTaskExecutor.java:
##########
@@ -392,8 +391,10 @@ private void revertWindowSegmentReplace(String tableNameWithType, String lineage
}
/// Handles DELETE mode: removes all existing MV segments for the given time window
- /// via segment lineage replace (segmentsFrom=[old segments], segmentsTo=[]).
- /// No query is executed and no new segments are created.
+ /// via segment lineage replace (segmentsFrom=[old segments], segmentsTo=[]). No query
+ /// is executed and no new MV segments are created; the runtime PartitionInfo is rewritten
+ /// to `VALID + PartitionFingerprint.EMPTY` by [#updateMaterializedViewRuntime] in
+ /// [#postProcess], so the partition stays tracked-but-empty rather than disappearing.
Review Comment:
Fixed in a71f721743 — `executeDeleteTask`'s javadoc now points to
`postProcess` instead of the removed `updateMaterializedViewRuntime`.
##########
pinot-materialized-view/src/main/java/org/apache/pinot/materializedview/scheduler/MaterializedViewTaskUtils.java:
##########
@@ -172,4 +180,51 @@ public static String buildSegmentName(String tableName, long windowStartMs, long
String attemptId, int segIdx) {
return tableName + "_" + windowStartMs + "_" + windowEndMs + "_" + attemptId + "_" + segIdx;
}
+
+ /// Computes a [PartitionFingerprint] over the source segments overlapping
+ /// `[windowStartMs, windowEndMs)`. Single source of truth — both the scheduler (when
+ /// generating tasks) and the minion executor (when validating fingerprints at commit time)
+ /// must call this method, never re-implement the algorithm.
+ ///
+ /// Algorithm (each step is part of the byte-equality contract — changing any one of them
+ /// breaks the fingerprint and forces an MV-wide recompute):
+ ///
+ /// 1. **Filter** to segments overlapping the half-open window: `start < windowEndMs` AND
+ /// `end >= windowStartMs`. Asymmetric `>=` on the lower bound is intentional —
+ /// `SegmentZKMetadata#getEndTimeMs` is inclusive.
+ /// 2. **Sort** the surviving list by segment name. Listing order from
+ /// `getSegmentsZKMetadata` is not guaranteed to be stable across calls, so the sort
+ /// makes the hash deterministic. The comparator is part of the algorithm: changing
+ /// it (e.g. to sort by CRC) is byte-equivalent to changing the hash encoding.
+ /// 3. **Hash** the sorted list with `farmHashFingerprint64`, feeding each segment as
+ /// `<segmentName>\0<crc>\n`. FarmHash64 is non-cryptographic but collision-resistant
+ /// for non-adversarial inputs; it replaces an earlier XOR-CRC scheme that exhibited
+ /// cancellation collisions (swap two segments with equal combined contribution →
+ /// identical fingerprint).
+ ///
+ /// Empty overlap returns [PartitionFingerprint#EMPTY] — by construction, since
+ /// `farmHashFingerprint64` over zero input bytes is the constant baked into [#EMPTY].
+ /// Callers MUST NOT pre-filter or pre-sort `allSegments`; the helper is responsible for
+ /// both. Pre-sorting with a different comparator would silently break byte-equality
+ /// against the executor's commit-time recomputation.
+ public static PartitionFingerprint computeWindowFingerprint(
+ List<SegmentZKMetadata> allSegments, long windowStartMs, long windowEndMs) {
+ List<SegmentZKMetadata> overlapping = new ArrayList<>();
+ for (SegmentZKMetadata seg : allSegments) {
+ long segStartMs = seg.getStartTimeMs();
+ long segEndMs = seg.getEndTimeMs();
+ if (segStartMs < windowEndMs && segEndMs >= windowStartMs) {
+ overlapping.add(seg);
+ }
+ }
+ overlapping.sort(Comparator.comparing(SegmentZKMetadata::getSegmentName));
+ Hasher hasher = Hashing.farmHashFingerprint64().newHasher();
+ for (SegmentZKMetadata seg : overlapping) {
+ hasher.putString(seg.getSegmentName(), StandardCharsets.UTF_8);
+ hasher.putByte((byte) 0);
+ hasher.putLong(seg.getCrc());
+ hasher.putByte((byte) '\n');
+ }
+ return new PartitionFingerprint(overlapping.size(), hasher.hash().asLong());
Review Comment:
Fixed in 8e1295a357 — `computeWindowFingerprint` now returns
`PartitionFingerprint.EMPTY` for empty overlap, matching the documented
contract and avoiding the per-call allocation. The value is byte-identical to
the prior hash-of-empty-input result, and
`testWindowFingerprintForEmptyOverlapMatchesEmptyConstant` pins the contract.
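As background on why step 3 of the javadoc binds each CRC to its segment name, here is a small, hedged illustration of the cancellation problem it describes. It assumes the earlier scheme simply XORed the segment CRCs (the thread does not show that code), and it compares the raw name-bound records as text instead of hashing them with FarmHash, so `FingerprintEncodingSketch` and both of its methods are hypothetical and simplified (the CRC is written as decimal text here, whereas the real helper feeds it via `putLong`).

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical illustration only; the real helper hashes these records with farmHashFingerprint64. */
final class FingerprintEncodingSketch {
  /** XOR of all CRCs: order-insensitive, but blind to which segment owns which CRC. */
  static long xorOfCrcs(Map<String, Long> crcBySegment) {
    long acc = 0L;
    for (long crc : crcBySegment.values()) {
      acc ^= crc;
    }
    return acc;
  }

  /** Concatenates name, NUL, crc, newline records in segment-name order. */
  static String encode(Map<String, Long> crcBySegment) {
    StringBuilder sb = new StringBuilder();
    // TreeMap iteration gives the deterministic name ordering that step 2 requires.
    for (Map.Entry<String, Long> e : new TreeMap<>(crcBySegment).entrySet()) {
      sb.append(e.getKey()).append('\0').append(e.getValue()).append('\n');
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    Map<String, Long> before = Map.of("seg_a", 1L, "seg_b", 2L);
    Map<String, Long> swapped = Map.of("seg_a", 2L, "seg_b", 1L);
    System.out.println("XOR equal: " + (xorOfCrcs(before) == xorOfCrcs(swapped)));
    System.out.println("Records equal: " + encode(before).equals(encode(swapped)));
  }
}
```

Under these assumptions the two inputs are indistinguishable to the XOR scheme but produce different record streams, which is what a hash over the name-bound encoding gets to see. An empty map encodes to zero bytes, which lines up with the empty-overlap case discussed above.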
##########
pinot-materialized-view/src/main/java/org/apache/pinot/materializedview/consistency/MaterializedViewConsistencyManager.java:
##########
@@ -410,55 +405,40 @@ private boolean markPartitionsDirty(String viewTableName, long affectedStartMs,
+ "mark partitions without a bucket size. Repair the MV table config.",
viewTableName, bucketMs);
- // Iterate every bucket [partStart, partStart+bucketMs) that overlaps the affected range
- // [affectedStartMs, affectedEndMs]. Two ranges overlap when start1 <= end2 AND end1 >= start2.
+ // Cap affectedEndMs at watermarkMs. No partition with partStart > watermarkMs can exist
+ // (the writer invariant), so any bucket beyond that cannot be marked STALE anyway. This
+ // protects the bucket iteration below from a caller passing Long.MAX_VALUE (full-range
+ // invalidation from notifyMaterializedViewConsistencyManager paths when segment
+ // startTime/endTime is unknown), which would otherwise enumerate ~watermarkMs/bucketMs
+ // candidates — orders of magnitude more than the number of real partitions.
+ long cappedEnd = Math.min(affectedEndMs, runtime.getWatermarkMs());
+ if (cappedEnd < affectedStartMs) {
+ LOGGER.debug("Affected range [{}, {}] is past watermarkMs ({}) for MV table: {}; nothing to mark",
+ affectedStartMs, affectedEndMs, runtime.getWatermarkMs(), viewTableName);
+ return Collections.emptyList();
+ }
+
+ // Enumerate every bucket [partStart, partStart+bucketMs) that overlaps the affected range
+ // [affectedStartMs, cappedEnd]. Two ranges overlap when start1 <= end2 AND end1 >= start2.
// The first overlapping bucket has partStart = floorDiv(affectedStartMs, bucketMs) * bucketMs;
- // the last has partStart <= affectedEndMs (any bucket whose start is past affectedEndMs
- // cannot overlap because partStart > affectedEndMs >= affectedStartMs implies the bucket
- // starts after the affected range ends). floorDiv (instead of /) is used defensively so a
- // future caller passing negative affectedStartMs would still produce the correct floor.
- // Only flip existing VALID entries to STALE. Absent buckets are NOT synthesized — under
- // Design C, a bucket's absence from the partition map already means "MV does not cover this
- // range", so the broker rewrite (PR 2) routes those queries to the base table. Synthesizing
- // STALE entries for every uncovered bucket below `watermarkMs` would explode the znode size
- // on a full-range invalidation (~watermarkMs / bucketMs entries) without affecting routing
- // correctness — the bucket-iteration loop below stays O(affectedRange / bucketMs) but the
- // persisted map grows only with real partitions.
+ // the last has partStart <= cappedEnd (any bucket whose start is past cappedEnd cannot
+ // overlap because partStart > cappedEnd >= affectedStartMs implies the bucket starts after
+ // the affected range ends). floorDiv (instead of /) is used defensively so a future caller
+ // passing negative affectedStartMs would still produce the correct floor.
+ //
+ // Absent buckets are still enumerated: the manager's per-bucket lenient filter no-ops them
+ // under the CAS lock. Synthesizing STALE entries for absent buckets is intentionally NOT
+ // done here — under Design C, a bucket's absence already means "MV does not cover this
+ // range" and the broker routes those queries to the base. Synthesizing on every
+ // full-range invalidation would explode the znode size by ~watermarkMs/bucketMs entries
+ // without affecting routing correctness.
+ List<Long> buckets = new ArrayList<>();
long partStart = Math.floorDiv(affectedStartMs, bucketMs) * bucketMs;
- while (partStart <= affectedEndMs) {
- PartitionInfo info = updatedInfos.get(partStart);
- if (info != null && info.getState() == PartitionState.VALID) {
- updatedInfos.put(partStart, info.withState(PartitionState.STALE));
- anyChanged = true;
- markedCount++;
- }
+ while (partStart <= cappedEnd) {
+ buckets.add(partStart);
partStart += bucketMs;
}
Review Comment:
Fixed in 8e1295a357: `enumerateCandidateBuckets` now iterates the existing
partition keys that fall in range instead of every bucket slot, which bounds the
candidate list by the partition count (a full-range invalidation with a small
`bucketMs` and a large watermark no longer allocates ~`watermarkMs/bucketMs`
entries). Behavior is unchanged, since `markStale` already no-ops absent buckets
under its CAS lock.
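To make the change concrete, a minimal sketch of the key-range approach follows. It assumes the partition map is (or can be viewed as) a sorted map keyed by `partStart`; the class name, the `String` value type, and the method signature are hypothetical and do not reproduce the code in 8e1295a357.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical sketch: enumerate only the partition keys that actually exist in the affected range. */
final class CandidateBucketsSketch {
  static List<Long> enumerateCandidateBuckets(NavigableMap<Long, String> partitions,
      long affectedStartMs, long affectedEndMs, long watermarkMs, long bucketMs) {
    // Cap at the watermark so a Long.MAX_VALUE upper bound cannot widen the scan.
    long cappedEnd = Math.min(affectedEndMs, watermarkMs);
    if (cappedEnd < affectedStartMs) {
      return List.of();
    }
    // Start of the first bucket that can overlap; floorDiv also handles negative inputs.
    long firstStart = Math.floorDiv(affectedStartMs, bucketMs) * bucketMs;
    // subMap is a view over existing keys, so the result size is bounded by the partition count.
    return new ArrayList<>(partitions.subMap(firstStart, true, cappedEnd, true).keySet());
  }

  public static void main(String[] args) {
    NavigableMap<Long, String> partitions = new TreeMap<>();
    partitions.put(0L, "VALID");
    partitions.put(1000L, "VALID");
    partitions.put(5000L, "STALE");
    System.out.println(enumerateCandidateBuckets(partitions, 1500L, Long.MAX_VALUE, 6000L, 1000L));
  }
}
```

With three partitions and a full-range upper bound, the candidate list holds at most three keys regardless of how large `watermarkMs / bucketMs` is, which is the bound described above.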