clintropolis commented on code in PR #19461:
URL: https://github.com/apache/druid/pull/19461#discussion_r3243650280


##########
server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java:
##########
@@ -273,6 +274,311 @@ public void replicateSegment(DataSegment segment, 
Map<String, Integer> tierToRep
     }
   }
 
+  /**
+   * Fingerprint-aware partial-load reconciler. Mirrors {@link 
#replicateSegment} for replica-count and surplus
+   * accounting, but per-tier the load/drop decisions are made by classifying 
replicas as matching or stale relative
+   * to the requested {@link PartialLoadProfile#fingerprint()} and applying 
the load-then-drop swap pattern: stale
+   * replicas keep serving until matching replicas have actually loaded.
+   */
+  @Override
+  public void replicateSegmentPartially(
+      DataSegment segment,
+      PartialLoadProfile profile,
+      Map<String, Integer> tierToReplicaCount
+  )
+  {
+    final Map<String, Integer> effectiveTierToReplicaCount = 
expandWithAliases(tierToReplicaCount);
+    final Set<String> allTiersInCluster = 
Sets.newHashSet(cluster.getTierNames());
+
+    if (effectiveTierToReplicaCount.isEmpty()) {
+      replicaCountMap.computeIfAbsent(segment.getId(), 
DruidServer.DEFAULT_TIER);
+    } else {
+      effectiveTierToReplicaCount.forEach((tier, requiredReplicas) -> {
+        reportTierCapacityStats(segment, requiredReplicas, tier);
+
+        SegmentReplicaCount replicaCount = 
replicaCountMap.computeIfAbsent(segment.getId(), tier);
+        replicaCount.setRequired(requiredReplicas, 
tierToHistoricalCount.getOrDefault(tier, 0));
+
+        if (!allTiersInCluster.contains(tier)) {
+          
datasourceToInvalidLoadTiers.computeIfAbsent(segment.getDataSource(), ds -> new 
HashSet<>())
+                                      .add(tier);
+        }
+      });
+    }
+
+    final SegmentReplicaCount replicaCountInCluster = 
replicaCountMap.getTotal(segment.getId());
+    if (replicaCountInCluster.required() <= 0) {
+      segmentsWithZeroRequiredReplicas
+          .computeIfAbsent(segment.getDataSource(), ds -> new HashSet<>())
+          .add(segment);
+    }
+
+    final int replicaSurplus = replicaCountInCluster.loadedNotDropping()
+                               - replicaCountInCluster.requiredAndLoadable();
+
+    int dropsQueued = 0;
+    for (String tier : allTiersInCluster) {
+      dropsQueued += updateReplicasInTierPartial(
+          segment,
+          profile,
+          tier,
+          effectiveTierToReplicaCount.getOrDefault(tier, 0),
+          replicaSurplus - dropsQueued
+      );
+    }
+  }
+
+  /**
+   * Per-tier reconciliation under the partial-load model; partial load 
equivalent of
+   * {@link #updateReplicasInTier(DataSegment, String, int, int)}
+   *
+   * <h3>Algorithm</h3>
+   * <ol>
+   *   <li><b>Classify.</b> Build {@link PartialSegmentStatusInTier} for this 
tier; every server falls into at most
+   *       one of: matching-loaded, stale-loaded (optionally also 
eligible-for-additive-reload),
+   *       matching-in-flight, stale-in-flight, eligible-for-fresh-load, or 
unclassified (drop/move pending; see
+   *       {@link PartialSegmentStatusInTier#classify} for why). Matching 
means the announced fingerprint equals
+   *       this request's fingerprint; stale is anything else, including a 
non-profile regular full-load replica.</li>
+   *   <li><b>Compute matching count:</b> matching-loaded + matching-in-flight 
− pending-move-drop.</li>
+   *   <li><b>If matching count is short of {@code requiredReplicas}</b> 
(deficit):
+   *     <ol type="a">
+   *       <li>Cancel stale-in-flight loads to free their slots. Canceled 
servers become same-run fresh-load
+   *           destinations.</li>
+   *       <li>Queue fresh partial-load requests up to the deficit. 
Destination preference order, applied in
+   *           {@link #loadPartialReplicas}:
+   *           <ol>
+   *             <li>Empty servers (clean slate, no in-place mutation 
needed).</li>
+   *             <li>Servers whose stale-in-flight load we just canceled in 
(a), their slot is now free.</li>
+   *             <li>Stale-loaded servers eligible for additive reload (the 
historical fills in the missing parts in
+   *                 place). This is the fallback path that mitigates the "no 
spare server" stuck state.
+   *                 Same-run dedup is enforced by {@link 
ServerHolder#startOperation}, which rejects a second
+   *                 queue attempt on a server whose segment is already 
queued.</li>
+   *           </ol>
+   *       </li>
+   *     </ol>
+   *   </li>
+   *   <li><b>If matching count exceeds requirement</b> (surplus): drop the 
excess like the full-load surplus
+   *       path.</li>
+   *   <li><b>Drop stale-loaded replicas</b> only when the count of 
<em>actually-loaded</em> matching replicas
+   *       already meets the requirement. This preserves availability across 
the swap: stale replicas keep serving
+   *       until matching replicas have completed loading and announced, then 
get dropped. The
+   *       {@code maxReplicasToDrop} budget caps how many drops we queue per 
coordinator run to avoid drop
+   *       storms.</li>
+   * </ol>
+   *
+   * <h3>Returns</h3>
+   * Total number of drop operations queued on this tier (matching surplus + 
stale), used to budget cross-tier drop
+   * pressure across subsequent tier reconciliations in the same run.
+   */
+  private int updateReplicasInTierPartial(
+      DataSegment segment,
+      PartialLoadProfile profile,
+      String tier,
+      int requiredReplicas,
+      int maxReplicasToDrop
+  )
+  {
+    final SegmentReplicaCount replicaCountOnTier = 
replicaCountMap.get(segment.getId(), tier);
+    final int movingReplicas = replicaCountOnTier.moving();
+    final int moveCompletedPendingDrop = Math.max(0, 
replicaCountOnTier.moveCompletedPendingDrop());
+
+    final PartialSegmentStatusInTier status = new PartialSegmentStatusInTier(
+        segment,
+        profile.fingerprint(),
+        cluster.getManagedHistoricalsByTier(tier)
+    );
+
+    final int matchingProjected = status.getMatchingLoaded().size()
+                                  + status.getMatchingInFlight().size()
+                                  - moveCompletedPendingDrop;
+    final boolean shouldCancelMoves = requiredReplicas == 0 && movingReplicas 
> 0;
+
+    // If everything's already in shape and no stale work, fast-exit.
+    if (matchingProjected == requiredReplicas
+        && !shouldCancelMoves
+        && status.getStaleInFlight().isEmpty()
+        && (status.getStaleLoaded().isEmpty() || 
status.getMatchingLoaded().size() < requiredReplicas)) {
+      return 0;
+    }
+
+    if (shouldCancelMoves) {
+      // Convert to SegmentStatusInTier for the existing cancelOperations 
move-cancel helper.
+      final SegmentStatusInTier vanillaStatus =
+          new SegmentStatusInTier(segment, 
cluster.getManagedHistoricalsByTier(tier));
+      cancelOperations(SegmentAction.MOVE_TO, movingReplicas, segment, 
vanillaStatus);
+      cancelOperations(SegmentAction.MOVE_FROM, movingReplicas, segment, 
vanillaStatus);
+    }
+
+    // Cancel stale in-flight: when there's a matching deficit we want their 
slots back; when requirement is 0 we
+    // want them gone unconditionally so we don't realize a stale fingerprint 
nobody asked for. Canceled servers
+    // become eligible for a fresh matching load later in this same run.
+    final int matchingDeficit = requiredReplicas - matchingProjected;
+    final List<ServerHolder> canceledStaleServers = new ArrayList<>();
+    if (matchingDeficit > 0 || requiredReplicas == 0) {
+      final int toCancel = requiredReplicas == 0 ? 
status.getStaleInFlight().size() : matchingDeficit;
+      cancelLoadsOnServers(segment, status.getStaleInFlight(), toCancel, 
canceledStaleServers);
+      if (!canceledStaleServers.isEmpty()) {
+        incrementStat(Stats.Segments.PARTIAL_STALE_CANCELLED, segment, tier, 
canceledStaleServers.size());
+      }
+    }
+
+    // Queue fresh matching loads to fill the deficit.
+    if (matchingDeficit > 0) {
+      final int numLoadedReplicas = status.getMatchingLoaded().size() + 
status.getStaleLoaded().size();
+      final int queued = loadPartialReplicas(
+          matchingDeficit,
+          numLoadedReplicas,
+          segment,
+          tier,
+          status,
+          canceledStaleServers,
+          profile
+      );
+      if (queued > 0) {
+        incrementStat(Stats.Segments.PARTIAL_ASSIGNED, segment, tier, queued);
+      }
+    }
+
+    int dropsQueuedOnTier = 0;
+    int dropBudget = maxReplicasToDrop;
+
+    // Surplus matching: drop excess matching replicas. Same shape as the 
full-load surplus path.
+    // Note: cancellations of matching in-flight loads here are intentionally 
not emitted as a separate stat. Unlike
+    // the stale-in-flight cancellations above (a distinct "rule churn" 
event), these are a surplus-absorption
+    // mechanism: each canceled in-flight load just reduces how many physical 
drops we'd otherwise need to queue
+    // (see `surplus - canceledMatching.size()` below). The full-load path in 
`updateReplicasInTier` follows the same
+    // convention: canceled surplus loads aren't statted, only the resulting 
drops are.
+    if (matchingProjected > requiredReplicas) {
+      final int surplus = matchingProjected - requiredReplicas;
+      final List<ServerHolder> canceledMatching = new ArrayList<>();
+      cancelLoadsOnServers(segment, status.getMatchingInFlight(), surplus, 
canceledMatching);
+      final int numToDrop = Math.min(surplus - canceledMatching.size(), 
dropBudget);
+      if (numToDrop > 0) {
+        final int dropped = dropFromList(numToDrop, segment, 
status.getMatchingLoaded());
+        incrementStat(Stats.Segments.DROPPED, segment, tier, dropped);
+        dropsQueuedOnTier += dropped;
+        dropBudget -= dropped;
+      }
+    }
+
+    // Stale drops: only safe once matching-loaded already satisfies the 
requirement (i.e., truly-serving matching
+    // replicas already cover the rule before we touch any stale). This is the 
"load then drop" half of the swap.
+    if (status.getMatchingLoaded().size() >= requiredReplicas
+        && !status.getStaleLoaded().isEmpty()
+        && dropBudget > 0) {
+      final int numToDrop = Math.min(status.getStaleLoaded().size(), 
dropBudget);
+      final int dropped = dropFromList(numToDrop, segment, 
status.getStaleLoaded());
+      if (dropped > 0) {
+        incrementStat(Stats.Segments.PARTIAL_STALE_DROPPED, segment, tier, 
dropped);
+        dropsQueuedOnTier += dropped;
+      }
+    }
+
+    return dropsQueuedOnTier;
+  }
+
+  /**
+   * Queues fresh partial-load requests on up to {@code numToLoad} eligible 
servers. Preference order: empty
+   * (fresh-load) servers first; then servers whose stale-fingerprint 
in-flight loads were just canceled (their slot
+   * is now free); then stale-loaded servers (additive reload; the historical 
fills missing parts in place). The last
+   * fallback is what mitigates the "tier saturated with stale" stuck state.
+   */
+  private int loadPartialReplicas(
+      int numToLoad,
+      int numLoadedReplicas,
+      DataSegment segment,
+      String tier,
+      PartialSegmentStatusInTier status,
+      List<ServerHolder> canceledStaleServers,
+      PartialLoadProfile profile
+  )
+  {
+    final boolean isAlreadyLoadedOnTier = numLoadedReplicas >= 1;
+
+    if (isAlreadyLoadedOnTier && 
replicationThrottler.isReplicationThrottledForTier(tier)) {
+      return 0;
+    }
+
+    final List<ServerHolder> destinations = new ArrayList<>(
+        status.getEligibleForFreshLoad().size()
+        + canceledStaleServers.size()
+        + status.getEligibleForAdditiveReload().size()
+    );
+    destinations.addAll(status.getEligibleForFreshLoad());
+    destinations.addAll(canceledStaleServers);
+    destinations.addAll(status.getEligibleForAdditiveReload());

Review Comment:
   Oh i see, yea, i guess there will be a temporary problem. I think I can put 
an interim fix in until the actual functionality is there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to