imply-cheddar commented on code in PR #13197:
URL: https://github.com/apache/druid/pull/13197#discussion_r994014871


##########
server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java:
##########
@@ -46,13 +46,18 @@
   ServerHolder findNewSegmentHomeBalancer(DataSegment proposalSegment, List<ServerHolder> serverHolders);
 
   /**
-   * Find the best server on which to place a {@link DataSegment} replica according to the balancing strategy
+   * Finds the best servers on which to place a replica of the {@code proposalSegment}
+   * according to the balancing strategy.

Review Comment:
   This comment makes the method seem like it's finding homes for replicas.  But the name of the method makes me think it's finding homes for new segments.  Or maybe "new" is trying to modify "home" instead of "segment"; English is hard.  
 
   
   Either way, I think that we would do better to have a much more clear 
distinction in language between a replica (i.e. the assignment of a segment to 
a node when the data is already available in the tier) versus a "new segment" 
(i.e. the assignment of a segment to a node when the data is currently 
unavailable in the tier).



##########
server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java:
##########
@@ -181,10 +183,10 @@ public int getNumberOfSegmentsInQueue()
   }
 
   @Override
-  public void loadSegment(final DataSegment segment, @Nullable final LoadPeonCallback callback)
+  public void loadSegment(final DataSegment segment, SegmentAction action, @Nullable final LoadPeonCallback callback)
   {
-    SegmentHolder segmentHolder = new SegmentHolder(segment, Action.LOAD, Collections.singletonList(callback));
-    final SegmentHolder existingHolder = segmentsToLoad.putIfAbsent(segment, segmentHolder);
+    QueuedSegment segmentHolder = new QueuedSegment(segment, action, callback);

Review Comment:
   this variable's name is no longer great?
   
   That said, I would've expected the queue itself to return a `QueuedSegment` 
rather than something creating one of those before it's actually queued...
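A rough sketch of that alternative, where only the queue itself can mint a `QueuedSegment` (all names here are hypothetical stand-ins, not actual Druid classes):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: the queue constructs the QueuedSegment itself, so a
// QueuedSegment can never exist without actually having been queued.
class LoadQueueSketch {
  static final class QueuedSegment {
    final String segmentId;  // stand-in for DataSegment
    final String action;     // stand-in for SegmentAction
    private QueuedSegment(String segmentId, String action) {
      this.segmentId = segmentId;
      this.action = action;
    }
  }

  private final Deque<QueuedSegment> entries = new ArrayDeque<>();

  // Callers receive the handle only after the entry is actually in the queue.
  QueuedSegment enqueue(String segmentId, String action) {
    QueuedSegment entry = new QueuedSegment(segmentId, action);
    entries.addLast(entry);
    return entry;
  }

  // FIFO order; null when empty
  QueuedSegment poll() {
    return entries.pollFirst();
  }
}
```

With the constructor private, code outside the queue cannot create an unqueued `QueuedSegment` at all.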



##########
server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java:
##########
@@ -138,7 +138,7 @@ public Set<ServerHolder> getBrokers()
     return brokers;
   }
 
-  public Iterable<String> getTierNames()
+  public Set<String> getTierNames()

Review Comment:
   Remind me, is `DruidCluster` mutable or not?  I ask because this change of return type can lead people to think they got a Set that they can mutate.  Or, if `historicals.keySet()` is changing, they could see those changes when they didn't expect to.  The old signature would likely have caused users to read the `Iterable` into something else, effectively giving immutable usage.  I'm wondering how much risk there is of that sort of thing going on?
   
   I.e. how beneficial is it really to return the Set from here instead of having the call sites build their own Set if they want a Set?
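If the `Set` return type is kept, one option is an unmodifiable view, which removes the accidental-mutation hazard (though not the see-later-changes one, unless a copy is made instead). A minimal illustration, not the actual `DruidCluster` code:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Minimal illustration of the mutability concern; not DruidCluster itself.
class TierNames {
  private final Map<String, Object> historicals = new HashMap<>();

  TierNames() {
    historicals.put("_default_tier", new Object());
  }

  // Unmodifiable view: callers cannot mutate it, though they would still
  // observe later changes to historicals unless a defensive copy is made.
  Set<String> getTierNames() {
    return Collections.unmodifiableSet(historicals.keySet());
  }
}
```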



##########
server/src/main/java/org/apache/druid/server/coordinator/SegmentLoader.java:
##########
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.google.common.collect.Sets;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.timeline.DataSegment;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+/**
+ * Used by the coordinator in each run for segment loading, dropping, balancing
+ * and broadcasting.
+ * <p>
+ * An instance of this class is freshly created for each coordinator run.
+ */
+public class SegmentLoader
+{
+  private static final EmittingLogger log = new EmittingLogger(SegmentLoader.class);
+
+  private final SegmentStateManager stateManager;
+  private final DruidCluster cluster;
+  private final CoordinatorStats stats = new CoordinatorStats();
+  private final SegmentReplicantLookup replicantLookup;
+  private final BalancerStrategy strategy;
+  private final int maxLoadQueueSize;
+
+  public SegmentLoader(SegmentStateManager stateManager, DruidCoordinatorRuntimeParams runParams)
+  {
+    this.stateManager = stateManager;
+    this.strategy = runParams.getBalancerStrategy();
+    this.cluster = runParams.getDruidCluster();
+    this.replicantLookup = runParams.getSegmentReplicantLookup();
+    this.maxLoadQueueSize = runParams.getCoordinatorDynamicConfig()
+                                     .getMaxSegmentsInNodeLoadingQueue();
+  }
+
+  public CoordinatorStats getStats()
+  {
+    return stats;
+  }
+
+  /**
+   * Moves the given segment between two servers of the same tier.
+   * <p>
+   * See if we can move balancing here.
+   */
+  public boolean moveSegment(DataSegment segment, ServerHolder fromServer, ServerHolder toServer)
+  {
+    if (!fromServer.getServer().getTier().equals(toServer.getServer().getTier())) {
+      return false;
+    }
+
+    // fromServer must be loading or serving the segment
+    // and toServer must be able to load it
+    final SegmentState stateOnSrc = fromServer.getSegmentState(segment);
+    if ((stateOnSrc != SegmentState.LOADING && stateOnSrc != SegmentState.LOADED)
+        || !canLoadSegment(toServer, segment)) {
+      return false;
+    }
+
+    final boolean cancelSuccess = stateOnSrc == SegmentState.LOADING
+                                  && stateManager.cancelOperation(SegmentState.LOADING, segment, fromServer);
+
+    if (cancelSuccess) {
+      int loadedCountOnTier = replicantLookup
+          .getLoadedReplicants(segment.getId(), toServer.getServer().getTier());
+      stateManager.loadSegment(segment, toServer, loadedCountOnTier < 1);
+    } else {
+      return stateManager.moveSegment(segment, fromServer, toServer);
+    }
+
+    return true;
+  }
+
+  /**
+   * Queues load or drop of replicas of the given segment to achieve the
+   * target replication level in all the tiers.
+   */
+  public void updateReplicas(DataSegment segment, Map<String, Integer> tierToReplicaCount)
+  {
+    // Handle every target tier
+    tierToReplicaCount.forEach((tier, numReplicas) -> {
+      updateReplicasOnTier(segment, tier, tierToReplicaCount.get(tier));
+      stats.addToTieredStat(CoordinatorStats.REQUIRED_CAPACITY, tier, segment.getSize() * numReplicas);
+    });
+
+    // Find the minimum number of segments required for fault tolerance
+    final int totalTargetReplicas = tierToReplicaCount.values().stream()
+                                                      .reduce(0, Integer::sum);
+    final int minLoadedSegments = totalTargetReplicas > 1 ? 2 : 1;
+
+    // Drop segment from unneeded tiers if requirement is met across target tiers
+    int loadedTargetReplicas = 0;
+    final Set<String> targetTiers = tierToReplicaCount.keySet();
+    for (String tier : targetTiers) {
+      loadedTargetReplicas += replicantLookup.getLoadedReplicants(segment.getId(), tier);
+    }
+    if (loadedTargetReplicas < minLoadedSegments) {
+      return;
+    }

Review Comment:
   I'm perhaps missing something here, but the logic seems to be saying "if the 
replication factor is set higher than 1, make sure that there are 2 replicas 
and no more", which seems... weird.  We should have the target number that the 
user told us to have?



##########
server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java:
##########
@@ -63,7 +56,7 @@
  * of the same or different methods.
  */
 @Deprecated
-public class CuratorLoadQueuePeon extends LoadQueuePeon
+public class CuratorLoadQueuePeon implements LoadQueuePeon

Review Comment:
   I *think* the change is trying to simplify the management of the queues.  In which case, I *think* the abstraction you want is a `LoadQueue` that is given segment load/drop requests and then can be read from "in order".  That could then be used by either Peon to do what it needs to do.
   
   Additionally though, the CuratorLoadQueuePeon is effectively broken at this 
point anyway because it puts all of the znodes on ZK as quickly as possible and 
it's probably too expensive to really fix (we should just use http and ignore 
zk for this), so I don't see a reason to fix it.  So, another approach is to 
consider the ZK based stuff dead and only improve on the http stuff.
   
   We should likely queue up the death of the zk-based stuff too.
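The `LoadQueue` abstraction being suggested might look roughly like this (a hypothetical sketch; none of these names exist in Druid, and segments are represented as plain strings for brevity):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical: a peon-agnostic queue of segment change requests that
// either the HTTP or the Curator peon could drain in order.
interface ChangeRequestQueue {
  void requestLoad(String segmentId);
  void requestDrop(String segmentId);
  String nextRequest();  // FIFO order; null when empty
}

class FifoChangeRequestQueue implements ChangeRequestQueue {
  private final Deque<String> requests = new ArrayDeque<>();

  @Override public void requestLoad(String segmentId) {
    requests.addLast("LOAD " + segmentId);
  }

  @Override public void requestDrop(String segmentId) {
    requests.addLast("DROP " + segmentId);
  }

  @Override public String nextRequest() {
    return requests.pollFirst();
  }
}
```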



##########
server/src/main/java/org/apache/druid/server/coordinator/SegmentLoader.java:
##########
@@ -0,0 +1,472 @@
+
+    final Set<String> dropTiers = Sets.newHashSet(cluster.getTierNames());
+    dropTiers.removeAll(targetTiers);
+    for (String dropTier : dropTiers) {
+      updateReplicasOnTier(segment, dropTier, 0);
+    }
+  }
+
+  /**
+   * Broadcasts the given segment to all servers that are broadcast targets and
+   * queues a drop of the segment from decommissioning servers.
+   */
+  public void broadcastSegment(DataSegment segment)
+  {
+    int assignedCount = 0;
+    int droppedCount = 0;
+    for (ServerHolder server : cluster.getAllServers()) {
+      // Ignore servers which are not broadcast targets
+      if (!server.getServer().getType().isSegmentBroadcastTarget()) {
+        continue;
+      }
+
+      if (server.isDecommissioning()) {
+        droppedCount += dropBroadcastSegment(segment, server) ? 1 : 0;
+      } else {
+        assignedCount += loadBroadcastSegment(segment, server) ? 1 : 0;
+      }
+    }
+
+    if (assignedCount > 0) {
+      stats.addToDataSourceStat(CoordinatorStats.BROADCAST_LOADS, segment.getDataSource(), assignedCount);
+    }
+    if (droppedCount > 0) {
+      stats.addToDataSourceStat(CoordinatorStats.BROADCAST_DROPS, segment.getDataSource(), droppedCount);
+    }
+  }
+
+  /**
+   * Marks the given segment as unused.
+   */
+  public void deleteSegment(DataSegment segment)
+  {
+    stateManager.deleteSegment(segment);
+    stats.addToGlobalStat(CoordinatorStats.DELETED_SEGMENTS, 1);
+  }
+
+  /**
+   * Checks if the server can load the given segment.
+   * <p>
+   * A load is possible only if the server meets all of the following criteria:
+   * <ul>
+   *   <li>is not already serving or loading the segment</li>
+   *   <li>is not being decommissioned</li>
+   *   <li>has not already exceeded the load queue limit in this run</li>
+   *   <li>has available disk space</li>
+   * </ul>
+   */
+  public boolean canLoadSegment(ServerHolder server, DataSegment segment)
+  {
+    return server.canLoadSegment(segment)

Review Comment:
   I find myself wondering why the ServerHolder doesn't know about its own 
loadQueue?  Seems like this entire thing can be delegated.
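The delegation being suggested could look something like this (a sketch under assumptions: `queueLimit`, `queueLoad`, and the field names are hypothetical, not the real `ServerHolder` API):

```java
// Hypothetical sketch: ServerHolder owns its load-queue bookkeeping, so
// callers ask it one question instead of combining two separate checks.
class ServerHolderSketch {
  private final int queueLimit;        // 0 means unlimited
  private final long availableSize;    // free disk space in bytes
  private int segmentsQueuedForLoad;

  ServerHolderSketch(int queueLimit, long availableSize) {
    this.queueLimit = queueLimit;
    this.availableSize = availableSize;
  }

  void queueLoad() {
    segmentsQueuedForLoad++;
  }

  // The queue-limit check lives here rather than in SegmentLoader.
  boolean canLoadSegment(long segmentSize) {
    boolean queueHasRoom = queueLimit == 0 || segmentsQueuedForLoad < queueLimit;
    return queueHasRoom && segmentSize <= availableSize;
  }
}
```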



##########
server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java:
##########
@@ -255,6 +253,8 @@ public DruidCoordinator(
     this.coordLeaderSelector = coordLeaderSelector;
     this.objectMapper = objectMapper;
     this.compactSegments = initializeCompactSegmentsDuty();
+    this.segmentStateManager =
+        new SegmentStateManager(serverInventoryView, segmentsMetadataManager, taskMaster.isHttpLoading());

Review Comment:
   Mixing `new` with DI always creates sadness.  This should be DI'd, not new'd.



##########
server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java:
##########
@@ -986,6 +887,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
 
       stopPeonsForDisappearedServers(currentServers);
 
+      segmentStateManager.prepareForRun(params);

Review Comment:
   this call is a bit scary to me.  It means that the `segmentStateManager` might be used in a context where it wasn't initialized "properly" (through a bug or whatever).  Perhaps consider the `Coordinator` having a factory for `SegmentStateManager` objects which are built with the params; those objects would then only ever exist fully initialized.  Alternatively, the params could be passed in as part of the arguments to the various methods on `SegmentStateManager` instead.
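One shape for the factory idea (hypothetical names and drastically simplified collaborators; the real classes take many more):

```java
// Hypothetical sketch: the coordinator holds a factory and builds a fully
// initialized manager per run, so a "not yet prepared" state cannot exist.
class RunParams {
  final String balancerStrategy;
  RunParams(String balancerStrategy) { this.balancerStrategy = balancerStrategy; }
}

class SegmentStateManagerSketch {
  private final RunParams params;  // always present: set in the constructor

  SegmentStateManagerSketch(RunParams params) {
    if (params == null) {
      throw new IllegalArgumentException("params required");
    }
    this.params = params;
  }

  String strategy() { return params.balancerStrategy; }
}

class SegmentStateManagerFactory {
  SegmentStateManagerSketch createForRun(RunParams params) {
    return new SegmentStateManagerSketch(params);
  }
}
```

Because construction and initialization are the same step, there is no window in which a caller can reach a half-initialized manager.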



##########
server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java:
##########
@@ -366,8 +372,9 @@ public void stop()
   }
 
   @Override
-  public void loadSegment(DataSegment segment, LoadPeonCallback callback)
+  public void loadSegment(DataSegment segment, SegmentAction action, LoadPeonCallback callback)
   {
+    Preconditions.checkArgument(action != SegmentAction.DROP);

Review Comment:
   If this exception actually gets thrown, what's that going to do to the coordinator's run cycle?  Will it effectively cause a deadlock in the coordinator because every run will cause the exception to get thrown and it never makes progress?  Perhaps it would be better to check, log, and return with an immediate failure on the callback or something than to throw an exception and potentially halt progress?
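The "check, log, and fail the callback" alternative might look like this (a sketch; `LoadRequestHandler` and the `Consumer<Boolean>` callback are simplified stand-ins for the peon and `LoadPeonCallback`):

```java
import java.util.function.Consumer;

// Hypothetical sketch: reject an invalid action by failing the callback
// instead of throwing, so one bad request can't wedge the whole run.
class LoadRequestHandler {
  enum SegmentAction { LOAD, REPLICATE, DROP }

  // The callback receives true on success, false on failure.
  boolean loadSegment(String segmentId, SegmentAction action, Consumer<Boolean> callback) {
    if (action == SegmentAction.DROP) {
      // In real code this would be a log.error(...) with segmentId and action.
      if (callback != null) {
        callback.accept(false);  // immediate failure; the caller can move on
      }
      return false;
    }
    // ... normal queueing path would go here ...
    if (callback != null) {
      callback.accept(true);
    }
    return true;
  }
}
```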



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java:
##########
@@ -43,41 +42,31 @@ public class EmitClusterStatsAndMetrics implements CoordinatorDuty
 {
   private static final Logger log = new Logger(EmitClusterStatsAndMetrics.class);
 
-  public static final String TOTAL_CAPACITY = "totalCapacity";
   public static final String TOTAL_HISTORICAL_COUNT = "totalHistoricalCount";
   public static final String MAX_REPLICATION_FACTOR = "maxReplicationFactor";
 
   private final DruidCoordinator coordinator;
   private final String groupName;
   private final boolean isContainCompactSegmentDuty;
+  private final ServiceEmitter emitter;
 
-  public EmitClusterStatsAndMetrics(DruidCoordinator coordinator, String groupName, boolean isContainCompactSegmentDuty)
+  public EmitClusterStatsAndMetrics(
+      DruidCoordinator coordinator,
+      String groupName,
+      boolean isContainCompactSegmentDuty,
+      ServiceEmitter emitter

Review Comment:
   The old code passed the emitter in on the actual emit call instead of on the constructor here.  Technically speaking, the `ServiceEmitter` carries with it a number of dimensions that it automatically adds to metrics, so having it at the call-site instead of the constructor allows the metrics to be emitted with different sets of dimensions that this class doesn't know about.  I do not know if we were actually taking advantage of that in the previous code, but if we were then this refactor could be breaking things.  Have you validated that we aren't taking advantage of that?



##########
server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java:
##########
@@ -470,134 +486,98 @@ public Set<DataSegment> getSegmentsMarkedToDrop()
     return Collections.unmodifiableSet(segmentsMarkedToDrop);
   }
 
-  private abstract class SegmentHolder
+  /**
+   * A request is considered to have timed out if the time elapsed since it was
+   * first sent to the server is greater than the configured load timeout.
+   *
+   * @see DruidCoordinatorConfig#getLoadTimeoutDelay()
+   */
+  private boolean hasRequestTimedOut(QueuedSegment holder)
   {
-    private final DataSegment segment;
-    private final DataSegmentChangeRequest changeRequest;
-    private final List<LoadPeonCallback> callbacks = new ArrayList<>();
-
-    // Time when this request was sent to target server the first time.
-    private volatile long scheduleTime = -1;
-
-    private SegmentHolder(
-        DataSegment segment,
-        DataSegmentChangeRequest changeRequest,
-        LoadPeonCallback callback
-    )
-    {
-      this.segment = segment;
-      this.changeRequest = changeRequest;
-
-      if (callback != null) {
-        this.callbacks.add(callback);
-      }
-    }
-
-    public void addCallback(LoadPeonCallback newCallback)
-    {
-      synchronized (callbacks) {
-        if (newCallback != null) {
-          callbacks.add(newCallback);
-        }
-      }
-    }
-
-    public DataSegment getSegment()
-    {
-      return segment;
-    }
-
-    public DataSegmentChangeRequest getChangeRequest()
-    {
-      return changeRequest;
-    }
-
-    public boolean hasTimedOut()
-    {
-      if (scheduleTime < 0) {
-        scheduleTime = System.currentTimeMillis();
-        return false;
-      } else if (System.currentTimeMillis() - scheduleTime > config.getLoadTimeoutDelay().getMillis()) {
-        return true;
-      } else {
-        return false;
-      }
-    }
+    return System.currentTimeMillis() - holder.getFirstRequestTimeMillis()

Review Comment:
   I fear that this logic is broken.  The call to `getFirstRequestTimeMillis()` 
ends up actually setting the current timestamp.  But, I don't know why this 
check is only actually done once the request is sent to the downstream server.  
Rather than relying on such side-effects we would be better off having an 
explicit lifecycle to the assignment and asking the holder itself how long it 
has been queued at the server.  I.e. 
`holder.getTimeElapsedSinceFirstRequestToServer()` or something like that, 
which can return `null` if there hasn't been any request to the server (and 
thus should never be timed out)
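The explicit-lifecycle version being suggested could look like this (a sketch; the class and method names are hypothetical, and the clock is passed in rather than read from `System.currentTimeMillis()` to keep it testable):

```java
// Hypothetical sketch: the holder records when the request was first sent,
// and reports elapsed time only if a send actually happened. The getter
// has no side effects, so "never sent" can never look timed out.
class RequestHolder {
  private volatile long firstRequestTimeMillis = -1;

  // Called explicitly when the request is first sent to the server.
  void markRequestSentToServer(long nowMillis) {
    if (firstRequestTimeMillis < 0) {
      firstRequestTimeMillis = nowMillis;
    }
  }

  // null means the request has never been sent to the server.
  Long getMillisSinceFirstRequestToServer(long nowMillis) {
    return firstRequestTimeMillis < 0 ? null : nowMillis - firstRequestTimeMillis;
  }

  boolean hasTimedOut(long nowMillis, long timeoutMillis) {
    Long elapsed = getMillisSinceFirstRequestToServer(nowMillis);
    return elapsed != null && elapsed > timeoutMillis;
  }
}
```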



##########
server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java:
##########
@@ -26,24 +26,20 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableSet;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 
 public class RandomBalancerStrategy implements BalancerStrategy
 {
   @Override
-  public ServerHolder findNewSegmentHomeReplicator(DataSegment proposalSegment, List<ServerHolder> serverHolders)
+  public Iterator<ServerHolder> findNewSegmentHomeReplicator(DataSegment proposalSegment, List<ServerHolder> serverHolders)

Review Comment:
   Sometimes returning an `Iterator<>` is right, but this signature feels like we might as well just return the `List<>` instead?



##########
server/src/main/java/org/apache/druid/server/coordinator/SegmentLoader.java:
##########
@@ -0,0 +1,472 @@
+  /**
+   * Checks if the server can load the given segment.
+   * <p>
+   * A load is possible only if the server meets all of the following criteria:
+   * <ul>
+   *   <li>is not already serving or loading the segment</li>
+   *   <li>is not being decommissioned</li>
+   *   <li>has not already exceeded the load queue limit in this run</li>
+   *   <li>has available disk space</li>
+   * </ul>
+   */
+  public boolean canLoadSegment(ServerHolder server, DataSegment segment)
+  {
+    return server.canLoadSegment(segment)
+           && (maxLoadQueueSize == 0 || maxLoadQueueSize > server.getSegmentsQueuedForLoad());
+  }
+
+  /**
+   * Loads the broadcast segment if it is not loaded on the given server.
+   * Returns true only if the segment was successfully queued for load on the server.
+   */
+  private boolean loadBroadcastSegment(DataSegment segment, ServerHolder server)
+  {
+    final SegmentState state = server.getSegmentState(segment);
+    if (state == SegmentState.LOADED || state == SegmentState.LOADING) {
+      return false;
+    }
+
+    // Cancel drop if it is in progress
+    boolean dropCancelled = stateManager.cancelOperation(SegmentState.DROPPING, segment, server);
+    if (dropCancelled) {
+      return false;
+    }
+
+    if (canLoadSegment(server, segment)
+        && stateManager.loadSegment(segment, server, true)) {
+      return true;
+    } else {
+      log.makeAlert("Failed to broadcast segment for [%s]", segment.getDataSource())

Review Comment:
   Be careful with messages in this code.  "Failed to broadcast segment" is 
ambiguous about whether there was an issue with the server actually downloading 
the segment (definitely not the case given the code here) versus an issue with 
the coordinator believing that it is safe to assign the segment (much more 
likely).  We should be very explicit about what it is that is happening and, 
also, what we might expect the end user to do about it.



##########
server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java:
##########
@@ -65,7 +66,7 @@
 
 /**
  */
-public class HttpLoadQueuePeon extends LoadQueuePeon
+public class HttpLoadQueuePeon implements LoadQueuePeon

Review Comment:
   Fwiw, I feel like the rename of `SegmentHolder` to `QueuedSegment` is a bit 
gratuitous. It's generating a lot of changes and I'm not sure it's really that 
much more descriptive.  Sometimes renames are helpful, but this one I'm 
honestly wondering if the scales aren't tipped towards it just making it harder 
to sift out the signal from the noise in the review.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

