This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 08af98c73bf Add the capability to turboload segments onto historicals 
(#17775)
08af98c73bf is described below

commit 08af98c73bf7f35e4c7e2b136a63158d11b02348
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Mon Mar 24 20:46:44 2025 +0530

    Add the capability to turboload segments onto historicals (#17775)
    
    Add the capability to set Historicals into a turbo loading mode,
    to focus on loading segments at the cost of query performance.
    
    Context
    --------
    Currently, when a new Historical is started, it begins by using a bootstrap thread pool.
    It uses this thread pool to load any existing cached segments and broadcast segments.
    Once it has loaded the segments from both these sources, the Historical switches to a
    smaller thread pool and begins to serve queries.
    
    In certain cases, it would be useful to have the Historical switch back to this mode
    and focus on loading segments, either to continue loading the initial non-bootstrap
    segments or to catch up with assigned segments.
    
    This PR adds a coordinator dynamic config that allows servers to be configured to use
    the larger bootstrap thread pool to load segments faster.
    
    Changes
    ---------
    - Added a new dynamic coordinator configuration, `turboLoadingNodes`.
    - Made the Coordinator ignore `druid.coordinator.loadqueuepeon.http.batchSize` for
      servers in `turboLoadingNodes`.
    - Added an API on the Historical to report its loading capabilities, i.e. the number
      of loading threads in normal and turbo mode.
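
    For illustration only (the host:port entries below are hypothetical), the new field
    is supplied in the Coordinator dynamic configuration payload alongside the existing
    fields, as a set of server host:port strings:

```json
{
  "useRoundRobinSegmentAssignment": true,
  "smartSegmentLoading": true,
  "turboLoadingNodes": [
    "historical-1.example.com:8083",
    "historical-2.example.com:8083"
  ]
}
```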
---
 docs/api-reference/dynamic-configuration-api.md    | 11 ++-
 docs/configuration/index.md                        |  3 +-
 .../coordination/DataSegmentChangeHandler.java     |  1 +
 .../coordination/SegmentLoadDropHandler.java       | 77 +++++++++++++++---
 .../coordinator/CoordinatorDynamicConfig.java      | 50 ++++++++++--
 .../config/HttpLoadQueuePeonConfig.java            | 18 ++++-
 .../coordinator/loading/HttpLoadQueuePeon.java     | 93 +++++++++++++++++++---
 .../coordinator/loading/LoadQueueTaskMaster.java   | 17 +++-
 .../druid/server/http/SegmentListerResource.java   | 31 ++++++--
 .../server/http/SegmentLoadingCapabilities.java    | 64 +++++++++++++++
 .../SegmentLoadingMode.java}                       | 13 ++-
 .../coordination/SegmentLoadDropHandlerTest.java   | 23 +++---
 .../coordinator/DruidCoordinatorConfigTest.java    |  4 +-
 .../config/HttpLoadQueuePeonConfigTest.java        | 62 +++++++++++++++
 .../coordinator/loading/HttpLoadQueuePeonTest.java | 43 +++++++++-
 .../simulate/CoordinatorSimulationBuilder.java     | 11 ++-
 .../simulate/TestSegmentLoadingHttpClient.java     | 27 +++++++
 .../server/http/CoordinatorDynamicConfigTest.java  | 78 +++++++++++++-----
 .../http/SegmentLoadingCapabilitiesTest.java       | 52 ++++++++++++
 .../java/org/apache/druid/cli/CliCoordinator.java  |  6 +-
 20 files changed, 589 insertions(+), 95 deletions(-)
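The turbo executor introduced in SegmentLoadDropHandler is a fixed-size pool whose core threads are allowed to time out. A minimal standalone sketch of that construction (the thread count of 4 is an arbitrary stand-in for `config.getNumBootstrapThreads()`):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TurboPoolSketch
{
    public static void main(String[] args)
    {
        // Stand-in for config.getNumBootstrapThreads(); the real value comes from SegmentLoaderConfig.
        final int numBootstrapThreads = 4;

        // Fixed-size pool: core == max, so a new thread is started for each submitted task
        // until the pool reaches capacity; only then do tasks accumulate in the queue.
        final ThreadPoolExecutor turboLoadExec = new ThreadPoolExecutor(
            numBootstrapThreads,
            numBootstrapThreads,
            60L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>()
        );

        // Allow core threads to time out after 60 seconds so the larger pool
        // releases its threads once turbo loading is idle.
        turboLoadExec.allowCoreThreadTimeOut(true);

        System.out.println(turboLoadExec.getCorePoolSize() == turboLoadExec.getMaximumPoolSize());
        System.out.println(turboLoadExec.allowsCoreThreadTimeOut());
        turboLoadExec.shutdown();
    }
}
```

This mirrors the choice in the diff below: because core and maximum sizes are equal, the unbounded queue never prevents the pool from reaching full parallelism.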

diff --git a/docs/api-reference/dynamic-configuration-api.md 
b/docs/api-reference/dynamic-configuration-api.md
index 90b7028f247..971aa81d206 100644
--- a/docs/api-reference/dynamic-configuration-api.md
+++ b/docs/api-reference/dynamic-configuration-api.md
@@ -105,7 +105,8 @@ Host: http://ROUTER_IP:ROUTER_PORT
     "maxNonPrimaryReplicantsToLoad": 2147483647,
     "useRoundRobinSegmentAssignment": true,
     "smartSegmentLoading": true,
-    "debugDimensions": null
+    "debugDimensions": null,
+    "turboLoadingNodes": []
 }
 ```
 
@@ -172,7 +173,8 @@ curl 
"http://ROUTER_IP:ROUTER_PORT/druid/coordinator/v1/config" \
   "pauseCoordination": false,
   "replicateAfterLoadTimeout": false,
   "maxNonPrimaryReplicantsToLoad": 2147483647,
-  "useRoundRobinSegmentAssignment": true
+  "useRoundRobinSegmentAssignment": true,
+  "turboLoadingNodes": []
 }'
 ```
 
@@ -203,7 +205,8 @@ Content-Length: 683
   "pauseCoordination": false,
   "replicateAfterLoadTimeout": false,
   "maxNonPrimaryReplicantsToLoad": 2147483647,
-  "useRoundRobinSegmentAssignment": true
+  "useRoundRobinSegmentAssignment": true,
+  "turboLoadingNodes": []
 }
 ```
 
@@ -289,7 +292,7 @@ Host: http://ROUTER_IP:ROUTER_PORT
             "comment": "",
             "ip": "127.0.0.1"
         },
-        "payload": 
"{\"millisToWaitBeforeDeleting\":900000,\"maxSegmentsToMove\":5,\"replicantLifetime\":15,\"replicationThrottleLimit\":10,\"balancerComputeThreads\":1,\"killDataSourceWhitelist\":[],\"killPendingSegmentsSkipList\":[],\"maxSegmentsInNodeLoadingQueue\":100,\"decommissioningNodes\":[],\"decommissioningMaxPercentOfMaxSegmentsToMove\":70,\"pauseCoordination\":false,\"replicateAfterLoadTimeout\":false,\"maxNonPrimaryReplicantsToLoad\":2147483647,\"useRoundRobinSegmentAssignme
 [...]
+        "payload": 
"{\"millisToWaitBeforeDeleting\":900000,\"maxSegmentsToMove\":5,\"replicantLifetime\":15,\"replicationThrottleLimit\":10,\"balancerComputeThreads\":1,\"killDataSourceWhitelist\":[],\"killPendingSegmentsSkipList\":[],\"maxSegmentsInNodeLoadingQueue\":100,\"decommissioningNodes\":[],\"decommissioningMaxPercentOfMaxSegmentsToMove\":70,\"pauseCoordination\":false,\"replicateAfterLoadTimeout\":false,\"maxNonPrimaryReplicantsToLoad\":2147483647,\"useRoundRobinSegmentAssignme
 [...]
         "auditTime": "2023-10-03T20:59:51.622Z"
     }
 ]
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index d75309b7ac3..687e1d688a6 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -885,7 +885,7 @@ These Coordinator static configurations can be defined in 
the `coordinator/runti
 |`druid.coordinator.kill.maxInterval`|The largest interval, as an [ISO 8601 
duration](https://en.wikipedia.org/wiki/ISO_8601#Durations), of segments to 
delete per kill task. Set to zero, e.g. `PT0S`, for unlimited. This only 
applies when `druid.coordinator.kill.on=true`.|`P30D`|
 |`druid.coordinator.balancer.strategy`|The [balancing 
strategy](../design/coordinator.md#balancing-segments-in-a-tier) used by the 
Coordinator to distribute segments among the Historical servers in a tier. The 
`cost` strategy distributes segments by minimizing a cost function, 
`diskNormalized` weights these costs with the disk usage ratios of the servers 
and `random` distributes segments randomly.|`cost`|
 |`druid.coordinator.loadqueuepeon.http.repeatDelay`|The start and repeat delay 
(in milliseconds) for the load queue peon, which manages the load/drop queue of 
segments for any server.|1 minute|
-|`druid.coordinator.loadqueuepeon.http.batchSize`|Number of segment load/drop 
requests to batch in one HTTP request. Note that it must be smaller than 
`druid.segmentCache.numLoadingThreads` config on Historical service.|1|
+|`druid.coordinator.loadqueuepeon.http.batchSize`|Number of segment load/drop 
requests to batch in one HTTP request. Note that it must be smaller than or 
equal to the `druid.segmentCache.numLoadingThreads` config on the Historical 
service. If this value is not configured, the Coordinator uses the value of 
`numLoadingThreads` for the respective server.|`druid.segmentCache.numLoadingThreads`|
 |`druid.coordinator.asOverlord.enabled`|Boolean value for whether this 
Coordinator service should act like an Overlord as well. This configuration 
allows users to simplify a Druid cluster by not having to deploy any standalone 
Overlord services. If set to true, then Overlord console is available at 
`http://coordinator-host:port/console.html` and be sure to set 
`druid.coordinator.asOverlord.overlordService` also.|false|
 |`druid.coordinator.asOverlord.overlordService`| Required, if 
`druid.coordinator.asOverlord.enabled` is `true`. This must be same value as 
`druid.service` on standalone Overlord services and 
`druid.selectors.indexing.serviceName` on Middle Managers.|NULL|
 
@@ -953,6 +953,7 @@ The following table shows the dynamic configuration 
properties for the Coordinat
 |`decommissioningNodes`|List of Historical servers to decommission. 
Coordinator will not assign new segments to decommissioning servers, and 
segments will be moved away from them to be placed on non-decommissioning 
servers at the maximum rate specified by `maxSegmentsToMove`.|none|
 |`pauseCoordination`|Boolean flag for whether or not the Coordinator should 
execute its various duties of coordinating the cluster. Setting this to true 
essentially pauses all coordination work while allowing the API to remain up. 
Duties that are paused include all classes that implement the `CoordinatorDuty` 
interface. Such duties include: segment balancing, segment compaction, 
submitting kill tasks for unused segments (if enabled), logging of used 
segments in the cluster, marking of ne [...]
 |`replicateAfterLoadTimeout`|Boolean flag for whether or not additional 
replication is needed for segments that have failed to load due to the expiry 
of `druid.coordinator.load.timeout`. If this is set to true, the Coordinator 
will attempt to replicate the failed segment on a different historical server. 
This helps improve the segment availability if there are a few slow Historicals 
in the cluster. However, the slow Historical may still load the segment later 
and the Coordinator may issu [...]
+|`turboLoadingNodes`|List of Historical servers to place in turbo loading 
mode. These servers use a larger thread pool to load segments faster, at the 
cost of query performance. For servers specified in `turboLoadingNodes`, 
`druid.coordinator.loadqueuepeon.http.batchSize` is ignored and the Coordinator 
uses the respective server's `numLoadingThreads` value instead.|none|
 
 ##### Smart segment loading
 
diff --git 
a/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeHandler.java
 
b/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeHandler.java
index cd2a8c3740f..70758cd63ab 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeHandler.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeHandler.java
@@ -28,5 +28,6 @@ import javax.annotation.Nullable;
 public interface DataSegmentChangeHandler
 {
   void addSegment(DataSegment segment, @Nullable DataSegmentChangeCallback 
callback);
+
   void removeSegment(DataSegment segment, @Nullable DataSegmentChangeCallback 
callback);
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
 
b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
index 12462adab2f..4d3e22c1a13 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
@@ -33,6 +33,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
 import org.apache.druid.segment.loading.SegmentLoadingException;
 import org.apache.druid.server.SegmentManager;
+import org.apache.druid.server.http.SegmentLoadingMode;
 import org.apache.druid.server.metrics.SegmentRowCountDistribution;
 import org.apache.druid.timeline.DataSegment;
 
@@ -44,8 +45,11 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -63,7 +67,8 @@ public class SegmentLoadDropHandler implements 
DataSegmentChangeHandler
   private final SegmentLoaderConfig config;
   private final DataSegmentAnnouncer announcer;
   private final SegmentManager segmentManager;
-  private final ScheduledExecutorService exec;
+  private final ScheduledExecutorService normalLoadExec;
+  private final ThreadPoolExecutor turboLoadExec;
 
   private final ConcurrentSkipListSet<DataSegment> segmentsToDelete;
 
@@ -88,9 +93,19 @@ public class SegmentLoadDropHandler implements 
DataSegmentChangeHandler
         config,
         announcer,
         segmentManager,
-        Executors.newScheduledThreadPool(
+        new ScheduledThreadPoolExecutor(
             config.getNumLoadingThreads(),
-            Execs.makeThreadFactory("SimpleDataSegmentChangeHandler-%s")
+            Execs.makeThreadFactory("SegmentLoadDropHandler-normal-%s")
+        ),
+        // Create a fixed-size thread pool whose threads time out after 1 minute. Since all threads are
+        // core threads, new threads are created without enqueueing tasks until the capacity is reached.
+        new ThreadPoolExecutor(
+            config.getNumBootstrapThreads(),
+            config.getNumBootstrapThreads(),
+            60L,
+            TimeUnit.SECONDS,
+            new LinkedBlockingQueue<>(),
+            Execs.makeThreadFactory("SegmentLoadDropHandler-turbo-%s")
         )
     );
   }
@@ -100,13 +115,17 @@ public class SegmentLoadDropHandler implements 
DataSegmentChangeHandler
       SegmentLoaderConfig config,
       DataSegmentAnnouncer announcer,
       SegmentManager segmentManager,
-      ScheduledExecutorService exec
+      ScheduledExecutorService normalLoadExec,
+      ThreadPoolExecutor turboLoadExec
   )
   {
     this.config = config;
     this.announcer = announcer;
     this.segmentManager = segmentManager;
-    this.exec = exec;
+    this.normalLoadExec = normalLoadExec;
+    this.turboLoadExec = turboLoadExec;
+
+    this.turboLoadExec.allowCoreThreadTimeOut(true);
 
     this.segmentsToDelete = new ConcurrentSkipListSet<>();
     requestStatuses = 
CacheBuilder.newBuilder().maximumSize(config.getStatusQueueMaxSize()).initialCapacity(8).build();
@@ -214,7 +233,7 @@ public class SegmentLoadDropHandler implements 
DataSegmentChangeHandler
             "Completely removing segment[%s] in [%,d]ms.",
             segment.getId(), config.getDropSegmentDelayMillis()
         );
-        exec.schedule(
+        normalLoadExec.schedule(
             runnable,
             config.getDropSegmentDelayMillis(),
             TimeUnit.MILLISECONDS
@@ -244,14 +263,22 @@ public class SegmentLoadDropHandler implements 
DataSegmentChangeHandler
     return ImmutableList.copyOf(segmentsToDelete);
   }
 
-  public ListenableFuture<List<DataSegmentChangeResponse>> 
processBatch(List<DataSegmentChangeRequest> changeRequests)
+  /**
+   * Process a list of {@link DataSegmentChangeRequest}, invoking
+   * {@link #processRequest(DataSegmentChangeRequest, SegmentLoadingMode)} for 
each one. Handles the computation
+   * asynchronously and returns a future to the result.
+   */
+  public ListenableFuture<List<DataSegmentChangeResponse>> processBatch(
+      List<DataSegmentChangeRequest> changeRequests,
+      SegmentLoadingMode segmentLoadingMode
+  )
   {
     boolean isAnyRequestDone = false;
 
     Map<DataSegmentChangeRequest, AtomicReference<SegmentChangeStatus>> 
statuses = Maps.newHashMapWithExpectedSize(changeRequests.size());
 
     for (DataSegmentChangeRequest cr : changeRequests) {
-      AtomicReference<SegmentChangeStatus> status = processRequest(cr);
+      AtomicReference<SegmentChangeStatus> status = processRequest(cr, 
segmentLoadingMode);
       if (status.get().getState() != SegmentChangeStatus.State.PENDING) {
         isAnyRequestDone = true;
       }
@@ -271,7 +298,15 @@ public class SegmentLoadDropHandler implements 
DataSegmentChangeHandler
     return future;
   }
 
-  private AtomicReference<SegmentChangeStatus> 
processRequest(DataSegmentChangeRequest changeRequest)
+  /**
+   * Process a {@link DataSegmentChangeRequest}, invoking the request's
+   * {@link DataSegmentChangeRequest#go(DataSegmentChangeHandler, 
DataSegmentChangeCallback)}.
+   * The segmentLoadingMode parameter determines the thread pool to use.
+   */
+  private AtomicReference<SegmentChangeStatus> processRequest(
+      DataSegmentChangeRequest changeRequest,
+      SegmentLoadingMode segmentLoadingMode
+  )
   {
     synchronized (requestStatusesLock) {
       AtomicReference<SegmentChangeStatus> status = 
requestStatuses.getIfPresent(changeRequest);
@@ -282,10 +317,13 @@ public class SegmentLoadDropHandler implements 
DataSegmentChangeHandler
             new DataSegmentChangeHandler()
             {
               @Override
-              public void addSegment(DataSegment segment, @Nullable 
DataSegmentChangeCallback callback)
+              public void addSegment(
+                  DataSegment segment,
+                  @Nullable DataSegmentChangeCallback callback
+              )
               {
                 requestStatuses.put(changeRequest, new 
AtomicReference<>(SegmentChangeStatus.PENDING));
-                exec.submit(
+                getExecutorService(segmentLoadingMode).submit(
                     () -> SegmentLoadDropHandler.this.addSegment(
                         ((SegmentChangeRequestLoad) 
changeRequest).getSegment(),
                         () -> resolveWaitingFutures()
@@ -294,7 +332,10 @@ public class SegmentLoadDropHandler implements 
DataSegmentChangeHandler
               }
 
               @Override
-              public void removeSegment(DataSegment segment, @Nullable 
DataSegmentChangeCallback callback)
+              public void removeSegment(
+                  DataSegment segment,
+                  @Nullable DataSegmentChangeCallback callback
+              )
               {
                 requestStatuses.put(changeRequest, new 
AtomicReference<>(SegmentChangeStatus.PENDING));
                 SegmentLoadDropHandler.this.removeSegment(
@@ -386,5 +427,15 @@ public class SegmentLoadDropHandler implements 
DataSegmentChangeHandler
       return true;
     }
   }
+
+  private ExecutorService getExecutorService(SegmentLoadingMode loadingMode)
+  {
+    return loadingMode == SegmentLoadingMode.TURBO ? turboLoadExec : 
normalLoadExec;
+  }
+
+  public SegmentLoaderConfig getSegmentLoaderConfig()
+  {
+    return config;
+  }
 }
 
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
index d805bad5e01..806b6ebbee1 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
@@ -23,10 +23,12 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableSet;
+import org.apache.druid.common.config.Configs;
 import org.apache.druid.common.config.JacksonConfigManager;
 import org.apache.druid.error.InvalidInput;
 import org.apache.druid.server.coordinator.duty.KillUnusedSegments;
 import org.apache.druid.server.coordinator.stats.Dimension;
+import org.apache.druid.server.http.SegmentLoadingMode;
 import org.apache.druid.utils.JvmUtils;
 
 import javax.annotation.Nullable;
@@ -70,6 +72,8 @@ public class CoordinatorDynamicConfig
   private final Map<String, String> debugDimensions;
   private final Map<Dimension, String> validDebugDimensions;
 
+  private final Set<String> turboLoadingNodes;
+
   /**
    * Stale pending segments belonging to the data sources in this list are not 
killed by {@code
    * KillStalePendingSegments}. In other words, segments in these data sources 
are "protected".
@@ -118,7 +122,8 @@ public class CoordinatorDynamicConfig
       @JsonProperty("replicateAfterLoadTimeout") boolean 
replicateAfterLoadTimeout,
       @JsonProperty("useRoundRobinSegmentAssignment") @Nullable Boolean 
useRoundRobinSegmentAssignment,
       @JsonProperty("smartSegmentLoading") @Nullable Boolean 
smartSegmentLoading,
-      @JsonProperty("debugDimensions") @Nullable Map<String, String> 
debugDimensions
+      @JsonProperty("debugDimensions") @Nullable Map<String, String> 
debugDimensions,
+      @JsonProperty("turboLoadingNodes") @Nullable Set<String> 
turboLoadingNodes
   )
   {
     this.markSegmentAsUnusedDelayMillis =
@@ -162,6 +167,7 @@ public class CoordinatorDynamicConfig
     );
     this.debugDimensions = debugDimensions;
     this.validDebugDimensions = validateDebugDimensions(debugDimensions);
+    this.turboLoadingNodes = Configs.valueOrDefault(turboLoadingNodes, 
Set.of());
   }
 
   private Map<Dimension, String> validateDebugDimensions(Map<String, String> 
debugDimensions)
@@ -200,6 +206,13 @@ public class CoordinatorDynamicConfig
     }
   }
 
+  public SegmentLoadingMode getLoadingModeForServer(String serverName)
+  {
+    return turboLoadingNodes.contains(serverName) ?
+           SegmentLoadingMode.TURBO :
+           SegmentLoadingMode.NORMAL;
+  }
+
   @JsonProperty("millisToWaitBeforeDeleting")
   public long getMarkSegmentAsUnusedDelayMillis()
   {
@@ -308,6 +321,19 @@ public class CoordinatorDynamicConfig
     return replicateAfterLoadTimeout;
   }
 
+  /**
+   * List of servers to put in turbo-loading mode. These servers will use a larger thread pool to load
+   * segments. This decreases the average time taken to load segments. However, it also leaves fewer
+   * resources available to query threads, which may cause a drop in query performance.
+   *
+   * @return Set of host:port entries
+   */
+  @JsonProperty
+  public Set<String> getTurboLoadingNodes()
+  {
+    return turboLoadingNodes;
+  }
+
   @Override
   public String toString()
   {
@@ -326,6 +352,7 @@ public class CoordinatorDynamicConfig
            ", decommissioningNodes=" + decommissioningNodes +
            ", pauseCoordination=" + pauseCoordination +
            ", replicateAfterLoadTimeout=" + replicateAfterLoadTimeout +
+           ", turboLoadingNodes=" + turboLoadingNodes +
            '}';
   }
 
@@ -359,6 +386,7 @@ public class CoordinatorDynamicConfig
                dataSourcesToNotKillStalePendingSegmentsIn,
                that.dataSourcesToNotKillStalePendingSegmentsIn)
            && Objects.equals(decommissioningNodes, that.decommissioningNodes)
+           && Objects.equals(turboLoadingNodes, that.turboLoadingNodes)
            && Objects.equals(debugDimensions, that.debugDimensions);
   }
 
@@ -378,7 +406,8 @@ public class CoordinatorDynamicConfig
         dataSourcesToNotKillStalePendingSegmentsIn,
         decommissioningNodes,
         pauseCoordination,
-        debugDimensions
+        debugDimensions,
+        turboLoadingNodes
     );
   }
 
@@ -430,6 +459,7 @@ public class CoordinatorDynamicConfig
     private Boolean replicateAfterLoadTimeout;
     private Boolean useRoundRobinSegmentAssignment;
     private Boolean smartSegmentLoading;
+    private Set<String> turboLoadingNodes;
 
     public Builder()
     {
@@ -452,7 +482,8 @@ public class CoordinatorDynamicConfig
         @JsonProperty("replicateAfterLoadTimeout") @Nullable Boolean 
replicateAfterLoadTimeout,
         @JsonProperty("useRoundRobinSegmentAssignment") @Nullable Boolean 
useRoundRobinSegmentAssignment,
         @JsonProperty("smartSegmentLoading") @Nullable Boolean 
smartSegmentLoading,
-        @JsonProperty("debugDimensions") @Nullable Map<String, String> 
debugDimensions
+        @JsonProperty("debugDimensions") @Nullable Map<String, String> 
debugDimensions,
+        @JsonProperty("turboLoadingNodes") @Nullable Set<String> 
turboLoadingNodes
     )
     {
       this.markSegmentAsUnusedDelayMillis = markSegmentAsUnusedDelayMillis;
@@ -471,6 +502,7 @@ public class CoordinatorDynamicConfig
       this.useRoundRobinSegmentAssignment = useRoundRobinSegmentAssignment;
       this.smartSegmentLoading = smartSegmentLoading;
       this.debugDimensions = debugDimensions;
+      this.turboLoadingNodes = turboLoadingNodes;
     }
 
     public Builder withMarkSegmentAsUnusedDelayMillis(long leadingTimeMillis)
@@ -491,6 +523,12 @@ public class CoordinatorDynamicConfig
       return this;
     }
 
+    public Builder withTurboLoadingNodes(Set<String> turboLoadingNodes)
+    {
+      this.turboLoadingNodes = turboLoadingNodes;
+      return this;
+    }
+
     public Builder withReplicantLifetime(int replicantLifetime)
     {
       this.replicantLifetime = replicantLifetime;
@@ -582,7 +620,8 @@ public class CoordinatorDynamicConfig
           valueOrDefault(replicateAfterLoadTimeout, 
Defaults.REPLICATE_AFTER_LOAD_TIMEOUT),
           valueOrDefault(useRoundRobinSegmentAssignment, 
Defaults.USE_ROUND_ROBIN_ASSIGNMENT),
           valueOrDefault(smartSegmentLoading, Defaults.SMART_SEGMENT_LOADING),
-          debugDimensions
+          debugDimensions,
+          turboLoadingNodes
       );
     }
 
@@ -612,7 +651,8 @@ public class CoordinatorDynamicConfig
           valueOrDefault(replicateAfterLoadTimeout, 
defaults.getReplicateAfterLoadTimeout()),
           valueOrDefault(useRoundRobinSegmentAssignment, 
defaults.isUseRoundRobinSegmentAssignment()),
           valueOrDefault(smartSegmentLoading, 
defaults.isSmartSegmentLoading()),
-          valueOrDefault(debugDimensions, defaults.getDebugDimensions())
+          valueOrDefault(debugDimensions, defaults.getDebugDimensions()),
+          valueOrDefault(turboLoadingNodes, defaults.getTurboLoadingNodes())
       );
     }
   }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/config/HttpLoadQueuePeonConfig.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/config/HttpLoadQueuePeonConfig.java
index f6f402037d1..42af2e948cf 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/config/HttpLoadQueuePeonConfig.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/config/HttpLoadQueuePeonConfig.java
@@ -22,8 +22,11 @@ package org.apache.druid.server.coordinator.config;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.common.config.Configs;
+import org.apache.druid.java.util.common.RE;
 import org.joda.time.Duration;
 
+import javax.annotation.Nullable;
+
 public class HttpLoadQueuePeonConfig
 {
   private static final Duration DEFAULT_LOAD_TIMEOUT = 
Duration.standardMinutes(15);
@@ -35,21 +38,28 @@ public class HttpLoadQueuePeonConfig
   private final Duration repeatDelay;
 
   @JsonProperty
-  private final int batchSize;
+  @Nullable
+  private final Integer batchSize;
 
   @JsonCreator
   public HttpLoadQueuePeonConfig(
       @JsonProperty("hostTimeout") Duration hostTimeout,
       @JsonProperty("repeatDelay") Duration repeatDelay,
-      @JsonProperty("batchSize") Integer batchSize
+      @JsonProperty("batchSize") @Nullable Integer batchSize
   )
   {
     this.hostTimeout = Configs.valueOrDefault(hostTimeout, 
Duration.standardMinutes(5));
     this.repeatDelay = Configs.valueOrDefault(repeatDelay, 
Duration.standardMinutes(1));
-    this.batchSize = Configs.valueOrDefault(batchSize, 1);
+
+    if (batchSize != null && batchSize < 1) {
+      throw new RE("Batch size must be greater than 0.");
+    }
+
+    this.batchSize = batchSize;
   }
 
-  public int getBatchSize()
+  @Nullable
+  public Integer getBatchSize()
   {
     return batchSize;
   }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java
index 015b5686769..4f5ff04911e 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java
@@ -22,9 +22,11 @@ package org.apache.druid.server.coordinator.loading;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectWriter;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.common.config.Configs;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.StringUtils;
@@ -45,6 +47,8 @@ import 
org.apache.druid.server.coordinator.stats.CoordinatorStat;
 import org.apache.druid.server.coordinator.stats.Dimension;
 import org.apache.druid.server.coordinator.stats.RowKey;
 import org.apache.druid.server.coordinator.stats.Stats;
+import org.apache.druid.server.http.SegmentLoadingCapabilities;
+import org.apache.druid.server.http.SegmentLoadingMode;
 import org.apache.druid.timeline.DataSegment;
 import org.jboss.netty.handler.codec.http.HttpHeaders;
 import org.jboss.netty.handler.codec.http.HttpMethod;
@@ -53,7 +57,6 @@ import org.joda.time.Duration;
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.core.MediaType;
 import java.io.InputStream;
-import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -69,6 +72,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 
 /**
  *
@@ -82,6 +86,7 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
       new TypeReference<>() {};
 
   private static final EmittingLogger log = new 
EmittingLogger(HttpLoadQueuePeon.class);
+  private static final long DEFAULT_TIMEOUT = 10000L;
 
   private final AtomicLong queuedSize = new AtomicLong(0);
   private final AtomicReference<CoordinatorRunStats> stats = new 
AtomicReference<>(new CoordinatorRunStats());
@@ -114,19 +119,21 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
 
   private final ObjectMapper jsonMapper;
   private final HttpClient httpClient;
-  private final URL changeRequestURL;
   private final String serverId;
 
   private final AtomicBoolean mainLoopInProgress = new AtomicBoolean(false);
   private final ExecutorService callBackExecutor;
+  private final Supplier<SegmentLoadingMode> loadingModeSupplier;
 
   private final ObjectWriter requestBodyWriter;
+  private final SegmentLoadingCapabilities serverCapabilities;
 
   public HttpLoadQueuePeon(
       String baseUrl,
       ObjectMapper jsonMapper,
       HttpClient httpClient,
       HttpLoadQueuePeonConfig config,
+      Supplier<SegmentLoadingMode> loadingModeSupplier,
       ScheduledExecutorService processingExecutor,
       ExecutorService callBackExecutor
   )
@@ -139,17 +146,48 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
     this.callBackExecutor = callBackExecutor;
 
     this.serverId = baseUrl;
+    this.loadingModeSupplier = loadingModeSupplier;
+    this.serverCapabilities = fetchSegmentLoadingCapabilities();
+  }
+
+  @VisibleForTesting
+  SegmentLoadingCapabilities fetchSegmentLoadingCapabilities()
+  {
     try {
-      this.changeRequestURL = new URL(
-          new URL(baseUrl),
-          StringUtils.nonStrictFormat(
-              "druid-internal/v1/segments/changeRequests?timeout=%d",
-              config.getHostTimeout().getMillis()
-          )
+      final URL segmentLoadingCapabilitiesURL = new URL(
+          new URL(serverId),
+          "druid-internal/v1/segments/loadCapabilities"
+      );
+
+      BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();
+      InputStream stream = httpClient.go(
+          new Request(HttpMethod.GET, segmentLoadingCapabilitiesURL)
+              .addHeader(HttpHeaders.Names.ACCEPT, MediaType.APPLICATION_JSON),
+          responseHandler,
+          new Duration(DEFAULT_TIMEOUT)
+      ).get();
+
+      if (HttpServletResponse.SC_NOT_FOUND == responseHandler.getStatus()) {
+        int batchSize = config.getBatchSize() == null ? 1 : config.getBatchSize();
+        SegmentLoadingCapabilities defaultCapabilities = new SegmentLoadingCapabilities(batchSize, batchSize);
+        log.warn(
+            "Historical capabilities endpoint not found at URL[%s]. Using default values[%s].",
+            segmentLoadingCapabilitiesURL,
+            defaultCapabilities
+        );
+        return defaultCapabilities;
+      } else if (HttpServletResponse.SC_OK != responseHandler.getStatus()) {
+        log.makeAlert("Received status[%s] when fetching loading capabilities from server[%s]", responseHandler.getStatus(), serverId);
+        throw new RE("Received status[%s] when fetching loading capabilities from server[%s]", responseHandler.getStatus(), serverId);
+      }
+
+      return jsonMapper.readValue(
+          stream,
+          SegmentLoadingCapabilities.class
       );
     }
-    catch (MalformedURLException ex) {
-      throw new RuntimeException(ex);
+    catch (Throwable th) {
+      throw new RE(th, "Received error while fetching historical capabilities from Server[%s].", serverId);
     }
   }
 
@@ -160,7 +198,8 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
       return;
     }
 
-    final int batchSize = config.getBatchSize();
+    final SegmentLoadingMode loadingMode = loadingModeSupplier.get();
+    final int batchSize = calculateBatchSize(loadingMode);
 
     final List<DataSegmentChangeRequest> newRequests = new ArrayList<>(batchSize);
 
@@ -194,19 +233,28 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
     if (newRequests.isEmpty()) {
       log.trace(
           "[%s]Found no load/drop requests. SegmentsToLoad[%d], SegmentsToDrop[%d], batchSize[%d].",
-          serverId, segmentsToLoad.size(), segmentsToDrop.size(), config.getBatchSize()
+          serverId, segmentsToLoad.size(), segmentsToDrop.size(), batchSize
       );
       mainLoopInProgress.set(false);
       return;
     }
 
     try {
-      log.trace("Sending [%d] load/drop requests to Server[%s].", newRequests.size(), serverId);
+      log.trace("Sending [%d] load/drop requests to Server[%s] in loadingMode[%s].", newRequests.size(), serverId, loadingMode);
       final boolean hasLoadRequests = newRequests.stream().anyMatch(r -> r instanceof SegmentChangeRequestLoad);
       if (hasLoadRequests && !loadingRateTracker.isLoadingBatch()) {
         loadingRateTracker.markBatchLoadingStarted();
       }
 
+      final URL changeRequestURL = new URL(
+          new URL(serverId),
+          StringUtils.nonStrictFormat(
+              "druid-internal/v1/segments/changeRequests?timeout=%d&loadingMode=%s",
+              config.getHostTimeout().getMillis(),
+              loadingMode
+          )
+      );
+
       BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();
       ListenableFuture<InputStream> future = httpClient.go(
           new Request(HttpMethod.POST, changeRequestURL)
@@ -314,6 +362,25 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
     }
   }
 
+  /**
+   * Calculates the number of segments the server is capable of handling at a time. If loading segments in turbo
+   * loading mode, returns the number of turbo loading threads on the server. Otherwise, returns the value set by the
+   * batch size runtime parameter, or the number of normal loading threads on the server if the parameter is not set.
+   * Always returns a positive integer.
+   */
+  @VisibleForTesting
+  int calculateBatchSize(SegmentLoadingMode loadingMode)
+  {
+    int batchSize;
+    if (SegmentLoadingMode.TURBO.equals(loadingMode)) {
+      batchSize = serverCapabilities.getNumTurboLoadingThreads();
+    } else {
+      batchSize = Configs.valueOrDefault(config.getBatchSize(), serverCapabilities.getNumLoadingThreads());
+    }
+
+    return Math.max(batchSize, 1);
+  }
+
   private void handleResponseStatus(DataSegmentChangeRequest changeRequest, SegmentChangeStatus status)
   {
     changeRequest.go(
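
The batch-size selection in the hunk above can be sketched in isolation. The following is a simplified, self-contained rendering of the `calculateBatchSize` logic; the class name `BatchSizeSketch` and the flattened parameters are invented for illustration and stand in for Druid's config and capabilities objects:

```java
public class BatchSizeSketch
{
  enum SegmentLoadingMode { NORMAL, TURBO }

  /**
   * Mirrors the patched HttpLoadQueuePeon#calculateBatchSize: turbo mode uses the
   * server's turbo thread count; otherwise the configured batch size wins, falling
   * back to the server's normal loading thread count. Result is always >= 1.
   */
  static int calculateBatchSize(
      SegmentLoadingMode mode,
      Integer configuredBatchSize,
      int numLoadingThreads,
      int numTurboLoadingThreads
  )
  {
    final int batchSize;
    if (mode == SegmentLoadingMode.TURBO) {
      batchSize = numTurboLoadingThreads;
    } else {
      batchSize = configuredBatchSize != null ? configuredBatchSize : numLoadingThreads;
    }
    return Math.max(batchSize, 1);
  }

  public static void main(String[] args)
  {
    // Configured batch size applies only in NORMAL mode.
    System.out.println(calculateBatchSize(SegmentLoadingMode.NORMAL, 10, 4, 12));
    // TURBO ignores the configured batch size and uses the turbo thread count.
    System.out.println(calculateBatchSize(SegmentLoadingMode.TURBO, 10, 4, 12));
    // No configured batch size: fall back to the normal loading thread count.
    System.out.println(calculateBatchSize(SegmentLoadingMode.NORMAL, null, 4, 12));
  }
}
```

This is also why the patch ignores `druid.coordinator.loadqueuepeon.http.batchSize` for turbo-loading nodes: the configured value is only consulted on the NORMAL branch.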
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueueTaskMaster.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueueTaskMaster.java
index ed6d2d5a3ea..d9fdca36c67 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueueTaskMaster.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueueTaskMaster.java
@@ -25,6 +25,7 @@ import com.google.errorprone.annotations.concurrent.GuardedBy;
 import org.apache.druid.client.ImmutableDruidServer;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
 import org.apache.druid.server.coordinator.config.HttpLoadQueuePeonConfig;
 
 import java.util.List;
@@ -34,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 
 /**
  * Provides LoadQueuePeons
@@ -47,6 +49,7 @@ public class LoadQueueTaskMaster
   private final ExecutorService callbackExec;
   private final HttpLoadQueuePeonConfig config;
   private final HttpClient httpClient;
+  private final Supplier<CoordinatorDynamicConfig> coordinatorDynamicConfigSupplier;
 
   @GuardedBy("this")
   private final AtomicBoolean isLeader = new AtomicBoolean(false);
@@ -58,7 +61,8 @@ public class LoadQueueTaskMaster
       ScheduledExecutorService peonExec,
       ExecutorService callbackExec,
       HttpLoadQueuePeonConfig config,
-      HttpClient httpClient
+      HttpClient httpClient,
+      Supplier<CoordinatorDynamicConfig> coordinatorDynamicConfigSupplier
   )
   {
     this.jsonMapper = jsonMapper;
@@ -66,11 +70,20 @@ public class LoadQueueTaskMaster
     this.callbackExec = callbackExec;
     this.config = config;
     this.httpClient = httpClient;
+    this.coordinatorDynamicConfigSupplier = coordinatorDynamicConfigSupplier;
   }
 
   private LoadQueuePeon createPeon(ImmutableDruidServer server)
   {
-    return new HttpLoadQueuePeon(server.getURL(), jsonMapper, httpClient, config, peonExec, callbackExec);
+    return new HttpLoadQueuePeon(
+        server.getURL(),
+        jsonMapper,
+        httpClient,
+        config,
+        () -> coordinatorDynamicConfigSupplier.get().getLoadingModeForServer(server.getName()),
+        peonExec,
+        callbackExec
+    );
   }
 
   public Map<String, LoadQueuePeon> getAllPeons()
diff --git a/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java b/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java
index 7c4392daf4e..48e03727199 100644
--- a/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java
@@ -31,6 +31,7 @@ import org.apache.druid.client.HttpServerInventoryView;
 import org.apache.druid.guice.annotations.Json;
 import org.apache.druid.guice.annotations.Smile;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.segment.loading.SegmentLoaderConfig;
 import org.apache.druid.server.coordination.BatchDataSegmentAnnouncer;
 import org.apache.druid.server.coordination.ChangeRequestHistory;
 import org.apache.druid.server.coordination.ChangeRequestsSnapshot;
@@ -54,6 +55,7 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 import java.io.IOException;
 import java.util.List;
 
@@ -219,10 +221,10 @@ public class SegmentListerResource
    * This endpoint is used by HttpLoadQueuePeon to assign segment load/drop requests batch. This endpoint makes the
    * client wait till one of the following events occur. Note that this is implemented using async IO so no jetty
    * threads are held while in wait.
-   *
-   * (1) Given timeout elapses.
-   * (2) Some load/drop request completed.
-   *
+   * <ol>
+   *   <li>Given timeout elapses.</li>
+   *   <li>Some load/drop request completed.</li>
+   * </ol>
    * It returns a map of "load/drop request -> SUCCESS/FAILED/PENDING status" for each request in the batch.
    */
   @POST
@@ -231,6 +233,7 @@ public class SegmentListerResource
   @Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
   public void applyDataSegmentChangeRequests(
       @QueryParam("timeout") long timeout,
+      @QueryParam("loadingMode") @Nullable SegmentLoadingMode loadingMode,
       List<DataSegmentChangeRequest> changeRequestList,
       @Context final HttpServletRequest req
   ) throws IOException
@@ -252,7 +255,7 @@ public class SegmentListerResource
 
     final ResponseContext context = createContext(req.getHeader("Accept"));
     final ListenableFuture<List<DataSegmentChangeResponse>> future =
-        loadDropRequestHandler.processBatch(changeRequestList);
+        loadDropRequestHandler.processBatch(changeRequestList, loadingMode == null ? SegmentLoadingMode.NORMAL : loadingMode);
 
     final AsyncContext asyncContext = req.startAsync();
 
@@ -327,6 +330,24 @@ public class SegmentListerResource
     asyncContext.setTimeout(timeout);
   }
 
+  @GET
+  @Path("/loadCapabilities")
+  @Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
+  public Response getSegmentLoadingCapabilities(
+      @Context final HttpServletRequest req
+  )
+  {
+    if (loadDropRequestHandler == null) {
+      return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
+    }
+
+    SegmentLoaderConfig config = loadDropRequestHandler.getSegmentLoaderConfig();
+    SegmentLoadingCapabilities capabilitiesResponse =
+        new SegmentLoadingCapabilities(config.getNumLoadingThreads(), config.getNumBootstrapThreads());
+
+    return Response.status(Response.Status.OK).entity(capabilitiesResponse).build();
+  }
+
   private void sendErrorResponse(HttpServletRequest req, int code, String 
error) throws IOException
   {
     AsyncContext asyncContext = req.startAsync();
diff --git a/server/src/main/java/org/apache/druid/server/http/SegmentLoadingCapabilities.java b/server/src/main/java/org/apache/druid/server/http/SegmentLoadingCapabilities.java
new file mode 100644
index 00000000000..9dba8af5e6b
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/http/SegmentLoadingCapabilities.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.http;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Contains information related to the capability of a server to load segments, for example the number of threads
+ * available.
+ */
+public class SegmentLoadingCapabilities
+{
+  private final int numLoadingThreads;
+  private final int numTurboLoadingThreads;
+
+  @JsonCreator
+  public SegmentLoadingCapabilities(
+      @JsonProperty("numLoadingThreads") int numLoadingThreads,
+      @JsonProperty("numTurboLoadingThreads") int numTurboLoadingThreads
+  )
+  {
+    this.numLoadingThreads = numLoadingThreads;
+    this.numTurboLoadingThreads = numTurboLoadingThreads;
+  }
+
+  @JsonProperty("numLoadingThreads")
+  public int getNumLoadingThreads()
+  {
+    return numLoadingThreads;
+  }
+
+  @JsonProperty("numTurboLoadingThreads")
+  public int getNumTurboLoadingThreads()
+  {
+    return numTurboLoadingThreads;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "SegmentLoadingCapabilities{" +
+           "numLoadingThreads=" + numLoadingThreads +
+           ", numTurboLoadingThreads=" + numTurboLoadingThreads +
+           '}';
+  }
+}
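
Given the `@JsonProperty` names above, the new `/loadCapabilities` endpoint returns this class serialized as JSON. For example, a historical with 4 normal loading threads and 16 bootstrap (turbo) threads would respond with something like (thread counts here are hypothetical):

```json
{
  "numLoadingThreads": 4,
  "numTurboLoadingThreads": 16
}
```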
diff --git a/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeHandler.java b/server/src/main/java/org/apache/druid/server/http/SegmentLoadingMode.java
similarity index 70%
copy from server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeHandler.java
copy to server/src/main/java/org/apache/druid/server/http/SegmentLoadingMode.java
index cd2a8c3740f..b3896a540bd 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeHandler.java
+++ b/server/src/main/java/org/apache/druid/server/http/SegmentLoadingMode.java
@@ -17,16 +17,13 @@
  * under the License.
  */
 
-package org.apache.druid.server.coordination;
-
-import org.apache.druid.timeline.DataSegment;
-
-import javax.annotation.Nullable;
+package org.apache.druid.server.http;
 
 /**
+ * Determines the threadpool used by the historical to load segments.
  */
-public interface DataSegmentChangeHandler
+public enum SegmentLoadingMode
 {
-  void addSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback);
-  void removeSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback);
+  NORMAL,
+  TURBO
 }
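
Since the `loadingMode` query parameter on `applyDataSegmentChangeRequests` is `@Nullable`, callers that omit it get NORMAL behavior. A minimal stand-alone sketch of that defaulting (the enum is copied from the patch; `parseLoadingMode` is a hypothetical helper, not Druid code):

```java
public class LoadingModeSketch
{
  enum SegmentLoadingMode { NORMAL, TURBO }

  // Hypothetical helper: map an absent query parameter to the NORMAL default,
  // matching how applyDataSegmentChangeRequests treats a null loadingMode.
  static SegmentLoadingMode parseLoadingMode(String queryParam)
  {
    return queryParam == null ? SegmentLoadingMode.NORMAL : SegmentLoadingMode.valueOf(queryParam);
  }

  public static void main(String[] args)
  {
    System.out.println(parseLoadingMode(null));    // NORMAL
    System.out.println(parseLoadingMode("TURBO")); // TURBO
  }
}
```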
diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
index cd2fe2dbd63..7a8822a60d8 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
@@ -30,6 +30,7 @@ import org.apache.druid.segment.loading.SegmentLoaderConfig;
 import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.server.SegmentManager;
 import org.apache.druid.server.coordination.SegmentChangeStatus.State;
+import org.apache.druid.server.http.SegmentLoadingMode;
 import org.apache.druid.timeline.DataSegment;
 import org.junit.Assert;
 import org.junit.Before;
@@ -48,6 +49,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.druid.server.TestSegmentUtils.makeSegment;
@@ -106,7 +108,7 @@ public class SegmentLoadDropHandlerTest
 
     scheduledExecutorFactory = (corePoolSize, nameFormat) -> {
       // Override normal behavior by adding the runnable to a list so that you can make sure
-      // all the shceduled runnables are executed by explicitly calling run() on each item in the list
+      // all the scheduled runnables are executed by explicitly calling run() on each item in the list
       return new ScheduledThreadPoolExecutor(corePoolSize, Execs.makeThreadFactory(nameFormat))
       {
         @Override
@@ -230,7 +232,7 @@ public class SegmentLoadDropHandlerTest
         new SegmentChangeRequestDrop(segment2)
     );
 
-    ListenableFuture<List<DataSegmentChangeResponse>> future = handler.processBatch(batch);
+    ListenableFuture<List<DataSegmentChangeResponse>> future = handler.processBatch(batch, SegmentLoadingMode.NORMAL);
 
     Map<DataSegmentChangeRequest, SegmentChangeStatus> expectedStatusMap = new HashMap<>();
     expectedStatusMap.put(batch.get(0), SegmentChangeStatus.PENDING);
@@ -244,7 +246,7 @@ public class SegmentLoadDropHandlerTest
       runnable.run();
     }
 
-    result = handler.processBatch(ImmutableList.of(new SegmentChangeRequestLoad(segment1))).get();
+    result = handler.processBatch(ImmutableList.of(new SegmentChangeRequestLoad(segment1)), SegmentLoadingMode.TURBO).get();
     Assert.assertEquals(SegmentChangeStatus.SUCCESS, result.get(0).getStatus());
 
     Assert.assertEquals(ImmutableList.of(segment1), segmentAnnouncer.getObservedSegments());
@@ -271,7 +273,7 @@ public class SegmentLoadDropHandlerTest
     DataSegment segment1 = makeSegment("batchtest1", "1", Intervals.of("P1d/2011-04-01"));
     List<DataSegmentChangeRequest> batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1));
 
-    ListenableFuture<List<DataSegmentChangeResponse>> future = handler.processBatch(batch);
+    ListenableFuture<List<DataSegmentChangeResponse>> future = handler.processBatch(batch, SegmentLoadingMode.NORMAL);
 
     for (Runnable runnable : scheduledRunnable) {
       runnable.run();
@@ -280,7 +282,7 @@ public class SegmentLoadDropHandlerTest
     Assert.assertEquals(State.FAILED, result.get(0).getStatus().getState());
     Assert.assertEquals(ImmutableList.of(), segmentAnnouncer.getObservedSegments());
 
-    future = handler.processBatch(batch);
+    future = handler.processBatch(batch, SegmentLoadingMode.NORMAL);
     for (Runnable runnable : scheduledRunnable) {
       runnable.run();
     }
@@ -343,7 +345,7 @@ public class SegmentLoadDropHandlerTest
     List<DataSegmentChangeRequest> batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1));
 
     // Request 1: Load the segment
-    ListenableFuture<List<DataSegmentChangeResponse>> future = handler.processBatch(batch);
+    ListenableFuture<List<DataSegmentChangeResponse>> future = handler.processBatch(batch, SegmentLoadingMode.NORMAL);
     for (Runnable runnable : scheduledRunnable) {
       runnable.run();
     }
@@ -354,7 +356,7 @@ public class SegmentLoadDropHandlerTest
 
     // Request 2: Drop the segment
     batch = ImmutableList.of(new SegmentChangeRequestDrop(segment1));
-    future = handler.processBatch(batch);
+    future = handler.processBatch(batch, SegmentLoadingMode.NORMAL);
     for (Runnable runnable : scheduledRunnable) {
       runnable.run();
     }
@@ -372,7 +374,7 @@ public class SegmentLoadDropHandlerTest
 
     // Request 3: Reload the segment
     batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1));
-    future = handler.processBatch(batch);
+    future = handler.processBatch(batch, SegmentLoadingMode.NORMAL);
     for (Runnable runnable : scheduledRunnable) {
       runnable.run();
     }
@@ -389,7 +391,7 @@ public class SegmentLoadDropHandlerTest
 
     // Request 4: Try to reload the segment - segment is loaded and announced again
     batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1));
-    future = handler.processBatch(batch);
+    future = handler.processBatch(batch, SegmentLoadingMode.NORMAL);
     for (Runnable runnable : scheduledRunnable) {
       runnable.run();
     }
@@ -420,7 +422,8 @@ public class SegmentLoadDropHandlerTest
         config,
         segmentAnnouncer,
         segmentManager,
-        scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]")
+        scheduledExecutorFactory.create(5, "LoadDropHandlerTest-[%d]"),
+        (ThreadPoolExecutor) scheduledExecutorFactory.create(5, "TurboSegmentLoadDropHandlerTest-[%d]")
     );
   }
 }
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java
index 36cb9c8439f..d91cb62050a 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java
@@ -101,7 +101,7 @@ public class DruidCoordinatorConfigTest
     Assert.assertEquals(Duration.standardMinutes(1), config.getRepeatDelay());
     Assert.assertEquals(Duration.standardMinutes(5), config.getHostTimeout());
     Assert.assertEquals(Duration.standardMinutes(15), config.getLoadTimeout());
-    Assert.assertEquals(1, config.getBatchSize());
+    Assert.assertNull(config.getBatchSize());
   }
 
   @Test
@@ -118,7 +118,7 @@ public class DruidCoordinatorConfigTest
     Assert.assertEquals(Duration.standardMinutes(20), config.getRepeatDelay());
     Assert.assertEquals(Duration.standardMinutes(10), config.getHostTimeout());
     Assert.assertEquals(Duration.standardMinutes(15), config.getLoadTimeout());
-    Assert.assertEquals(100, config.getBatchSize());
+    Assert.assertEquals(Integer.valueOf(100), config.getBatchSize());
   }
 
   @Test
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/config/HttpLoadQueuePeonConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/config/HttpLoadQueuePeonConfigTest.java
new file mode 100644
index 00000000000..4d9a2400215
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/config/HttpLoadQueuePeonConfigTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.config;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.exc.ValueInstantiationException;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.internal.matchers.ThrowableMessageMatcher;
+
+public class HttpLoadQueuePeonConfigTest
+{
+  @Test
+  public void testValidateBatchSize() throws JsonProcessingException
+  {
+    ObjectMapper jsonMapper = new ObjectMapper();
+
+    MatcherAssert.assertThat(
+        Assert.assertThrows(ValueInstantiationException.class, () ->
+            jsonMapper.readValue("{\"batchSize\":0}", HttpLoadQueuePeonConfig.class)
+        ),
+        CoreMatchers.allOf(
+            CoreMatchers.instanceOf(ValueInstantiationException.class),
+            ThrowableMessageMatcher.hasMessage(
+                CoreMatchers.containsString("Batch size must be greater than 0.")
+            )
+        )
+    );
+
+    HttpLoadQueuePeonConfig emptyConfig = jsonMapper.readValue(
+        "{}",
+        HttpLoadQueuePeonConfig.class
+    );
+    Assert.assertNull(emptyConfig.getBatchSize());
+
+    HttpLoadQueuePeonConfig config = jsonMapper.readValue(
+        "{\"batchSize\":2}",
+        HttpLoadQueuePeonConfig.class
+    );
+    Assert.assertEquals(2, config.getBatchSize().intValue());
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java
index 1dcf384d2e0..e511ce77ee2 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java
@@ -37,6 +37,8 @@ import org.apache.druid.server.coordinator.CreateDataSegments;
 import org.apache.druid.server.coordinator.config.HttpLoadQueuePeonConfig;
 import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
 import org.apache.druid.server.coordinator.simulate.WrappingScheduledExecutorService;
+import org.apache.druid.server.http.SegmentLoadingCapabilities;
+import org.apache.druid.server.http.SegmentLoadingMode;
 import org.apache.druid.timeline.DataSegment;
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
@@ -81,13 +83,21 @@ public class HttpLoadQueuePeonTest
         MAPPER,
         httpClient,
         new HttpLoadQueuePeonConfig(null, null, 10),
+        () -> SegmentLoadingMode.NORMAL,
         new WrappingScheduledExecutorService(
             "HttpLoadQueuePeonTest-%s",
             httpClient.processingExecutor,
             true
         ),
         httpClient.callbackExecutor
-    );
+    )
+    {
+      @Override
+      SegmentLoadingCapabilities fetchSegmentLoadingCapabilities()
+      {
+        return new SegmentLoadingCapabilities(1, 3);
+      }
+    };
     httpLoadQueuePeon.start();
   }
 
@@ -316,6 +326,37 @@ public class HttpLoadQueuePeonTest
     );
   }
 
+  @Test
+  public void testBatchSize()
+  {
+    Assert.assertEquals(10, httpLoadQueuePeon.calculateBatchSize(SegmentLoadingMode.NORMAL));
+
+    // Without a batch size runtime parameter
+    httpLoadQueuePeon = new HttpLoadQueuePeon(
+        "http://dummy:4000",
+        MAPPER,
+        httpClient,
+        new HttpLoadQueuePeonConfig(null, null, null),
+        () -> SegmentLoadingMode.NORMAL,
+        new WrappingScheduledExecutorService(
+            "HttpLoadQueuePeonTest-%s",
+            httpClient.processingExecutor,
+            true
+        ),
+        httpClient.callbackExecutor
+    )
+    {
+      @Override
+      SegmentLoadingCapabilities fetchSegmentLoadingCapabilities()
+      {
+        return new SegmentLoadingCapabilities(1, 3);
+      }
+    };
+
+    Assert.assertEquals(1, httpLoadQueuePeon.calculateBatchSize(SegmentLoadingMode.NORMAL));
+    Assert.assertEquals(3, httpLoadQueuePeon.calculateBatchSize(SegmentLoadingMode.TURBO));
+  }
+
   private LoadPeonCallback markSegmentProcessed(DataSegment segment)
   {
     return success -> httpClient.processedSegments.add(segment);
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
index ddc360aa012..938b6b09b1a 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
@@ -454,19 +454,22 @@ public class CoordinatorSimulationBuilder
           createBalancerStrategy(balancerStrategy),
           new HttpLoadQueuePeonConfig(null, null, null)
       );
+
+      JacksonConfigManager jacksonConfigManager = mockConfigManager();
+      setDynamicConfig(dynamicConfig);
+
       this.loadQueueTaskMaster = new LoadQueueTaskMaster(
           OBJECT_MAPPER,
           executorFactory.create(1, ExecutorFactory.LOAD_QUEUE_EXECUTOR),
           executorFactory.create(1, ExecutorFactory.LOAD_CALLBACK_EXECUTOR),
           coordinatorConfig.getHttpLoadQueuePeonConfig(),
-          httpClient
+          httpClient,
+          () -> dynamicConfig
       );
+
       this.loadQueueManager =
           new SegmentLoadQueueManager(coordinatorInventoryView, loadQueueTaskMaster);
 
-      JacksonConfigManager jacksonConfigManager = mockConfigManager();
-      setDynamicConfig(dynamicConfig);
-
       this.lookupCoordinatorManager = EasyMock.createNiceMock(LookupCoordinatorManager.class);
       mocks.add(jacksonConfigManager);
       mocks.add(lookupCoordinatorManager);
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java
index 5caa90d8dfd..877265d85b4 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.java.util.http.client.Request;
 import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
@@ -32,6 +33,7 @@ import org.apache.druid.server.coordination.DataSegmentChangeHandler;
 import org.apache.druid.server.coordination.DataSegmentChangeRequest;
 import org.apache.druid.server.coordination.DataSegmentChangeResponse;
 import org.apache.druid.server.coordination.SegmentChangeStatus;
+import org.apache.druid.server.http.SegmentLoadingCapabilities;
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
 import org.jboss.netty.handler.codec.http.HttpResponse;
@@ -79,12 +81,16 @@ public class TestSegmentLoadingHttpClient implements HttpClient
   }
 
   @Override
+  @SuppressWarnings("unchecked")
   public <Intermediate, Final> ListenableFuture<Final> go(
       Request request,
       HttpResponseHandler<Intermediate, Final> handler,
       Duration readTimeout
   )
   {
+    if (request.getUrl().toString().contains("/loadCapabilities")) {
+      return getCapabilities(handler);
+    }
     return executorService.submit(() -> processRequest(request, handler));
   }
 
@@ -143,6 +149,27 @@ public class TestSegmentLoadingHttpClient implements HttpClient
         .collect(Collectors.toList());
   }
 
+  private <Intermediate, Final> ListenableFuture<Final> getCapabilities(HttpResponseHandler<Intermediate, Final> handler)
+  {
+    try {
+      // Set response content and status
+      final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+      response.setContent(ChannelBuffers.EMPTY_BUFFER);
+      handler.handleResponse(response, NOOP_TRAFFIC_COP);
+
+      // Serialize
+      SettableFuture future = SettableFuture.create();
+      try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+        objectMapper.writeValue(baos, new SegmentLoadingCapabilities(1, 1));
+        future.set(new ByteArrayInputStream(baos.toByteArray()));
+      }
+      return future;
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   /**
    * Processes each DataSegmentChangeRequest using the handler.
    */
diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
index eb7fb199287..1be987ddc27 100644
--- a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
@@ -51,7 +51,8 @@ public class CoordinatorDynamicConfigTest
                      + "  \"maxSegmentsInNodeLoadingQueue\": 1,\n"
                      + "  \"decommissioningNodes\": [\"host1\", \"host2\"],\n"
                      + "  \"pauseCoordination\": false,\n"
-                     + "  \"replicateAfterLoadTimeout\": false\n"
+                     + "  \"replicateAfterLoadTimeout\": false,\n"
+                     + "  \"turboLoadingNodes\":[\"host1\", \"host3\"]\n"
                      + "}\n";
 
     CoordinatorDynamicConfig actual = mapper.readValue(
@@ -65,6 +66,7 @@ public class CoordinatorDynamicConfigTest
     );
     ImmutableSet<String> decommissioning = ImmutableSet.of("host1", "host2");
     ImmutableSet<String> whitelist = ImmutableSet.of("test1", "test2");
+    ImmutableSet<String> turboLoadingNodes = ImmutableSet.of("host1", "host3");
     assertConfig(
         actual,
         1,
@@ -78,7 +80,8 @@ public class CoordinatorDynamicConfigTest
         1,
         decommissioning,
         false,
-        false
+        false,
+        turboLoadingNodes
     );
 
     actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual);
@@ -95,7 +98,8 @@ public class CoordinatorDynamicConfigTest
         1,
         ImmutableSet.of("host1"),
         false,
-        false
+        false,
+        turboLoadingNodes
     );
 
     actual = CoordinatorDynamicConfig.builder().build(actual);
@@ -112,7 +116,8 @@ public class CoordinatorDynamicConfigTest
         1,
         ImmutableSet.of("host1"),
         false,
-        false
+        false,
+        turboLoadingNodes
     );
 
     actual = CoordinatorDynamicConfig.builder().withPauseCoordination(true).build(actual);
@@ -129,7 +134,8 @@ public class CoordinatorDynamicConfigTest
         1,
         ImmutableSet.of("host1"),
         true,
-        false
+        false,
+        turboLoadingNodes
     );
 
     actual = CoordinatorDynamicConfig.builder().withReplicateAfterLoadTimeout(true).build(actual);
@@ -146,7 +152,8 @@ public class CoordinatorDynamicConfigTest
         1,
         ImmutableSet.of("host1"),
         true,
-        true
+        true,
+        turboLoadingNodes
     );
 
     actual = CoordinatorDynamicConfig.builder().build(actual);
@@ -163,7 +170,8 @@ public class CoordinatorDynamicConfigTest
         1,
         ImmutableSet.of("host1"),
         true,
-        true
+        true,
+        turboLoadingNodes
     );
 
     actual = CoordinatorDynamicConfig.builder().withKillTaskSlotRatio(0.1).build(actual);
@@ -180,7 +188,8 @@ public class CoordinatorDynamicConfigTest
         1,
         ImmutableSet.of("host1"),
         true,
-        true
+        true,
+        turboLoadingNodes
     );
 
     actual = CoordinatorDynamicConfig.builder().withMaxKillTaskSlots(5).build(actual);
@@ -197,7 +206,8 @@ public class CoordinatorDynamicConfigTest
         1,
         ImmutableSet.of("host1"),
         true,
-        true
+        true,
+        turboLoadingNodes
     );
   }
 
@@ -233,7 +243,8 @@ public class CoordinatorDynamicConfigTest
         true,
         false,
         false,
-        null
+        null,
+        ImmutableSet.of("host1")
     );
     Assert.assertTrue(config.getSpecificDataSourcesToKillUnusedSegmentsIn().isEmpty());
   }
@@ -257,7 +268,8 @@ public class CoordinatorDynamicConfigTest
         true,
         false,
         false,
-        null
+        null,
+        ImmutableSet.of("host1")
     );
     Assert.assertEquals(ImmutableSet.of("test1"), config.getSpecificDataSourcesToKillUnusedSegmentsIn());
   }
@@ -272,7 +284,8 @@ public class CoordinatorDynamicConfigTest
                      + "  \"replicationThrottleLimit\": 1,\n"
                      + "  \"balancerComputeThreads\": 2, \n"
                     + "  \"killDataSourceWhitelist\": [\"test1\",\"test2\"],\n"
-                     + "  \"maxSegmentsInNodeLoadingQueue\": 1\n"
+                     + "  \"maxSegmentsInNodeLoadingQueue\": 1,\n"
+                     + "  \"turboLoadingNodes\": [\"host3\",\"host4\"]\n"
                      + "}\n";
 
     CoordinatorDynamicConfig actual = mapper.readValue(
@@ -286,6 +299,7 @@ public class CoordinatorDynamicConfigTest
     );
     ImmutableSet<String> decommissioning = ImmutableSet.of();
     ImmutableSet<String> whitelist = ImmutableSet.of("test1", "test2");
+    ImmutableSet<String> turboLoading = ImmutableSet.of("host3", "host4");
     assertConfig(
         actual,
         1,
@@ -299,7 +313,8 @@ public class CoordinatorDynamicConfigTest
         1,
         decommissioning,
         false,
-        false
+        false,
+        turboLoading
     );
 
     actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual);
@@ -316,7 +331,8 @@ public class CoordinatorDynamicConfigTest
         1,
         ImmutableSet.of("host1"),
         false,
-        false
+        false,
+        turboLoading
     );
 
     actual = CoordinatorDynamicConfig.builder().build(actual);
@@ -333,7 +349,8 @@ public class CoordinatorDynamicConfigTest
         1,
         ImmutableSet.of("host1"),
         false,
-        false
+        false,
+        turboLoading
     );
   }
 
@@ -372,7 +389,8 @@ public class CoordinatorDynamicConfigTest
         1,
         ImmutableSet.of(),
         false,
-        false
+        false,
+        ImmutableSet.of()
     );
   }
 
@@ -411,7 +429,8 @@ public class CoordinatorDynamicConfigTest
         1,
         decommissioning,
         false,
-        false
+        false,
+        ImmutableSet.of()
     );
   }
 
@@ -446,7 +465,8 @@ public class CoordinatorDynamicConfigTest
         EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE,
         ImmutableSet.of(),
         false,
-        false
+        false,
+        ImmutableSet.of()
     );
   }
 
@@ -468,7 +488,8 @@ public class CoordinatorDynamicConfigTest
         EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE,
         emptyList,
         false,
-        false
+        false,
+        ImmutableSet.of()
     );
   }
 
@@ -493,7 +514,8 @@ public class CoordinatorDynamicConfigTest
         EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE,
         ImmutableSet.of(),
         false,
-        false
+        false,
+        ImmutableSet.of()
     );
   }
 
@@ -511,6 +533,18 @@ public class CoordinatorDynamicConfigTest
     );
   }
 
+  @Test
+  public void testTurboLoadingNodes()
+  {
+    CoordinatorDynamicConfig config = CoordinatorDynamicConfig
+        .builder()
+        .withTurboLoadingNodes(ImmutableSet.of("localhost:8083"))
+        .build();
+
+    Assert.assertEquals(SegmentLoadingMode.NORMAL, config.getLoadingModeForServer("localhost:8082"));
+    Assert.assertEquals(SegmentLoadingMode.TURBO, config.getLoadingModeForServer("localhost:8083"));
+  }
+
   @Test
   public void testEqualsAndHashCode()
   {
@@ -533,7 +567,8 @@ public class CoordinatorDynamicConfigTest
       int expectedMaxSegmentsInNodeLoadingQueue,
       Set<String> decommissioningNodes,
       boolean pauseCoordination,
-      boolean replicateAfterLoadTimeout
+      boolean replicateAfterLoadTimeout,
+      Set<String> turboLoadingNodes
   )
   {
     Assert.assertEquals(
@@ -554,6 +589,7 @@ public class CoordinatorDynamicConfigTest
     Assert.assertEquals(decommissioningNodes, config.getDecommissioningNodes());
     Assert.assertEquals(pauseCoordination, config.getPauseCoordination());
     Assert.assertEquals(replicateAfterLoadTimeout, config.getReplicateAfterLoadTimeout());
+    Assert.assertEquals(turboLoadingNodes, config.getTurboLoadingNodes());
   }
 
   private static int getDefaultNumBalancerThreads()
diff --git a/server/src/test/java/org/apache/druid/server/http/SegmentLoadingCapabilitiesTest.java b/server/src/test/java/org/apache/druid/server/http/SegmentLoadingCapabilitiesTest.java
new file mode 100644
index 00000000000..0819439a0aa
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/http/SegmentLoadingCapabilitiesTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.http;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SegmentLoadingCapabilitiesTest
+{
+  private final ObjectMapper jsonMapper = new DefaultObjectMapper();
+
+  @Test
+  public void testSerde() throws Exception
+  {
+    SegmentLoadingCapabilities capabilities = new SegmentLoadingCapabilities(1, 4);
+
+    SegmentLoadingCapabilities reread = jsonMapper.readValue(jsonMapper.writeValueAsString(capabilities), SegmentLoadingCapabilities.class);
+
+    Assert.assertEquals(capabilities.getNumLoadingThreads(), reread.getNumLoadingThreads());
+    Assert.assertEquals(capabilities.getNumTurboLoadingThreads(), reread.getNumTurboLoadingThreads());
+  }
+
+  @Test
+  public void testSerdeFromJson() throws JsonProcessingException
+  {
+    String json = "{\"numLoadingThreads\":3,\"numTurboLoadingThreads\":5}";
+    SegmentLoadingCapabilities reread = jsonMapper.readValue(json, SegmentLoadingCapabilities.class);
+
+    Assert.assertEquals(3, reread.getNumLoadingThreads());
+    Assert.assertEquals(5, reread.getNumTurboLoadingThreads());
+  }
+}
diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
index b1fc5f63404..1f61d9716eb 100644
--- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
+++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
@@ -306,7 +306,8 @@ public class CliCoordinator extends ServerRunnable
               ScheduledExecutorFactory factory,
               DruidCoordinatorConfig config,
               @EscalatedGlobal HttpClient httpClient,
-              Lifecycle lifecycle
+              Lifecycle lifecycle,
+              CoordinatorConfigManager coordinatorConfigManager
           )
           {
              final ExecutorService callBackExec = Execs.singleThreaded("LoadQueuePeon-callbackexec--%d");
@@ -316,7 +317,8 @@ public class CliCoordinator extends ServerRunnable
                 factory.create(1, "Master-PeonExec--%d"),
                 callBackExec,
                 config.getHttpLoadQueuePeonConfig(),
-                httpClient
+                httpClient,
+                coordinatorConfigManager::getCurrentDynamicConfig
             );
           }
         }
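
For readers skimming the diff: the behavior exercised by `testTurboLoadingNodes` above boils down to a per-server set-membership check. The following is a minimal, self-contained sketch of that logic; the class name `LoadingModeSketch` is hypothetical and stands in for the real `CoordinatorDynamicConfig`, while `SegmentLoadingMode` mirrors the enum referenced in the tests.

```java
import java.util.Set;

// Sketch only: servers listed in turboLoadingNodes load segments in TURBO
// mode (larger bootstrap thread pool); all other servers stay in NORMAL mode.
public class LoadingModeSketch
{
  public enum SegmentLoadingMode { NORMAL, TURBO }

  private final Set<String> turboLoadingNodes;

  public LoadingModeSketch(Set<String> turboLoadingNodes)
  {
    this.turboLoadingNodes = turboLoadingNodes;
  }

  // Mirrors the getLoadingModeForServer lookup asserted in the tests above.
  public SegmentLoadingMode getLoadingModeForServer(String serverHostAndPort)
  {
    return turboLoadingNodes.contains(serverHostAndPort)
           ? SegmentLoadingMode.TURBO
           : SegmentLoadingMode.NORMAL;
  }

  public static void main(String[] args)
  {
    LoadingModeSketch config = new LoadingModeSketch(Set.of("localhost:8083"));
    System.out.println(config.getLoadingModeForServer("localhost:8082")); // NORMAL
    System.out.println(config.getLoadingModeForServer("localhost:8083")); // TURBO
  }
}
```

Because the mode is derived from the dynamic config on each lookup, updating `turboLoadingNodes` via the coordinator dynamic-configuration API takes effect without restarting any server.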


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
