clintropolis commented on code in PR #19461:
URL: https://github.com/apache/druid/pull/19461#discussion_r3243650280
##########
server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java:
##########
@@ -273,6 +274,311 @@ public void replicateSegment(DataSegment segment,
Map<String, Integer> tierToRep
}
}
+ /**
+ * Fingerprint-aware partial-load reconciler. Mirrors {@link
#replicateSegment} for replica-count and surplus
+ * accounting, but per-tier the load/drop decisions are made by classifying
replicas as matching or stale relative
+ * to the requested {@link PartialLoadProfile#fingerprint()} and applying
the load-then-drop swap pattern: stale
+ * replicas keep serving until matching replicas have actually loaded.
+ */
+ @Override
+ public void replicateSegmentPartially(
+ DataSegment segment,
+ PartialLoadProfile profile,
+ Map<String, Integer> tierToReplicaCount
+ )
+ {
+ final Map<String, Integer> effectiveTierToReplicaCount =
expandWithAliases(tierToReplicaCount);
+ final Set<String> allTiersInCluster =
Sets.newHashSet(cluster.getTierNames());
+
+ if (effectiveTierToReplicaCount.isEmpty()) {
+ replicaCountMap.computeIfAbsent(segment.getId(),
DruidServer.DEFAULT_TIER);
+ } else {
+ effectiveTierToReplicaCount.forEach((tier, requiredReplicas) -> {
+ reportTierCapacityStats(segment, requiredReplicas, tier);
+
+ SegmentReplicaCount replicaCount =
replicaCountMap.computeIfAbsent(segment.getId(), tier);
+ replicaCount.setRequired(requiredReplicas,
tierToHistoricalCount.getOrDefault(tier, 0));
+
+ if (!allTiersInCluster.contains(tier)) {
+
datasourceToInvalidLoadTiers.computeIfAbsent(segment.getDataSource(), ds -> new
HashSet<>())
+ .add(tier);
+ }
+ });
+ }
+
+ final SegmentReplicaCount replicaCountInCluster =
replicaCountMap.getTotal(segment.getId());
+ if (replicaCountInCluster.required() <= 0) {
+ segmentsWithZeroRequiredReplicas
+ .computeIfAbsent(segment.getDataSource(), ds -> new HashSet<>())
+ .add(segment);
+ }
+
+ final int replicaSurplus = replicaCountInCluster.loadedNotDropping()
+ - replicaCountInCluster.requiredAndLoadable();
+
+ int dropsQueued = 0;
+ for (String tier : allTiersInCluster) {
+ dropsQueued += updateReplicasInTierPartial(
+ segment,
+ profile,
+ tier,
+ effectiveTierToReplicaCount.getOrDefault(tier, 0),
+ replicaSurplus - dropsQueued
+ );
+ }
+ }
+
+ /**
+ * Per-tier reconciliation under the partial-load model; partial load
equivalent of
+ * {@link #updateReplicasInTier(DataSegment, String, int, int)}
+ *
+ * <h3>Algorithm</h3>
+ * <ol>
+ * <li><b>Classify.</b> Build {@link PartialSegmentStatusInTier} for this
tier; every server falls into at most
+ * one of: matching-loaded, stale-loaded (optionally also
eligible-for-additive-reload),
+ * matching-in-flight, stale-in-flight, eligible-for-fresh-load, or
unclassified (drop/move pending; see
+ * {@link PartialSegmentStatusInTier#classify} for why). Matching
means the announced fingerprint equals
+ * this request's fingerprint; stale is anything else, including a
non-profile regular full-load replica.</li>
+ * <li><b>Compute matching count:</b> matching-loaded + matching-in-flight
− pending-move-drop.</li>
+ * <li><b>If matching count is short of {@code requiredReplicas}</b>
(deficit):
+ * <ol type="a">
+ * <li>Cancel stale-in-flight loads to free their slots. Canceled
servers become same-run fresh-load
+ * destinations.</li>
+ * <li>Queue fresh partial-load requests up to the deficit.
Destination preference order, applied in
+ * {@link #loadPartialReplicas}:
+ * <ol>
+ * <li>Empty servers (clean slate, no in-place mutation
needed).</li>
+ * <li>Servers whose stale-in-flight load we just canceled in
(a), their slot is now free.</li>
+ * <li>Stale-loaded servers eligible for additive reload (the
historical fills in the missing parts in
+ * place). This is the fallback path that mitigates the "no
spare server" stuck state.
+ * Same-run dedup is enforced by {@link
ServerHolder#startOperation}, which rejects a second
+ * queue attempt on a server whose segment is already
queued.</li>
+ * </ol>
+ * </li>
+ * </ol>
+ * </li>
+ * <li><b>If matching count exceeds requirement</b> (surplus): drop the
excess like the full-load surplus
+ * path.</li>
+ * <li><b>Drop stale-loaded replicas</b> only when the count of
<em>actually-loaded</em> matching replicas
+ * already meets the requirement. This preserves availability across
the swap: stale replicas keep serving
+ * until matching replicas have completed loading and announced, then
get dropped. The
+ * {@code maxReplicasToDrop} budget caps how many drops we queue per
coordinator run to avoid drop
+ * storms.</li>
+ * </ol>
+ *
+ * <h3>Returns</h3>
+ * Total number of drop operations queued on this tier (matching surplus +
stale), used to budget cross-tier drop
+ * pressure across subsequent tier reconciliations in the same run.
+ */
+ private int updateReplicasInTierPartial(
+ DataSegment segment,
+ PartialLoadProfile profile,
+ String tier,
+ int requiredReplicas,
+ int maxReplicasToDrop
+ )
+ {
+ final SegmentReplicaCount replicaCountOnTier =
replicaCountMap.get(segment.getId(), tier);
+ final int movingReplicas = replicaCountOnTier.moving();
+ final int moveCompletedPendingDrop = Math.max(0,
replicaCountOnTier.moveCompletedPendingDrop());
+
+ final PartialSegmentStatusInTier status = new PartialSegmentStatusInTier(
+ segment,
+ profile.fingerprint(),
+ cluster.getManagedHistoricalsByTier(tier)
+ );
+
+ final int matchingProjected = status.getMatchingLoaded().size()
+ + status.getMatchingInFlight().size()
+ - moveCompletedPendingDrop;
+ final boolean shouldCancelMoves = requiredReplicas == 0 && movingReplicas
> 0;
+
+ // If everything's already in shape and no stale work, fast-exit.
+ if (matchingProjected == requiredReplicas
+ && !shouldCancelMoves
+ && status.getStaleInFlight().isEmpty()
+ && (status.getStaleLoaded().isEmpty() ||
status.getMatchingLoaded().size() < requiredReplicas)) {
+ return 0;
+ }
+
+ if (shouldCancelMoves) {
+ // Convert to SegmentStatusInTier for the existing cancelOperations
move-cancel helper.
+ final SegmentStatusInTier vanillaStatus =
+ new SegmentStatusInTier(segment,
cluster.getManagedHistoricalsByTier(tier));
+ cancelOperations(SegmentAction.MOVE_TO, movingReplicas, segment,
vanillaStatus);
+ cancelOperations(SegmentAction.MOVE_FROM, movingReplicas, segment,
vanillaStatus);
+ }
+
+ // Cancel stale in-flight: when there's a matching deficit we want their
slots back; when requirement is 0 we
+ // want them gone unconditionally so we don't realize a stale fingerprint
nobody asked for. Canceled servers
+ // become eligible for a fresh matching load later in this same run.
+ final int matchingDeficit = requiredReplicas - matchingProjected;
+ final List<ServerHolder> canceledStaleServers = new ArrayList<>();
+ if (matchingDeficit > 0 || requiredReplicas == 0) {
+ final int toCancel = requiredReplicas == 0 ?
status.getStaleInFlight().size() : matchingDeficit;
+ cancelLoadsOnServers(segment, status.getStaleInFlight(), toCancel,
canceledStaleServers);
+ if (!canceledStaleServers.isEmpty()) {
+ incrementStat(Stats.Segments.PARTIAL_STALE_CANCELLED, segment, tier,
canceledStaleServers.size());
+ }
+ }
+
+ // Queue fresh matching loads to fill the deficit.
+ if (matchingDeficit > 0) {
+ final int numLoadedReplicas = status.getMatchingLoaded().size() +
status.getStaleLoaded().size();
+ final int queued = loadPartialReplicas(
+ matchingDeficit,
+ numLoadedReplicas,
+ segment,
+ tier,
+ status,
+ canceledStaleServers,
+ profile
+ );
+ if (queued > 0) {
+ incrementStat(Stats.Segments.PARTIAL_ASSIGNED, segment, tier, queued);
+ }
+ }
+
+ int dropsQueuedOnTier = 0;
+ int dropBudget = maxReplicasToDrop;
+
+ // Surplus matching: drop excess matching replicas. Same shape as the
full-load surplus path.
+ // Note: cancellations of matching in-flight loads here are intentionally
not emitted as a separate stat. Unlike
+ // the stale-in-flight cancellations above (a distinct "rule churn"
event), these are a surplus-absorption
+ // mechanism: each canceled in-flight load just reduces how many physical
drops we'd otherwise need to queue
+ // (see `surplus - canceledMatching.size()` below). The full-load path in
`updateReplicasInTier` follows the same
+ // convention: canceled surplus loads aren't statted, only the resulting
drops are.
+ if (matchingProjected > requiredReplicas) {
+ final int surplus = matchingProjected - requiredReplicas;
+ final List<ServerHolder> canceledMatching = new ArrayList<>();
+ cancelLoadsOnServers(segment, status.getMatchingInFlight(), surplus,
canceledMatching);
+ final int numToDrop = Math.min(surplus - canceledMatching.size(),
dropBudget);
+ if (numToDrop > 0) {
+ final int dropped = dropFromList(numToDrop, segment,
status.getMatchingLoaded());
+ incrementStat(Stats.Segments.DROPPED, segment, tier, dropped);
+ dropsQueuedOnTier += dropped;
+ dropBudget -= dropped;
+ }
+ }
+
+ // Stale drops: only safe once matching-loaded already satisfies the
requirement (i.e., truly-serving matching
+ // replicas already cover the rule before we touch any stale). This is the
"load then drop" half of the swap.
+ if (status.getMatchingLoaded().size() >= requiredReplicas
+ && !status.getStaleLoaded().isEmpty()
+ && dropBudget > 0) {
+ final int numToDrop = Math.min(status.getStaleLoaded().size(),
dropBudget);
+ final int dropped = dropFromList(numToDrop, segment,
status.getStaleLoaded());
+ if (dropped > 0) {
+ incrementStat(Stats.Segments.PARTIAL_STALE_DROPPED, segment, tier,
dropped);
+ dropsQueuedOnTier += dropped;
+ }
+ }
+
+ return dropsQueuedOnTier;
+ }
+
+ /**
+ * Queues fresh partial-load requests on up to {@code numToLoad} eligible
servers. Preference order: empty
+ * (fresh-load) servers first; then servers whose stale-fingerprint
in-flight loads were just canceled (their slot
+ * is now free); then stale-loaded servers (additive reload; the historical
fills missing parts in place). The last
+ * fallback is what mitigates the "tier saturated with stale" stuck state.
+ */
+ private int loadPartialReplicas(
+ int numToLoad,
+ int numLoadedReplicas,
+ DataSegment segment,
+ String tier,
+ PartialSegmentStatusInTier status,
+ List<ServerHolder> canceledStaleServers,
+ PartialLoadProfile profile
+ )
+ {
+ final boolean isAlreadyLoadedOnTier = numLoadedReplicas >= 1;
+
+ if (isAlreadyLoadedOnTier &&
replicationThrottler.isReplicationThrottledForTier(tier)) {
+ return 0;
+ }
+
+ final List<ServerHolder> destinations = new ArrayList<>(
+ status.getEligibleForFreshLoad().size()
+ + canceledStaleServers.size()
+ + status.getEligibleForAdditiveReload().size()
+ );
+ destinations.addAll(status.getEligibleForFreshLoad());
+ destinations.addAll(canceledStaleServers);
+ destinations.addAll(status.getEligibleForAdditiveReload());
Review Comment:
Oh, I see. Yeah, I guess there will be a temporary problem. I think I can put
in an interim fix until the actual functionality is there.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]