clintropolis commented on code in PR #19461:
URL: https://github.com/apache/druid/pull/19461#discussion_r3239309182


##########
server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java:
##########
@@ -273,6 +274,273 @@ public void replicateSegment(DataSegment segment, Map<String, Integer> tierToRep
     }
   }
 
+  /**
+   * Fingerprint-aware partial-load reconciler. Mirrors {@link #replicateSegment} for replica-count and surplus
+   * accounting, but per-tier the load/drop decisions are made by classifying replicas as matching or stale relative
+   * to the requested {@link PartialLoadProfile#fingerprint()} and applying the load-then-drop swap pattern: stale
+   * replicas keep serving until matching replicas have actually loaded.
+   */
+  @Override
+  public void replicateSegmentPartially(
+      DataSegment segment,
+      PartialLoadProfile profile,
+      Map<String, Integer> tierToReplicaCount
+  )
+  {
+    final Map<String, Integer> effectiveTierToReplicaCount = 
expandWithAliases(tierToReplicaCount);
+    final Set<String> allTiersInCluster = 
Sets.newHashSet(cluster.getTierNames());
+
+    if (effectiveTierToReplicaCount.isEmpty()) {
+      replicaCountMap.computeIfAbsent(segment.getId(), 
DruidServer.DEFAULT_TIER);
+    } else {
+      effectiveTierToReplicaCount.forEach((tier, requiredReplicas) -> {
+        reportTierCapacityStats(segment, requiredReplicas, tier);
+
+        SegmentReplicaCount replicaCount = 
replicaCountMap.computeIfAbsent(segment.getId(), tier);
+        replicaCount.setRequired(requiredReplicas, 
tierToHistoricalCount.getOrDefault(tier, 0));
+
+        if (!allTiersInCluster.contains(tier)) {
+          
datasourceToInvalidLoadTiers.computeIfAbsent(segment.getDataSource(), ds -> new 
HashSet<>())
+                                      .add(tier);
+        }
+      });
+    }
+
+    final SegmentReplicaCount replicaCountInCluster = 
replicaCountMap.getTotal(segment.getId());
+    if (replicaCountInCluster.required() <= 0) {
+      segmentsWithZeroRequiredReplicas
+          .computeIfAbsent(segment.getDataSource(), ds -> new HashSet<>())
+          .add(segment);
+    }
+
+    final int replicaSurplus = replicaCountInCluster.loadedNotDropping()
+                               - replicaCountInCluster.requiredAndLoadable();
+
+    int dropsQueued = 0;
+    for (String tier : allTiersInCluster) {
+      dropsQueued += updateReplicasInTierPartial(
+          segment,
+          profile,
+          tier,
+          effectiveTierToReplicaCount.getOrDefault(tier, 0),
+          replicaSurplus - dropsQueued
+      );
+    }
+  }
+
+  /**
+   * Per-tier reconciliation under the partial-load model. The flow is:
+   * <ol>
+   *   <li>Compute matching count: matching-loaded + matching-in-flight − pending-move-drop.</li>
+   *   <li>If matching count is short of {@code requiredReplicas}, cancel stale-fingerprint in-flight loads to free
+   *       slots, then queue fresh partial-load requests on eligible servers (preferring empty servers, falling back
+   *       to additive reload on stale-loaded servers; the historical fills in missing parts in place).</li>
+   *   <li>If matching count exceeds requirement, drop the excess like the full-load surplus path.</li>
+   *   <li>Drop stale-loaded replicas only when the count of actually-loaded matching replicas already meets the
+   *       requirement; this preserves availability across the swap (stale replicas keep serving until matching
+   *       replicas have completed loading and announced).</li>
+   * </ol>
+   * Returns the total number of drop operations queued on this tier (matching surplus + stale), used to budget
+   * cross-tier drop pressure.
+   */
+  private int updateReplicasInTierPartial(
+      DataSegment segment,
+      PartialLoadProfile profile,
+      String tier,
+      int requiredReplicas,
+      int maxReplicasToDrop
+  )
+  {
+    final SegmentReplicaCount replicaCountOnTier = 
replicaCountMap.get(segment.getId(), tier);
+    final int movingReplicas = replicaCountOnTier.moving();
+    final int moveCompletedPendingDrop = Math.max(0, 
replicaCountOnTier.moveCompletedPendingDrop());
+
+    final PartialSegmentStatusInTier status = new PartialSegmentStatusInTier(
+        segment,
+        profile.fingerprint(),
+        cluster.getManagedHistoricalsByTier(tier)
+    );
+
+    final int matchingProjected = status.getMatchingLoaded().size()
+                                  + status.getMatchingInFlight().size()
+                                  - moveCompletedPendingDrop;
+    final boolean shouldCancelMoves = requiredReplicas == 0 && movingReplicas 
> 0;
+
+    // If everything's already in shape and no stale work, fast-exit.
+    if (matchingProjected == requiredReplicas
+        && !shouldCancelMoves
+        && status.getStaleInFlight().isEmpty()
+        && (status.getStaleLoaded().isEmpty() || 
status.getMatchingLoaded().size() < requiredReplicas)) {
+      return 0;
+    }
+
+    if (shouldCancelMoves) {
+      // Convert to SegmentStatusInTier for the existing cancelOperations move-cancel helper.
+      final SegmentStatusInTier vanillaStatus =
+          new SegmentStatusInTier(segment, 
cluster.getManagedHistoricalsByTier(tier));
+      cancelOperations(SegmentAction.MOVE_TO, movingReplicas, segment, 
vanillaStatus);
+      cancelOperations(SegmentAction.MOVE_FROM, movingReplicas, segment, 
vanillaStatus);
+    }
+
+    // Cancel stale in-flight: when there's a matching deficit we want their slots back; when requirement is 0 we
+    // want them gone unconditionally so we don't realize a stale fingerprint nobody asked for. Canceled servers
+    // become eligible for a fresh matching load later in this same run.
+    final int matchingDeficit = requiredReplicas - matchingProjected;
+    final List<ServerHolder> cancelledStaleServers = new ArrayList<>();
+    if (matchingDeficit > 0 || requiredReplicas == 0) {
+      final int toCancel = requiredReplicas == 0 ? 
status.getStaleInFlight().size() : matchingDeficit;
+      cancelLoadsOnServers(segment, status.getStaleInFlight(), toCancel, 
cancelledStaleServers);
+      if (!cancelledStaleServers.isEmpty()) {
+        incrementStat(Stats.Segments.PARTIAL_STALE_CANCELLED, segment, tier, 
cancelledStaleServers.size());
+      }
+    }
+
+    // Queue fresh matching loads to fill the deficit.
+    if (matchingDeficit > 0) {
+      final int numLoadedReplicas = status.getMatchingLoaded().size() + 
status.getStaleLoaded().size();
+      final int queued = loadPartialReplicas(
+          matchingDeficit, numLoadedReplicas, segment, tier, status, 
cancelledStaleServers, profile
+      );
+      if (queued > 0) {
+        incrementStat(Stats.Segments.PARTIAL_ASSIGNED, segment, tier, queued);
+      }
+    }
+
+    int dropsQueuedOnTier = 0;
+    int dropBudget = maxReplicasToDrop;
+
+    // Surplus matching: drop excess matching replicas. Same shape as the full-load surplus path.
+    if (matchingProjected > requiredReplicas) {
+      final int surplus = matchingProjected - requiredReplicas;
+      final List<ServerHolder> cancelledMatching = new ArrayList<>();
+      cancelLoadsOnServers(segment, status.getMatchingInFlight(), surplus, 
cancelledMatching);

Review Comment:
   added comment



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to