imply-cheddar commented on code in PR #13197:
URL: https://github.com/apache/druid/pull/13197#discussion_r1009974733


##########
server/src/main/java/org/apache/druid/server/coordinator/SegmentHolder.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Ordering;
+import org.apache.druid.server.coordination.DataSegmentChangeRequest;
+import org.apache.druid.server.coordination.SegmentChangeRequestDrop;
+import org.apache.druid.server.coordination.SegmentChangeRequestLoad;
+import org.apache.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Represents a segment queued for a load or drop operation in a LoadQueuePeon.
+ * <p>
+ * Requests are naturally ordered using the {@link 
#COMPARE_ACTION_THEN_INTERVAL}.
+ */
+public class SegmentHolder implements Comparable<SegmentHolder>
+{
+  /**
+   * Orders segment requests:
+   * <ul>
+   *   <li>first by action: all drops, then all loads, then all moves</li>
+   *   <li>then by interval: newest segments first</li>
+   * </ul>
+   */
+  public static final Comparator<SegmentHolder> COMPARE_ACTION_THEN_INTERVAL =
+      Ordering.explicit(SegmentAction.DROP, SegmentAction.LOAD, 
SegmentAction.REPLICATE, SegmentAction.MOVE_TO)
+              .onResultOf(SegmentHolder::getAction)
+              
.compound(DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST.onResultOf(SegmentHolder::getSegment));
+
+  private final DataSegment segment;
+  private final DataSegmentChangeRequest changeRequest;
+  private final SegmentAction action;
+
+  // Guaranteed to store only non-null elements
+  private final List<LoadPeonCallback> callbacks = new ArrayList<>();
+  private final AtomicLong firstRequestMillis = new AtomicLong(0);
+
+  SegmentHolder(
+      DataSegment segment,
+      SegmentAction action,
+      @Nullable LoadPeonCallback callback
+  )
+  {
+    this.segment = segment;
+    this.action = action;
+    this.changeRequest = (action == SegmentAction.DROP)
+                         ? new SegmentChangeRequestDrop(segment)
+                         : new SegmentChangeRequestLoad(segment);
+    if (callback != null) {
+      callbacks.add(callback);
+    }
+  }
+
+  public DataSegment getSegment()
+  {
+    return segment;
+  }
+
+  public SegmentAction getAction()
+  {
+    return action;
+  }
+
+  public boolean isLoad()
+  {
+    return action != SegmentAction.DROP;
+  }
+
+  public DataSegmentChangeRequest getChangeRequest()
+  {
+    return changeRequest;
+  }
+
+  public String getSegmentIdentifier()
+  {
+    return segment.getId().toString();
+  }
+
+  public void addCallback(@Nullable LoadPeonCallback callback)
+  {
+    if (callback != null) {
+      synchronized (callbacks) {
+        callbacks.add(callback);
+      }
+    }
+  }
+
+  /**
+   * Returns an immutable copy of all non-null callbacks for this queued 
segment.
+   */
+  public List<LoadPeonCallback> getCallbacks()
+  {
+    synchronized (callbacks) {
+      return ImmutableList.copyOf(callbacks);
+    }
+  }
+
+  public void markRequestSentToServer()
+  {
+    firstRequestMillis.compareAndSet(0L, System.currentTimeMillis());
+  }
+
+  public boolean isRequestSentToServer()
+  {
+    return firstRequestMillis.get() > 0;
+  }
+
+  public long getMillisSinceFirstRequestToServer()

Review Comment:
   stream of consciousness: should we really have this doing the subtraction 
(and taking the current time millis with each call)?  Or should we have this be 
able to return the firstRequestMillis and do the `.currentTimeMillis()` 
externally once?



##########
server/src/main/java/org/apache/druid/server/coordinator/SegmentAction.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+/**
+ * Represents actions that can be performed on a server for a single segment.
+ * <p>
+ * The different action types can be used to prioritize items in a 
LoadQueuePeon.
+ */
+public enum SegmentAction
+{
+  DROP,
+  LOAD,
+  REPLICATE,
+  MOVE_TO,

Review Comment:
   Please add Javadoc on each of these to explain what they mean.  "REPLICATE" 
and "LOAD" are especially important given that it is not unbelievable that some 
people will come and think that some usage of LOAD should be REPLICATE, when we 
are very explicitly defining them to mean throttled versus unthrottled.



##########
server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java:
##########
@@ -380,10 +407,12 @@ public void loadSegment(DataSegment segment, 
LoadPeonCallback callback)
       }
 
       SegmentHolder holder = segmentsToLoad.get(segment);
-
       if (holder == null) {
         log.trace("Server[%s] to load segment[%s] queued.", serverId, 
segment.getId());
-        segmentsToLoad.put(segment, new LoadSegmentHolder(segment, callback));
+        queuedSize.addAndGet(segment.getSize());
+        holder = new SegmentHolder(segment, action, callback);
+        segmentsToLoad.put(segment, holder);

Review Comment:
   The old code separated `segmentsToLoad` and `segmentsToDrop` so that it 
could prioritize drops over loads.  If I'm understanding correctly, we are 
doing that prioritization through the `queuedSegments` prioritization now, which 
makes me wonder if we need to keep the old `segmentsToLoad` and 
`segmentsToDrop` around anymore?  Are those data structures still used for some 
meaningful purpose?



##########
server/src/main/java/org/apache/druid/server/coordinator/SegmentLoader.java:
##########
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.google.common.collect.Sets;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.timeline.DataSegment;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * Used by the coordinator in each run for segment loading, dropping, balancing
+ * and broadcasting.
+ * <p>
+ * An instance of this class is freshly created for each coordinator run.
+ */
+public class SegmentLoader
+{
+  private static final EmittingLogger log = new 
EmittingLogger(SegmentLoader.class);
+
+  private final SegmentStateManager stateManager;
+  private final DruidCluster cluster;
+  private final CoordinatorStats stats = new CoordinatorStats();
+  private final SegmentReplicantLookup replicantLookup;
+  private final ReplicationThrottler replicationThrottler;
+  private final BalancerStrategy strategy;
+
+  private final Set<String> emptyTiers = new HashSet<>();
+
+  public SegmentLoader(
+      SegmentStateManager stateManager,
+      DruidCluster cluster,
+      SegmentReplicantLookup replicantLookup,
+      ReplicationThrottler replicationThrottler,
+      BalancerStrategy strategy
+  )
+  {
+    this.cluster = cluster;
+    this.strategy = strategy;
+    this.stateManager = stateManager;
+    this.replicantLookup = replicantLookup;
+    this.replicationThrottler = replicationThrottler;
+  }
+
+  public CoordinatorStats getStats()
+  {
+    return stats;
+  }
+
+  public void makeAlerts()
+  {
+    if (!emptyTiers.isEmpty()) {
+      log.makeAlert("Tiers %s have no servers! Check your cluster 
configuration.", emptyTiers).emit();
+    }
+  }
+
+  /**
+   * Moves the given segment between two servers of the same tier.
+   * <p>
+   * See if we can move balancing here.
+   */
+  public boolean moveSegment(DataSegment segment, ServerHolder fromServer, 
ServerHolder toServer)
+  {
+    final String tier = toServer.getServer().getTier();
+    if (!fromServer.getServer().getTier().equals(tier)) {
+      return false;
+    }
+
+    if (fromServer.isServingSegment(segment)) {
+      // Segment is loaded on fromServer, move it to toServer
+      return stateManager.moveSegment(segment, fromServer, toServer, 
replicationThrottler.getMaxLifetime());
+    } else if (!fromServer.isLoadingSegment(segment)) {
+      // Cannot move if fromServer is neither loading nor serving the segment
+      return false;
+    }
+
+    // Cancel the load on fromServer and load on toServer instead
+    final boolean loadCancelledOnFromServer =
+        stateManager.cancelOperation(SegmentState.LOADING, segment, 
fromServer);
+    if (loadCancelledOnFromServer) {
+      stats.addToTieredStat(CoordinatorStats.CANCELLED_LOADS, tier, 1);
+      int loadedCountOnTier = 
replicantLookup.getLoadedReplicas(segment.getId(), tier);
+      return stateManager.loadSegment(segment, toServer, loadedCountOnTier < 
1, replicationThrottler);
+    }
+
+    return false;
+  }
+
+  /**
+   * Queues load or drop of replicas of the given segment to achieve the
+   * target replication level on all the tiers.
+   */
+  public void updateReplicas(DataSegment segment, Map<String, Integer> 
tierToReplicaCount)
+  {
+    // Identify empty tiers and determine total required replicas
+    final AtomicInteger requiredTotalReplicas = new AtomicInteger(0);
+    final Set<String> allTiers = Sets.newHashSet(cluster.getTierNames());
+    tierToReplicaCount.forEach((tier, requiredReplicas) -> {
+      reportTierCapacityStats(segment, requiredReplicas, tier);
+      if (!allTiers.contains(tier)) {

Review Comment:
   Once again a nit, but you are doing an `if/else` and have a negative clause 
here.  You can just as easily have the positive clause and use the else 
statement for the `!` case.



##########
server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java:
##########
@@ -156,4 +161,45 @@ public Object2LongMap<String> 
getBroadcastUnderReplication(SegmentId segmentId)
     }
     return perTier;
   }
+
+  /**
+   * Counts of replicas of a segment in different states.
+   */
+  private static class ReplicaCount
+  {
+    int loaded;
+    int loading;
+    int dropping;
+    int moving;
+
+    void addLoaded()
+    {
+      ++loaded;
+    }
+
+    void addQueued(SegmentState state)
+    {
+      switch (state) {

Review Comment:
   Why no increment for `REPLICANT`?



##########
server/src/main/java/org/apache/druid/server/coordinator/SegmentLoader.java:
##########
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.google.common.collect.Sets;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.timeline.DataSegment;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * Used by the coordinator in each run for segment loading, dropping, balancing
+ * and broadcasting.
+ * <p>
+ * An instance of this class is freshly created for each coordinator run.
+ */
+public class SegmentLoader
+{
+  private static final EmittingLogger log = new 
EmittingLogger(SegmentLoader.class);
+
+  private final SegmentStateManager stateManager;
+  private final DruidCluster cluster;
+  private final CoordinatorStats stats = new CoordinatorStats();
+  private final SegmentReplicantLookup replicantLookup;
+  private final ReplicationThrottler replicationThrottler;
+  private final BalancerStrategy strategy;
+
+  private final Set<String> emptyTiers = new HashSet<>();
+
+  public SegmentLoader(
+      SegmentStateManager stateManager,
+      DruidCluster cluster,
+      SegmentReplicantLookup replicantLookup,
+      ReplicationThrottler replicationThrottler,
+      BalancerStrategy strategy
+  )
+  {
+    this.cluster = cluster;
+    this.strategy = strategy;
+    this.stateManager = stateManager;
+    this.replicantLookup = replicantLookup;
+    this.replicationThrottler = replicationThrottler;
+  }
+
+  public CoordinatorStats getStats()
+  {
+    return stats;
+  }
+
+  public void makeAlerts()
+  {
+    if (!emptyTiers.isEmpty()) {
+      log.makeAlert("Tiers %s have no servers! Check your cluster 
configuration.", emptyTiers).emit();
+    }
+  }
+
+  /**
+   * Moves the given segment between two servers of the same tier.
+   * <p>
+   * See if we can move balancing here.
+   */
+  public boolean moveSegment(DataSegment segment, ServerHolder fromServer, 
ServerHolder toServer)
+  {
+    final String tier = toServer.getServer().getTier();
+    if (!fromServer.getServer().getTier().equals(tier)) {
+      return false;
+    }
+
+    if (fromServer.isServingSegment(segment)) {
+      // Segment is loaded on fromServer, move it to toServer
+      return stateManager.moveSegment(segment, fromServer, toServer, 
replicationThrottler.getMaxLifetime());
+    } else if (!fromServer.isLoadingSegment(segment)) {
+      // Cannot move if fromServer is neither loading nor serving the segment
+      return false;
+    }
+
+    // Cancel the load on fromServer and load on toServer instead
+    final boolean loadCancelledOnFromServer =
+        stateManager.cancelOperation(SegmentState.LOADING, segment, 
fromServer);
+    if (loadCancelledOnFromServer) {
+      stats.addToTieredStat(CoordinatorStats.CANCELLED_LOADS, tier, 1);
+      int loadedCountOnTier = 
replicantLookup.getLoadedReplicas(segment.getId(), tier);
+      return stateManager.loadSegment(segment, toServer, loadedCountOnTier < 
1, replicationThrottler);
+    }
+
+    return false;
+  }
+
+  /**
+   * Queues load or drop of replicas of the given segment to achieve the
+   * target replication level on all the tiers.
+   */
+  public void updateReplicas(DataSegment segment, Map<String, Integer> 
tierToReplicaCount)
+  {
+    // Identify empty tiers and determine total required replicas
+    final AtomicInteger requiredTotalReplicas = new AtomicInteger(0);
+    final Set<String> allTiers = Sets.newHashSet(cluster.getTierNames());
+    tierToReplicaCount.forEach((tier, requiredReplicas) -> {
+      reportTierCapacityStats(segment, requiredReplicas, tier);
+      if (!allTiers.contains(tier)) {
+        emptyTiers.add(tier);
+      } else {
+        requiredTotalReplicas.addAndGet(requiredReplicas);
+      }
+    });
+
+    final int totalOverReplication =
+        replicantLookup.getTotalLoadedReplicas(segment.getId()) - 
requiredTotalReplicas.get();
+
+    // Update replicas in every tier
+    int totalDropsQueued = 0;
+    for (String tier : allTiers) {
+      totalDropsQueued += updateReplicasInTier(
+          segment,
+          tier,
+          tierToReplicaCount.getOrDefault(tier, 0),
+          totalOverReplication - totalDropsQueued
+      );
+    }
+  }
+
+  /**
+   * Queues load or drop operations on this tier based on the required
+   * number of replicas and the current state.
+   * <p>
+   * The {@code maxReplicasToDrop} helps to maintain the required level of
+   * replication in the cluster. This ensures that segment read concurrency 
does
+   * not suffer during a tier shift or load rule change.
+   * <p>
+   * Returns the number of new drop operations queued on this tier.
+   */
+  private int updateReplicasInTier(
+      DataSegment segment,
+      String tier,
+      int requiredReplicas,
+      int maxReplicasToDrop
+  )
+  {
+    final int projectedReplicas = 
replicantLookup.getProjectedReplicas(segment.getId(), tier);
+    final int movingReplicas = 
replicantLookup.getMovingReplicas(segment.getId(), tier);
+    final boolean shouldCancelMoves = requiredReplicas == 0 && movingReplicas 
> 0;
+
+    // Check if there is any action required on this tier
+    if (projectedReplicas == requiredReplicas && !shouldCancelMoves) {
+      return 0;
+    }
+
+    SegmentTierStatus segmentStatus = new SegmentTierStatus(segment, 
cluster.getHistoricalsByTier(tier));
+
+    // Cancel all moves in this tier if it does not need to have replicas
+    if (shouldCancelMoves) {
+      int cancelledMoves =
+          cancelOperations(movingReplicas, SegmentState.MOVING_TO, segment, 
segmentStatus);
+      stats.addToTieredStat(CoordinatorStats.CANCELLED_MOVES, tier, 
cancelledMoves);
+    }
+
+    // Cancel drops and queue loads if the projected count is below the 
requirement
+    if (projectedReplicas < requiredReplicas) {
+      int replicaDeficit = requiredReplicas - projectedReplicas;
+      int cancelledDrops =
+          cancelOperations(replicaDeficit, SegmentState.DROPPING, segment, 
segmentStatus);
+
+      // Cancelled drops can be counted as loaded replicas, thus reducing 
deficit
+      int numReplicasToLoad = replicaDeficit - cancelledDrops;
+      if (numReplicasToLoad > 0) {
+        boolean isFirstLoadOnTier = 
replicantLookup.getLoadedReplicas(segment.getId(), tier)
+                                    + cancelledDrops < 1;
+        int numLoadsQueued = loadReplicas(numReplicasToLoad, segment, tier, 
segmentStatus, isFirstLoadOnTier);
+        stats.addToTieredStat(CoordinatorStats.ASSIGNED_COUNT, tier, 
numLoadsQueued);
+      }
+    }
+
+    // Cancel loads and queue drops if the projected count exceeds the 
requirement
+    if (projectedReplicas > requiredReplicas) {
+      int replicaSurplus = projectedReplicas - requiredReplicas;
+      int cancelledLoads =
+          cancelOperations(replicaSurplus, SegmentState.LOADING, segment, 
segmentStatus);
+      stats.addToTieredStat(CoordinatorStats.CANCELLED_LOADS, tier, 
cancelledLoads);
+
+      int numReplicasToDrop = Math.min(replicaSurplus - cancelledLoads, 
maxReplicasToDrop);
+      if (numReplicasToDrop > 0) {
+        int dropsQueuedOnTier = dropReplicas(numReplicasToDrop, segment, tier, 
segmentStatus);
+        stats.addToTieredStat(CoordinatorStats.DROPPED_COUNT, tier, 
dropsQueuedOnTier);
+        return dropsQueuedOnTier;
+      }
+    }
+
+    return 0;
+  }
+
+  private void reportTierCapacityStats(DataSegment segment, int 
requiredReplicas, String tier)
+  {
+    stats.accumulateMaxTieredStat(
+        CoordinatorStats.MAX_REPLICATION_FACTOR,
+        tier,
+        requiredReplicas
+    );
+    stats.addToTieredStat(
+        CoordinatorStats.REQUIRED_CAPACITY,
+        tier,
+        segment.getSize() * requiredReplicas
+    );
+  }
+
+  /**
+   * Broadcasts the given segment to all servers that are broadcast targets and
+   * queues a drop of the segment from decommissioning servers.
+   */
+  public void broadcastSegment(DataSegment segment)
+  {
+    int assignedCount = 0;
+    int droppedCount = 0;
+    for (ServerHolder server : cluster.getAllServers()) {
+      // Ignore servers which are not broadcast targets
+      if (!server.getServer().getType().isSegmentBroadcastTarget()) {
+        continue;
+      }
+
+      if (server.isDecommissioning()) {
+        droppedCount += dropBroadcastSegment(segment, server) ? 1 : 0;
+      } else {
+        assignedCount += loadBroadcastSegment(segment, server) ? 1 : 0;
+      }
+    }
+
+    if (assignedCount > 0) {
+      stats.addToDataSourceStat(CoordinatorStats.BROADCAST_LOADS, 
segment.getDataSource(), assignedCount);
+    }
+    if (droppedCount > 0) {
+      stats.addToDataSourceStat(CoordinatorStats.BROADCAST_DROPS, 
segment.getDataSource(), droppedCount);
+    }
+  }
+
+  /**
+   * Marks the given segment as unused.
+   */
+  public void deleteSegment(DataSegment segment)
+  {
+    stateManager.deleteSegment(segment);
+    stats.addToGlobalStat(CoordinatorStats.DELETED_COUNT, 1);
+  }
+
+  /**
+   * Loads the broadcast segment if it is not loaded on the given server.
+   * Returns true only if the segment was successfully queued for load on the 
server.
+   */
+  private boolean loadBroadcastSegment(DataSegment segment, ServerHolder 
server)
+  {
+    final SegmentState state = server.getSegmentState(segment);
+    if (state == SegmentState.LOADED || state == SegmentState.LOADING) {
+      return false;
+    }
+
+    // Cancel drop if it is in progress
+    boolean dropCancelled = 
stateManager.cancelOperation(SegmentState.DROPPING, segment, server);
+    if (dropCancelled) {
+      return false;
+    }
+
+    if (server.canLoadSegment(segment)
+        && stateManager.loadSegment(segment, server, true, 
replicationThrottler)) {
+      return true;
+    } else {
+      log.makeAlert("Failed to assign broadcast segment for datasource [%s]", 
segment.getDataSource())
+         .addData("segmentId", segment.getId())
+         .addData("segmentSize", segment.getSize())
+         .addData("hostName", server.getServer().getHost())
+         .addData("availableSize", server.getAvailableSize())
+         .emit();
+      return false;
+    }
+  }
+
+  /**
+   * Drops the broadcast segment if it is loaded on the given server.
+   * Returns true only if the segment was successfully queued for drop on the 
server.
+   */
+  private boolean dropBroadcastSegment(DataSegment segment, ServerHolder 
server)
+  {
+    final SegmentState state = server.getSegmentState(segment);
+    if (state == SegmentState.NONE || state == SegmentState.DROPPING) {
+      return false;
+    }
+
+    // Cancel load if it is in progress
+    boolean loadCancelled = stateManager.cancelOperation(SegmentState.LOADING, 
segment, server);
+    if (loadCancelled) {
+      return false;
+    }
+
+    return stateManager.dropSegment(segment, server);
+  }
+
+  /**
+   * Queues drop of {@code numToDrop} replicas of the segment from a tier.
+   * Tries to drop replicas first from decommissioning servers and then from
+   * active servers.
+   * <p>
+   * Returns the number of successfully queued drop operations.
+   */
+  private int dropReplicas(
+      int numToDrop,
+      DataSegment segment,
+      String tier,
+      SegmentTierStatus segmentStatus
+  )
+  {
+    final List<ServerHolder> eligibleServers = 
segmentStatus.getServers(SegmentState.LOADED);
+    if (eligibleServers.isEmpty() || numToDrop <= 0) {
+      return 0;
+    }
+
+    final TreeSet<ServerHolder> eligibleLiveServers = new TreeSet<>();
+    final TreeSet<ServerHolder> eligibleDyingServers = new TreeSet<>();
+    for (ServerHolder server : eligibleServers) {
+      if (!server.isServingSegment(segment)) {
+        // ignore this server
+      } else if (server.isDecommissioning()) {
+        eligibleDyingServers.add(server);
+      } else {
+        eligibleLiveServers.add(server);
+      }
+    }
+
+    // Drop as many replicas as possible from decommissioning servers

Review Comment:
   Why drop things from a decommissioning server?  As long as the server is up, 
the data is available and it can be used.  If you don't want the server to be used 
for anything, just `kill -9` the process.  If it's up and working, keep using 
it until it's `kill -9`d.  If we are going to support decommissioning, it 
shouldn't be a "I need to remove things from this server" but rather "I'm going 
to pretend as if that server doesn't exist anymore".
   
   That said, decommissioning for historicals is not a really good model.  
Instead, we need the ability to start up as a replica.



##########
server/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyTest.java:
##########
@@ -84,29 +84,32 @@ public void findNewSegmentHomeReplicatorNotEnoughSpace()
         new LoadQueuePeonTester());
     serverHolders = new ArrayList<>();
     serverHolders.add(serverHolder);
-    final ServerHolder foundServerHolder = 
balancerStrategy.findNewSegmentHomeReplicator(proposedDataSegment, 
serverHolders);
     // since there is not enough space on server having available size 10L to 
host a segment of size 11L, it should be null
-    Assert.assertNull(foundServerHolder);
+    Assert.assertFalse(
+        balancerStrategy.findNewSegmentHomeReplicator(proposedDataSegment, 
serverHolders)
+                        .hasNext()
+    );

Review Comment:
   This change is doing what I always tell people happens to comments: the code 
changed and left the comment stale.  Please fix the comment along with the code 
(or maybe we don't need the comment anymore?)



##########
server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java:
##########
@@ -119,35 +118,39 @@ public void 
testOneTierTwoReplicantsWithStrictReplicantLimit()
         )).atLeastOnce();
     EasyMock.replay(databaseRuleManager);
 
-    DruidCluster druidCluster = DruidClusterBuilder
+    // server1 has all the segments already loaded
+    final DruidServer server1 =
+        new DruidServer("server1", "hostNorm", null, 1000, 
ServerType.HISTORICAL, "normal", 0);
+    usedSegments.forEach(server1::addDataSegment);
+
+    final DruidServer server2 =
+        new DruidServer("server2", "hostNorm", null, 1000, 
ServerType.HISTORICAL, "normal", 0);
+
+    final DruidCluster druidCluster = DruidClusterBuilder
         .newBuilder()
         .addTier(
             "normal",
-            new ServerHolder(
-                new DruidServer("serverNorm", "hostNorm", null, 1000, 
ServerType.HISTORICAL, "normal", 0)
-                    .toImmutableDruidServer(),
-                mockPeon
-            ),
-            new ServerHolder(
-                new DruidServer("serverNorm2", "hostNorm2", null, 1000, 
ServerType.HISTORICAL, "normal", 0)
-                    .toImmutableDruidServer(),
-                mockPeon
-            )
-        )
-        .build();
+            new ServerHolder(server1.toImmutableDruidServer(), mockPeon),
+            new ServerHolder(server2.toImmutableDruidServer(), mockPeon)
+        ).build();
 
     ListeningExecutorService exec = 
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
     BalancerStrategy balancerStrategy = new 
CostBalancerStrategyFactory().createBalancerStrategy(exec);
 
     DruidCoordinatorRuntimeParams params = 
makeCoordinatorRuntimeParams(druidCluster, balancerStrategy)
-        
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).withMaxNonPrimaryReplicantsToLoad(10).build())
+        .withDynamicConfigs(
+            CoordinatorDynamicConfig
+                .builder()
+                .withMaxNonPrimaryReplicantsToLoad(10)
+                .build()
+        )
         .build();
 
     DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
     CoordinatorStats stats = afterParams.getCoordinatorStats();
 
-    Assert.assertEquals(34L, stats.getTieredStat("assignedCount", "normal"));
-    Assert.assertEquals(10L, 
stats.getGlobalStat("totalNonPrimaryReplicantsLoaded"));
+    // There are 24 under-replicated segments, but only 10 replicas are 
assigned

Review Comment:
   Is there a stat that can be looked at and asserted on the 24 number?



##########
server/src/main/java/org/apache/druid/server/coordinator/SegmentLoader.java:
##########
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.google.common.collect.Sets;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.timeline.DataSegment;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * Used by the coordinator in each run for segment loading, dropping, balancing
+ * and broadcasting.
+ * <p>
+ * An instance of this class is freshly created for each coordinator run.
+ */
+public class SegmentLoader
+{
+  private static final EmittingLogger log = new 
EmittingLogger(SegmentLoader.class);
+
+  private final SegmentStateManager stateManager;
+  private final DruidCluster cluster;
+  private final CoordinatorStats stats = new CoordinatorStats();
+  private final SegmentReplicantLookup replicantLookup;
+  private final ReplicationThrottler replicationThrottler;
+  private final BalancerStrategy strategy;
+
+  private final Set<String> emptyTiers = new HashSet<>();
+
+  public SegmentLoader(
+      SegmentStateManager stateManager,
+      DruidCluster cluster,
+      SegmentReplicantLookup replicantLookup,
+      ReplicationThrottler replicationThrottler,
+      BalancerStrategy strategy
+  )
+  {
+    this.cluster = cluster;
+    this.strategy = strategy;
+    this.stateManager = stateManager;
+    this.replicantLookup = replicantLookup;
+    this.replicationThrottler = replicationThrottler;
+  }
+
+  public CoordinatorStats getStats()
+  {
+    return stats;
+  }
+
+  public void makeAlerts()
+  {
+    if (!emptyTiers.isEmpty()) {
+      log.makeAlert("Tiers %s have no servers! Check your cluster 
configuration.", emptyTiers).emit();
+    }
+  }
+
+  /**
+   * Moves the given segment between two servers of the same tier.
+   * <p>
+   * See if we can move balancing here.
+   */
+  public boolean moveSegment(DataSegment segment, ServerHolder fromServer, 
ServerHolder toServer)
+  {
+    final String tier = toServer.getServer().getTier();
+    if (!fromServer.getServer().getTier().equals(tier)) {
+      return false;
+    }
+
+    if (fromServer.isServingSegment(segment)) {
+      // Segment is loaded on fromServer, move it to toServer
+      return stateManager.moveSegment(segment, fromServer, toServer, 
replicationThrottler.getMaxLifetime());
+    } else if (!fromServer.isLoadingSegment(segment)) {

Review Comment:
   This is a nit, but perhaps remove the negation here and swap the bodies of 
the clauses?  Every `!` in a boolean clause just invites confusion about what 
the clause is trying to say.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to