This is an automated email from the ASF dual-hosted git repository.

maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b8e7fc  Add a flag to allow auto compaction task slot ratio to 
consider auto scaler slots (#12228)
2b8e7fc is described below

commit 2b8e7fc0b4ab76af836594d3b14e9213fa64cff0
Author: Maytas Monsereenusorn <[email protected]>
AuthorDate: Sun Feb 6 20:46:05 2022 -0800

    Add a flag to allow auto compaction task slot ratio to consider auto scaler 
slots (#12228)
    
    * add impl
    
    * fix checkstyle
    
    * add unit tests
    
    * checkstyle
    
    * add IT
    
    * fix IT
    
    * add comments
    
    * fix checkstyle
---
 ...PendingTaskBasedWorkerProvisioningStrategy.java |  15 +-
 .../overlord/autoscaling/ProvisioningStrategy.java |  15 ++
 .../indexing/overlord/http/OverlordResource.java   |  71 +++++++-
 .../overlord/http/TotalWorkerCapacityResponse.java |  62 +++++++
 .../PendingTaskBasedProvisioningStrategyTest.java  | 102 ++++++++++-
 .../overlord/http/OverlordResourceTest.java        | 196 ++++++++++++++++++++-
 .../druid/indexing/overlord/http/OverlordTest.java |   3 +-
 .../clients/CompactionResourceTestClient.java      |  19 +-
 .../duty/ITAutoCompactionLockContentionTest.java   |   2 +-
 .../coordinator/duty/ITAutoCompactionTest.java     |  31 +++-
 .../client/indexing/HttpIndexingServiceClient.java |  26 +++
 .../client/indexing/IndexingServiceClient.java     |   9 +
 .../indexing/IndexingTotalWorkerCapacityInfo.java  |  62 +++++++
 .../coordinator/CoordinatorCompactionConfig.java   |  64 ++++---
 .../server/coordinator/duty/CompactSegments.java   |  15 +-
 .../http/CoordinatorCompactionConfigsResource.java |   4 +-
 .../indexing/HttpIndexingServiceClientTest.java    |  31 ++++
 .../client/indexing/NoopIndexingServiceClient.java |   6 +
 .../coordinator/duty/CompactSegmentsTest.java      |  55 +++++-
 .../CoordinatorCompactionConfigsResourceTest.java  |   4 +
 20 files changed, 737 insertions(+), 55 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java
index 28d1bef..7d83b3d 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java
@@ -267,8 +267,7 @@ public class PendingTaskBasedWorkerProvisioningStrategy 
extends AbstractWorkerPr
                                   remoteTaskRunnerConfig,
                                   workerConfig,
                                   pendingTasks,
-                                  workers,
-                                  config.getWorkerCapacityHint()
+                                  workers
                               );
       log.debug("More workers needed: %d", moreWorkersNeeded);
 
@@ -296,8 +295,7 @@ public class PendingTaskBasedWorkerProvisioningStrategy 
extends AbstractWorkerPr
         final WorkerTaskRunnerConfig workerTaskRunnerConfig,
         final DefaultWorkerBehaviorConfig workerConfig,
         final Collection<Task> pendingTasks,
-        final Collection<ImmutableWorkerInfo> workers,
-        final int workerCapacityHint
+        final Collection<ImmutableWorkerInfo> workers
     )
     {
       final Collection<ImmutableWorkerInfo> validWorkers = Collections2.filter(
@@ -312,7 +310,7 @@ public class PendingTaskBasedWorkerProvisioningStrategy 
extends AbstractWorkerPr
       }
       WorkerSelectStrategy workerSelectStrategy = 
workerConfig.getSelectStrategy();
       int need = 0;
-      int capacity = getExpectedWorkerCapacity(workers, workerCapacityHint);
+      int capacity = getExpectedWorkerCapacity(workers);
       log.info("Expected worker capacity: %d", capacity);
 
       // Simulate assigning tasks to dummy workers using configured 
workerSelectStrategy
@@ -458,14 +456,15 @@ public class PendingTaskBasedWorkerProvisioningStrategy 
extends AbstractWorkerPr
     return currValidWorkers;
   }
 
-  private static int getExpectedWorkerCapacity(final 
Collection<ImmutableWorkerInfo> workers, final int workerCapacityHint)
+  @Override
+  public int getExpectedWorkerCapacity(final Collection<ImmutableWorkerInfo> 
workers)
   {
     int size = workers.size();
     if (size == 0) {
       // No existing workers
-      if (workerCapacityHint > 0) {
+      if (config.getWorkerCapacityHint() > 0) {
         // Return workerCapacityHint if it is set in config
-        return workerCapacityHint;
+        return config.getWorkerCapacityHint();
       } else {
         // Assume capacity per worker as 1
         return 1;
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/ProvisioningStrategy.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/ProvisioningStrategy.java
index 1f940fd..d4e3b5b 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/ProvisioningStrategy.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/ProvisioningStrategy.java
@@ -20,8 +20,12 @@
 package org.apache.druid.indexing.overlord.autoscaling;
 
 import org.apache.druid.guice.annotations.ExtensionPoint;
+import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
 import org.apache.druid.indexing.overlord.TaskRunner;
 
+import javax.annotation.Nonnull;
+import java.util.Collection;
+
 /**
  * In general, the resource management is tied to the runner.
  */
@@ -35,4 +39,15 @@ public interface ProvisioningStrategy<T extends TaskRunner>
    * @param runner The TaskRunner state holder this strategy should use during 
execution
    */
   ProvisioningService makeProvisioningService(T runner);
+
+  /**
+   * Returns the expected number of task slots available for each worker.
+   * This method can return -1 if the provisioning strategy does not support 
getting the expected worker capacity.
+   *
+   * @return the expected number of task slots available for each worker if the 
provisioning strategy supports getting the expected worker capacity, otherwise -1
+   */
+  default int getExpectedWorkerCapacity(@Nonnull 
Collection<ImmutableWorkerInfo> workers)
+  {
+    return -1;
+  }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
index 0076eb0..85bb3db 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
@@ -45,6 +45,7 @@ import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.actions.TaskActionHolder;
 import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter;
 import org.apache.druid.indexing.overlord.TaskMaster;
 import org.apache.druid.indexing.overlord.TaskQueue;
@@ -53,8 +54,10 @@ import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
 import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter;
 import org.apache.druid.indexing.overlord.WorkerTaskRunner;
 import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter;
+import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
 import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
 import org.apache.druid.indexing.overlord.http.security.TaskResourceFilter;
+import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
 import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
@@ -122,6 +125,7 @@ public class OverlordResource
   private final AuditManager auditManager;
   private final AuthorizerMapper authorizerMapper;
   private final WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter;
+  private final ProvisioningStrategy provisioningStrategy;
 
   private AtomicReference<WorkerBehaviorConfig> workerConfigRef = null;
   private static final List API_TASK_STATES = ImmutableList.of("pending", 
"waiting", "running", "complete");
@@ -135,7 +139,8 @@ public class OverlordResource
       JacksonConfigManager configManager,
       AuditManager auditManager,
       AuthorizerMapper authorizerMapper,
-      WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter
+      WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter,
+      ProvisioningStrategy provisioningStrategy
   )
   {
     this.taskMaster = taskMaster;
@@ -146,6 +151,7 @@ public class OverlordResource
     this.auditManager = auditManager;
     this.authorizerMapper = authorizerMapper;
     this.workerTaskRunnerQueryAdapter = workerTaskRunnerQueryAdapter;
+    this.provisioningStrategy = provisioningStrategy;
   }
 
   /**
@@ -422,6 +428,69 @@ public class OverlordResource
     return Response.ok(workerConfigRef.get()).build();
   }
 
+  /**
+   * Gets the total worker capacity of various states of the cluster.
+   */
+  @GET
+  @Path("/totalWorkerCapacity")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(ConfigResourceFilter.class)
+  public Response getTotalWorkerCapacity()
+  {
+    // Calculate current cluster capacity
+    int currentCapacity;
+    Optional<TaskRunner> taskRunnerOptional = taskMaster.getTaskRunner();
+    if (!taskRunnerOptional.isPresent()) {
+      // Cannot serve call as not leader
+      return Response.status(Response.Status.SERVICE_UNAVAILABLE).build();
+    }
+    TaskRunner taskRunner = taskRunnerOptional.get();
+    Collection<ImmutableWorkerInfo> workers;
+    if (taskRunner instanceof WorkerTaskRunner) {
+      workers = ((WorkerTaskRunner) taskRunner).getWorkers();
+      currentCapacity = workers.stream().mapToInt(workerInfo -> 
workerInfo.getWorker().getCapacity()).sum();
+    } else {
+      log.debug(
+          "Cannot calculate capacity as task runner [%s] of type [%s] does not 
support listing workers",
+          taskRunner,
+          taskRunner.getClass().getName()
+      );
+      workers = ImmutableList.of();
+      currentCapacity = -1;
+    }
+
+    // Calculate maximum capacity with auto scale
+    int maximumCapacity;
+    if (workerConfigRef == null) {
+      workerConfigRef = configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, 
WorkerBehaviorConfig.class);
+    }
+    WorkerBehaviorConfig workerBehaviorConfig = workerConfigRef.get();
+    if (workerBehaviorConfig == null) {
+      // Auto scaling is not set up
+      log.debug("Cannot calculate maximum worker capacity as worker behavior 
config is not configured");
+      maximumCapacity = -1;
+    } else if (workerBehaviorConfig instanceof DefaultWorkerBehaviorConfig) {
+      DefaultWorkerBehaviorConfig defaultWorkerBehaviorConfig = 
(DefaultWorkerBehaviorConfig) workerBehaviorConfig;
+      if (defaultWorkerBehaviorConfig.getAutoScaler() == null) {
+        // Auto scaling is not set up
+        log.debug("Cannot calculate maximum worker capacity as auto scaler not 
configured");
+        maximumCapacity = -1;
+      } else {
+        int maxWorker = 
defaultWorkerBehaviorConfig.getAutoScaler().getMaxNumWorkers();
+        int expectedWorkerCapacity = 
provisioningStrategy.getExpectedWorkerCapacity(workers);
+        maximumCapacity = expectedWorkerCapacity == -1 ? -1 : maxWorker * 
expectedWorkerCapacity;
+      }
+    } else {
+      // Auto scale is not using DefaultWorkerBehaviorConfig
+      log.debug("Cannot calculate maximum worker capacity as 
WorkerBehaviorConfig [%s] of type [%s] does not support getting max capacity",
+                workerBehaviorConfig,
+                workerBehaviorConfig.getClass().getSimpleName()
+      );
+      maximumCapacity = -1;
+    }
+    return Response.ok(new TotalWorkerCapacityResponse(currentCapacity, 
maximumCapacity)).build();
+  }
+
   // default value is used for backwards compatibility
   @POST
   @Path("/worker")
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/TotalWorkerCapacityResponse.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/TotalWorkerCapacityResponse.java
new file mode 100644
index 0000000..f40a8f2
--- /dev/null
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/TotalWorkerCapacityResponse.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.http;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Should be synchronized with 
org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo
+ */
+public class TotalWorkerCapacityResponse
+{
+  /**
+   * The total worker capacity of the current state of the cluster. This can 
be -1 if
+   * it cannot be determined.
+   */
+  private final int currentClusterCapacity;
+  /**
+   * The total worker capacity of the cluster including auto scaling 
capability (scaling to max workers).
+   * This can be -1 if it cannot be determined or if auto scaling is not 
configured.
+   */
+  private final int maximumCapacityWithAutoScale;
+
+  @JsonCreator
+  public TotalWorkerCapacityResponse(
+      @JsonProperty("currentClusterCapacity") int currentClusterCapacity,
+      @JsonProperty("maximumCapacityWithAutoScale") int 
maximumCapacityWithAutoScale
+  )
+  {
+    this.currentClusterCapacity = currentClusterCapacity;
+    this.maximumCapacityWithAutoScale = maximumCapacityWithAutoScale;
+  }
+
+  @JsonProperty
+  public int getCurrentClusterCapacity()
+  {
+    return currentClusterCapacity;
+  }
+
+  @JsonProperty
+  public int getMaximumCapacityWithAutoScale()
+  {
+    return maximumCapacityWithAutoScale;
+  }
+}
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java
index 34af0ca..249f510 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java
@@ -28,6 +28,7 @@ import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TestTasks;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
 import org.apache.druid.indexing.overlord.RemoteTaskRunner;
 import org.apache.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
 import org.apache.druid.indexing.overlord.ZkWorker;
@@ -54,8 +55,10 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
@@ -66,6 +69,7 @@ public class PendingTaskBasedProvisioningStrategyTest
 {
   private AutoScaler autoScaler;
   private Task testTask;
+  private PendingTaskBasedWorkerProvisioningConfig config;
   private PendingTaskBasedWorkerProvisioningStrategy strategy;
   private AtomicReference<WorkerBehaviorConfig> workerConfig;
   private ScheduledExecutorService executorService = 
Execs.scheduledSingleThreaded("test service");
@@ -79,7 +83,7 @@ public class PendingTaskBasedProvisioningStrategyTest
 
     testTask = TestTasks.immediateSuccess("task1");
 
-    PendingTaskBasedWorkerProvisioningConfig config = new 
PendingTaskBasedWorkerProvisioningConfig()
+    config = new PendingTaskBasedWorkerProvisioningConfig()
         .setMaxScalingDuration(new Period(1000))
         .setNumEventsToTrack(10)
         .setPendingTaskTimeout(new Period(0))
@@ -109,6 +113,102 @@ public class PendingTaskBasedProvisioningStrategyTest
   }
 
   @Test
+  public void testGetExpectedWorkerCapacityWithNoWorkerAndHintIsValid()
+  {
+    int capacityHint = 10;
+    config = new PendingTaskBasedWorkerProvisioningConfig()
+        .setMaxScalingDuration(new Period(1000))
+        .setNumEventsToTrack(10)
+        .setPendingTaskTimeout(new Period(0))
+        .setWorkerVersion(MIN_VERSION)
+        .setMaxScalingStep(2)
+        .setWorkerCapacityHint(capacityHint);
+    strategy = new PendingTaskBasedWorkerProvisioningStrategy(
+        config,
+        DSuppliers.of(workerConfig),
+        new ProvisioningSchedulerConfig(),
+        new Supplier<ScheduledExecutorService>()
+        {
+          @Override
+          public ScheduledExecutorService get()
+          {
+            return executorService;
+          }
+        }
+    );
+    int expectedWorkerCapacity = 
strategy.getExpectedWorkerCapacity(ImmutableList.of());
+    Assert.assertEquals(capacityHint, expectedWorkerCapacity);
+  }
+
+  @Test
+  public void testGetExpectedWorkerCapacityWithNoWorkerAndHintIsNotValid()
+  {
+    int capacityHint = -1;
+    config = new PendingTaskBasedWorkerProvisioningConfig()
+        .setMaxScalingDuration(new Period(1000))
+        .setNumEventsToTrack(10)
+        .setPendingTaskTimeout(new Period(0))
+        .setWorkerVersion(MIN_VERSION)
+        .setMaxScalingStep(2)
+        .setWorkerCapacityHint(capacityHint);
+    strategy = new PendingTaskBasedWorkerProvisioningStrategy(
+        config,
+        DSuppliers.of(workerConfig),
+        new ProvisioningSchedulerConfig(),
+        new Supplier<ScheduledExecutorService>()
+        {
+          @Override
+          public ScheduledExecutorService get()
+          {
+            return executorService;
+          }
+        }
+    );
+    int expectedWorkerCapacity = 
strategy.getExpectedWorkerCapacity(ImmutableList.of());
+    Assert.assertEquals(1, expectedWorkerCapacity);
+  }
+
+  @Test
+  public void testGetExpectedWorkerCapacityWithSingleWorker()
+  {
+    int workerCapacity = 3;
+    Collection<ImmutableWorkerInfo> workerInfoCollection = ImmutableList.of(
+        new ImmutableWorkerInfo(
+            new Worker("http", "localhost0", "localhost0", workerCapacity, 
"v1", WorkerConfig.DEFAULT_CATEGORY), 0,
+            new HashSet<>(),
+            new HashSet<>(),
+            DateTimes.nowUtc()
+        )
+    );
+    int expectedWorkerCapacity = 
strategy.getExpectedWorkerCapacity(workerInfoCollection);
+    Assert.assertEquals(workerCapacity, expectedWorkerCapacity);
+  }
+
+  @Test
+  public void testGetExpectedWorkerCapacityWithMultipleWorker()
+  {
+    int workerOneCapacity = 3;
+    int workerTwoCapacity = 6;
+    Collection<ImmutableWorkerInfo> workerInfoCollection = ImmutableList.of(
+        new ImmutableWorkerInfo(
+            new Worker("http", "localhost0", "localhost0", workerOneCapacity, 
"v1", WorkerConfig.DEFAULT_CATEGORY), 0,
+            new HashSet<>(),
+            new HashSet<>(),
+            DateTimes.nowUtc()
+        ),
+        new ImmutableWorkerInfo(
+            new Worker("http", "localhost0", "localhost0", workerTwoCapacity + 
3, "v1", WorkerConfig.DEFAULT_CATEGORY), 0,
+            new HashSet<>(),
+            new HashSet<>(),
+            DateTimes.nowUtc()
+        )
+    );
+    int expectedWorkerCapacity = 
strategy.getExpectedWorkerCapacity(workerInfoCollection);
+    // Use capacity of the first worker in the list
+    Assert.assertEquals(workerOneCapacity, expectedWorkerCapacity);
+  }
+
+  @Test
   public void testFailIfMinWorkerIsZeroAndWorkerHintNotSet()
   {
     EmittingLogger mockLogger = EasyMock.createMock(EmittingLogger.class);
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
index b919706..638a33d 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
@@ -24,7 +24,9 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.common.config.JacksonConfigManager;
 import org.apache.druid.indexer.RunnerTaskState;
 import org.apache.druid.indexer.TaskInfo;
 import org.apache.druid.indexer.TaskLocation;
@@ -37,13 +39,21 @@ import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.task.AbstractTask;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter;
 import org.apache.druid.indexing.overlord.TaskMaster;
 import org.apache.druid.indexing.overlord.TaskQueue;
 import org.apache.druid.indexing.overlord.TaskRunner;
 import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
 import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter;
+import org.apache.druid.indexing.overlord.WorkerTaskRunner;
 import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter;
+import org.apache.druid.indexing.overlord.autoscaling.AutoScaler;
+import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
+import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
+import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
+import org.apache.druid.indexing.worker.Worker;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.RE;
@@ -79,11 +89,14 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class OverlordResourceTest
 {
   private OverlordResource overlordResource;
   private TaskMaster taskMaster;
+  private JacksonConfigManager configManager;
+  private ProvisioningStrategy provisioningStrategy;
   private TaskStorageQueryAdapter taskStorageQueryAdapter;
   private IndexerMetadataStorageAdapter indexerMetadataStorageAdapter;
   private HttpServletRequest req;
@@ -97,6 +110,8 @@ public class OverlordResourceTest
   public void setUp()
   {
     taskRunner = EasyMock.createMock(TaskRunner.class);
+    configManager = EasyMock.createMock(JacksonConfigManager.class);
+    provisioningStrategy = EasyMock.createMock(ProvisioningStrategy.class);
     taskMaster = EasyMock.createStrictMock(TaskMaster.class);
     taskStorageQueryAdapter = 
EasyMock.createStrictMock(TaskStorageQueryAdapter.class);
     indexerMetadataStorageAdapter = 
EasyMock.createStrictMock(IndexerMetadataStorageAdapter.class);
@@ -145,10 +160,11 @@ public class OverlordResourceTest
         taskStorageQueryAdapter,
         indexerMetadataStorageAdapter,
         null,
-        null,
+        configManager,
         null,
         authMapper,
-        workerTaskRunnerQueryAdapter
+        workerTaskRunnerQueryAdapter,
+        provisioningStrategy
     );
   }
 
@@ -1447,6 +1463,182 @@ public class OverlordResourceTest
     Assert.assertEquals(ImmutableMap.of("error", "Worker API returns error!"), 
response.getEntity());
   }
 
+  @Test
+  public void testGetTotalWorkerCapacityNotLeader()
+  {
+    EasyMock.reset(taskMaster);
+    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(
+        Optional.absent()
+    ).anyTimes();
+    EasyMock.replay(
+        taskRunner,
+        taskMaster,
+        taskStorageQueryAdapter,
+        indexerMetadataStorageAdapter,
+        req,
+        workerTaskRunnerQueryAdapter,
+        configManager
+    );
+    final Response response = overlordResource.getTotalWorkerCapacity();
+    Assert.assertEquals(HttpResponseStatus.SERVICE_UNAVAILABLE.getCode(), 
response.getStatus());
+  }
+
+  @Test
+  public void testGetTotalWorkerCapacityWithUnknown()
+  {
+    WorkerBehaviorConfig workerBehaviorConfig = 
EasyMock.createMock(WorkerBehaviorConfig.class);
+    AtomicReference<WorkerBehaviorConfig> workerBehaviorConfigAtomicReference 
= new AtomicReference<>(workerBehaviorConfig);
+    EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, 
WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
+    EasyMock.replay(
+        taskRunner,
+        taskMaster,
+        taskStorageQueryAdapter,
+        indexerMetadataStorageAdapter,
+        req,
+        workerTaskRunnerQueryAdapter,
+        configManager
+    );
+    final Response response = overlordResource.getTotalWorkerCapacity();
+    Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
+    Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) 
response.getEntity()).getCurrentClusterCapacity());
+    Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) 
response.getEntity()).getMaximumCapacityWithAutoScale());
+  }
+
+  @Test
+  public void 
testGetTotalWorkerCapacityWithWorkerTaskRunnerButWorkerBehaviorConfigNotConfigured()
+  {
+    AtomicReference<WorkerBehaviorConfig> workerBehaviorConfigAtomicReference 
= new AtomicReference<>(null);
+    EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, 
WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
+    EasyMock.replay(
+        taskRunner,
+        taskMaster,
+        taskStorageQueryAdapter,
+        indexerMetadataStorageAdapter,
+        req,
+        workerTaskRunnerQueryAdapter,
+        configManager
+    );
+    final Response response = overlordResource.getTotalWorkerCapacity();
+    Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
+    Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) 
response.getEntity()).getCurrentClusterCapacity());
+    Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) 
response.getEntity()).getMaximumCapacityWithAutoScale());
+  }
+
+  @Test
+  public void 
testGetTotalWorkerCapacityWithWorkerTaskRunnerButAutoScaleNotConfigured()
+  {
+    DefaultWorkerBehaviorConfig workerBehaviorConfig = new 
DefaultWorkerBehaviorConfig(null, null);
+    AtomicReference<WorkerBehaviorConfig> workerBehaviorConfigAtomicReference 
= new AtomicReference<>(workerBehaviorConfig);
+    EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, 
WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
+    EasyMock.replay(
+        taskRunner,
+        taskMaster,
+        taskStorageQueryAdapter,
+        indexerMetadataStorageAdapter,
+        req,
+        workerTaskRunnerQueryAdapter,
+        configManager
+    );
+    final Response response = overlordResource.getTotalWorkerCapacity();
+    Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
+    Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) 
response.getEntity()).getCurrentClusterCapacity());
+    Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) 
response.getEntity()).getMaximumCapacityWithAutoScale());
+  }
+
+  @Test
+  public void 
testGetTotalWorkerCapacityWithAutoScaleConfiguredAndProvisioningStrategySupportExpectedWorkerCapacity()
+  {
+    int expectedWorkerCapacity = 3;
+    int maxNumWorkers = 2;
+    WorkerTaskRunner workerTaskRunner = 
EasyMock.createMock(WorkerTaskRunner.class);
+    Collection<ImmutableWorkerInfo> workerInfos = ImmutableList.of(
+        new ImmutableWorkerInfo(
+            new Worker(
+                "http", "testWorker", "192.0.0.1", expectedWorkerCapacity, 
"v1", WorkerConfig.DEFAULT_CATEGORY
+            ),
+            2,
+            ImmutableSet.of("grp1", "grp2"),
+            ImmutableSet.of("task1", "task2"),
+            DateTimes.of("2015-01-01T01:01:01Z")
+        )
+    );
+    EasyMock.expect(workerTaskRunner.getWorkers()).andReturn(workerInfos);
+    EasyMock.reset(taskMaster);
+    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(
+        Optional.of(workerTaskRunner)
+    ).anyTimes();
+    
EasyMock.expect(provisioningStrategy.getExpectedWorkerCapacity(workerInfos)).andReturn(expectedWorkerCapacity).anyTimes();
+    AutoScaler autoScaler = EasyMock.createMock(AutoScaler.class);
+    EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
+    EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(maxNumWorkers);
+    DefaultWorkerBehaviorConfig workerBehaviorConfig = new 
DefaultWorkerBehaviorConfig(null, autoScaler);
+    AtomicReference<WorkerBehaviorConfig> workerBehaviorConfigAtomicReference 
= new AtomicReference<>(workerBehaviorConfig);
+    EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, 
WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
+    EasyMock.replay(
+        workerTaskRunner,
+        autoScaler,
+        taskRunner,
+        taskMaster,
+        taskStorageQueryAdapter,
+        indexerMetadataStorageAdapter,
+        req,
+        workerTaskRunnerQueryAdapter,
+        configManager,
+        provisioningStrategy
+    );
+    final Response response = overlordResource.getTotalWorkerCapacity();
+    Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
+    Assert.assertEquals(expectedWorkerCapacity, ((TotalWorkerCapacityResponse) 
response.getEntity()).getCurrentClusterCapacity());
+    Assert.assertEquals(expectedWorkerCapacity * maxNumWorkers, 
((TotalWorkerCapacityResponse) 
response.getEntity()).getMaximumCapacityWithAutoScale());
+  }
+
+  @Test
+  public void 
testGetTotalWorkerCapacityWithAutoScaleConfiguredAndProvisioningStrategyNotSupportExpectedWorkerCapacity()
+  {
+    int invalidExpectedCapacity = -1;
+    int maxNumWorkers = 2;
+    WorkerTaskRunner workerTaskRunner = 
EasyMock.createMock(WorkerTaskRunner.class);
+    Collection<ImmutableWorkerInfo> workerInfos = ImmutableList.of(
+        new ImmutableWorkerInfo(
+            new Worker(
+                "http", "testWorker", "192.0.0.1", 3, "v1", 
WorkerConfig.DEFAULT_CATEGORY
+            ),
+            2,
+            ImmutableSet.of("grp1", "grp2"),
+            ImmutableSet.of("task1", "task2"),
+            DateTimes.of("2015-01-01T01:01:01Z")
+        )
+    );
+    EasyMock.expect(workerTaskRunner.getWorkers()).andReturn(workerInfos);
+    EasyMock.reset(taskMaster);
+    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(
+        Optional.of(workerTaskRunner)
+    ).anyTimes();
+    
EasyMock.expect(provisioningStrategy.getExpectedWorkerCapacity(workerInfos)).andReturn(invalidExpectedCapacity).anyTimes();
+    AutoScaler autoScaler = EasyMock.createMock(AutoScaler.class);
+    EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
+    EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(maxNumWorkers);
+    DefaultWorkerBehaviorConfig workerBehaviorConfig = new 
DefaultWorkerBehaviorConfig(null, autoScaler);
+    AtomicReference<WorkerBehaviorConfig> workerBehaviorConfigAtomicReference 
= new AtomicReference<>(workerBehaviorConfig);
+    EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, 
WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
+    EasyMock.replay(
+        workerTaskRunner,
+        autoScaler,
+        taskRunner,
+        taskMaster,
+        taskStorageQueryAdapter,
+        indexerMetadataStorageAdapter,
+        req,
+        workerTaskRunnerQueryAdapter,
+        configManager,
+        provisioningStrategy
+    );
+    final Response response = overlordResource.getTotalWorkerCapacity();
+    Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
+    
Assert.assertEquals(workerInfos.stream().findFirst().get().getWorker().getCapacity(),
 ((TotalWorkerCapacityResponse) 
response.getEntity()).getCurrentClusterCapacity());
+    Assert.assertEquals(invalidExpectedCapacity, 
((TotalWorkerCapacityResponse) 
response.getEntity()).getMaximumCapacityWithAutoScale());
+  }
+
   private void expectAuthorizationTokenCheck()
   {
     expectAuthorizationTokenCheck(Users.DRUID);
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
index 5655a13..40ed6e0 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
@@ -229,7 +229,8 @@ public class OverlordTest
         null,
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
-        workerTaskRunnerQueryAdapter
+        workerTaskRunnerQueryAdapter,
+        null
     );
     Response response = overlordResource.getLeader();
     Assert.assertEquals(druidNode.getHostAndPort(), response.getEntity());
diff --git 
a/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java
 
b/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java
index 2fd1a5e..8e5ee73 100644
--- 
a/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java
+++ 
b/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java
@@ -145,12 +145,21 @@ public class CompactionResourceTestClient
     }
   }
 
-  public void updateCompactionTaskSlot(Double compactionTaskSlotRatio, Integer 
maxCompactionTaskSlots) throws Exception
+  public void updateCompactionTaskSlot(Double compactionTaskSlotRatio, Integer 
maxCompactionTaskSlots, Boolean useAutoScaleSlots) throws Exception
   {
-    String url = 
StringUtils.format("%sconfig/compaction/taskslots?ratio=%s&max=%s",
-                                    getCoordinatorURL(),
-                                    
StringUtils.urlEncode(compactionTaskSlotRatio.toString()),
-                                    
StringUtils.urlEncode(maxCompactionTaskSlots.toString()));
+    String url;
+    if (useAutoScaleSlots == null) {
+      url = StringUtils.format("%sconfig/compaction/taskslots?ratio=%s&max=%s",
+                                      getCoordinatorURL(),
+                                      
StringUtils.urlEncode(compactionTaskSlotRatio.toString()),
+                                      
StringUtils.urlEncode(maxCompactionTaskSlots.toString()));
+    } else {
+      url = 
StringUtils.format("%sconfig/compaction/taskslots?ratio=%s&max=%s&useAutoScaleSlots=%s",
+                                      getCoordinatorURL(),
+                                      
StringUtils.urlEncode(compactionTaskSlotRatio.toString()),
+                                      
StringUtils.urlEncode(maxCompactionTaskSlots.toString()),
+                                      
StringUtils.urlEncode(useAutoScaleSlots.toString()));
+    }
     StatusResponseHolder response = httpClient.go(new Request(HttpMethod.POST, 
new URL(url)), responseHandler).get();
     if (!response.getStatus().equals(HttpResponseStatus.OK)) {
       throw new ISE(
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java
index 84c1bf5..8d980d7 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java
@@ -306,7 +306,7 @@ public class ITAutoCompactionLockContentionTest extends 
AbstractKafkaIndexingSer
   {
     final DataSourceCompactionConfig compactionConfig = CompactionUtil
         .createCompactionConfig(fullDatasourceName, 
Specs.MAX_ROWS_PER_SEGMENT, Period.ZERO);
-    compactionResource.updateCompactionTaskSlot(0.5, 10);
+    compactionResource.updateCompactionTaskSlot(0.5, 10, null);
     compactionResource.submitCompactionConfig(compactionConfig);
 
     // Wait for compaction config to persist
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
index a79fe88..f9c462a 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
@@ -102,7 +102,7 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
   public void setup() throws Exception
   {
     // Set comapction slot to 5
-    updateCompactionTaskSlot(0.5, 10);
+    updateCompactionTaskSlot(0.5, 10, null);
     fullDatasourceName = "wikipedia_index_test_" + UUID.randomUUID() + 
config.getExtraDatasourceNameSuffix();
   }
 
@@ -235,7 +235,7 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
   public void testAutoCompactionDutyCanUpdateTaskSlots() throws Exception
   {
     // Set compactionTaskSlotRatio to 0 to prevent any compaction
-    updateCompactionTaskSlot(0, 0);
+    updateCompactionTaskSlot(0, 0, null);
     loadData(INDEX_TASK);
     try (final Closeable ignored = unloader(fullDatasourceName)) {
       final List<String> intervalsBeforeCompaction = 
coordinator.getSegmentIntervals(fullDatasourceName);
@@ -252,7 +252,7 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
       checkCompactionIntervals(intervalsBeforeCompaction);
       
Assert.assertNull(compactionResource.getCompactionStatus(fullDatasourceName));
       // Update compaction slots to be 1
-      updateCompactionTaskSlot(1, 1);
+      updateCompactionTaskSlot(1, 1, null);
       // One day compacted (1 new segment) and one day remains uncompacted. (3 
total)
       forceTriggerAutoCompaction(3);
       verifyQuery(INDEX_QUERIES_RESOURCE);
@@ -898,6 +898,24 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
     }
   }
 
+  @Test
+  public void testUpdateCompactionTaskSlotWithUseAutoScaleSlots() throws 
Exception
+  {
+    // First try update without useAutoScaleSlots
+    updateCompactionTaskSlot(3, 5, null);
+    CoordinatorCompactionConfig coordinatorCompactionConfig = 
compactionResource.getCoordinatorCompactionConfigs();
+    // Should be default value which is false
+    Assert.assertFalse(coordinatorCompactionConfig.isUseAutoScaleSlots());
+    // Now try update from default value to useAutoScaleSlots=true
+    updateCompactionTaskSlot(3, 5, true);
+    coordinatorCompactionConfig = 
compactionResource.getCoordinatorCompactionConfigs();
+    Assert.assertTrue(coordinatorCompactionConfig.isUseAutoScaleSlots());
+    // Now try update from useAutoScaleSlots=true to useAutoScaleSlots=false
+    updateCompactionTaskSlot(3, 5, false);
+    coordinatorCompactionConfig = 
compactionResource.getCoordinatorCompactionConfigs();
+    Assert.assertFalse(coordinatorCompactionConfig.isUseAutoScaleSlots());
+  }
+
   private void loadData(String indexTask) throws Exception
   {
     loadData(indexTask, ImmutableMap.of());
@@ -1124,13 +1142,16 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
     }
   }
 
-  private void updateCompactionTaskSlot(double compactionTaskSlotRatio, int 
maxCompactionTaskSlots) throws Exception
+  private void updateCompactionTaskSlot(double compactionTaskSlotRatio, int 
maxCompactionTaskSlots, Boolean useAutoScaleSlots) throws Exception
   {
-    compactionResource.updateCompactionTaskSlot(compactionTaskSlotRatio, 
maxCompactionTaskSlots);
+    compactionResource.updateCompactionTaskSlot(compactionTaskSlotRatio, 
maxCompactionTaskSlots, useAutoScaleSlots);
     // Verify that the compaction config is updated correctly.
     CoordinatorCompactionConfig coordinatorCompactionConfig = 
compactionResource.getCoordinatorCompactionConfigs();
     
Assert.assertEquals(coordinatorCompactionConfig.getCompactionTaskSlotRatio(), 
compactionTaskSlotRatio);
     
Assert.assertEquals(coordinatorCompactionConfig.getMaxCompactionTaskSlots(), 
maxCompactionTaskSlots);
+    if (useAutoScaleSlots != null) {
+      Assert.assertEquals(coordinatorCompactionConfig.isUseAutoScaleSlots(), 
useAutoScaleSlots.booleanValue());
+    }
   }
 
   private void getAndAssertCompactionStatus(
diff --git 
a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
 
b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
index ee21ad0..b63f3f0 100644
--- 
a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
+++ 
b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
@@ -221,6 +221,32 @@ public class HttpIndexingServiceClient implements 
IndexingServiceClient
   }
 
   @Override
+  public int getTotalWorkerCapacityWithAutoScale()
+  {
+    try {
+      final StringFullResponseHolder response = druidLeaderClient.go(
+          druidLeaderClient.makeRequest(HttpMethod.GET, 
"/druid/indexer/v1/totalWorkerCapacity")
+                           .setHeader("Content-Type", 
MediaType.APPLICATION_JSON)
+      );
+      if (!response.getStatus().equals(HttpResponseStatus.OK)) {
+        throw new ISE(
+            "Error while getting total worker capacity. status[%s] 
content[%s]",
+            response.getStatus(),
+            response.getContent()
+        );
+      }
+      final IndexingTotalWorkerCapacityInfo indexingTotalWorkerCapacityInfo = 
jsonMapper.readValue(
+          response.getContent(),
+          new TypeReference<IndexingTotalWorkerCapacityInfo>() {}
+      );
+      return indexingTotalWorkerCapacityInfo.getMaximumCapacityWithAutoScale();
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
   public List<TaskStatusPlus> getActiveTasks()
   {
     // Must retrieve waiting, then pending, then running, so if tasks move 
from one state to the next between
diff --git 
a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
 
b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
index 7429c55..000f132 100644
--- 
a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
+++ 
b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
@@ -49,8 +49,17 @@ public interface IndexingServiceClient
       @Nullable Map<String, Object> context
   );
 
+  /**
+   * Gets the total worker capacity of the current state of the cluster. This 
can be -1 if it cannot be determined.
+   */
   int getTotalWorkerCapacity();
 
+  /**
+   * Gets the total worker capacity of the cluster including auto scaling 
capability (scaling to max workers).
+   * This can be -1 if it cannot be determined or if auto scaling is not 
configured.
+   */
+  int getTotalWorkerCapacityWithAutoScale();
+
   String runTask(String taskId, Object taskObject);
 
   String cancelTask(String taskId);
diff --git 
a/server/src/main/java/org/apache/druid/client/indexing/IndexingTotalWorkerCapacityInfo.java
 
b/server/src/main/java/org/apache/druid/client/indexing/IndexingTotalWorkerCapacityInfo.java
new file mode 100644
index 0000000..bd3194d
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/client/indexing/IndexingTotalWorkerCapacityInfo.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client.indexing;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Should be synchronized with 
org.apache.druid.indexing.overlord.http.TotalWorkerCapacityResponse
+ */
+public class IndexingTotalWorkerCapacityInfo
+{
+  /**
+   * The total worker capacity of the current state of the cluster. This can 
be -1 if
+   * it cannot be determined.
+   */
+  private final int currentClusterCapacity;
+  /**
+   * The total worker capacity of the cluster including auto scaling 
capability (scaling to max workers).
+   * This can be -1 if it cannot be determined or if auto scaling is not 
configured.
+   */
+  private final int maximumCapacityWithAutoScale;
+
+  @JsonCreator
+  public IndexingTotalWorkerCapacityInfo(
+      @JsonProperty("currentClusterCapacity") int currentClusterCapacity,
+      @JsonProperty("maximumCapacityWithAutoScale") int 
maximumCapacityWithAutoScale
+  )
+  {
+    this.currentClusterCapacity = currentClusterCapacity;
+    this.maximumCapacityWithAutoScale = maximumCapacityWithAutoScale;
+  }
+
+  @JsonProperty
+  public int getCurrentClusterCapacity()
+  {
+    return currentClusterCapacity;
+  }
+
+  @JsonProperty
+  public int getMaximumCapacityWithAutoScale()
+  {
+    return maximumCapacityWithAutoScale;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java
index 409a813..1c4ab4e 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java
@@ -39,10 +39,12 @@ public class CoordinatorCompactionConfig
 
   private static final double DEFAULT_COMPACTION_TASK_RATIO = 0.1;
   private static final int DEFAILT_MAX_COMPACTION_TASK_SLOTS = 
Integer.MAX_VALUE;
+  private static final boolean DEFAULT_USE_AUTO_SCALE_SLOTS = false;
 
   private final List<DataSourceCompactionConfig> compactionConfigs;
   private final double compactionTaskSlotRatio;
   private final int maxCompactionTaskSlots;
+  private final boolean useAutoScaleSlots;
 
   public static CoordinatorCompactionConfig from(
       CoordinatorCompactionConfig baseConfig,
@@ -52,31 +54,34 @@ public class CoordinatorCompactionConfig
     return new CoordinatorCompactionConfig(
         compactionConfigs,
         baseConfig.compactionTaskSlotRatio,
-        baseConfig.maxCompactionTaskSlots
+        baseConfig.maxCompactionTaskSlots,
+        baseConfig.useAutoScaleSlots
     );
   }
 
   public static CoordinatorCompactionConfig from(
       CoordinatorCompactionConfig baseConfig,
       @Nullable Double compactionTaskSlotRatio,
-      @Nullable Integer maxCompactionTaskSlots
+      @Nullable Integer maxCompactionTaskSlots,
+      @Nullable Boolean useAutoScaleSlots
   )
   {
     return new CoordinatorCompactionConfig(
         baseConfig.compactionConfigs,
         compactionTaskSlotRatio == null ? baseConfig.compactionTaskSlotRatio : 
compactionTaskSlotRatio,
-        maxCompactionTaskSlots == null ? baseConfig.maxCompactionTaskSlots : 
maxCompactionTaskSlots
+        maxCompactionTaskSlots == null ? baseConfig.maxCompactionTaskSlots : 
maxCompactionTaskSlots,
+        useAutoScaleSlots == null ? baseConfig.useAutoScaleSlots : 
useAutoScaleSlots
     );
   }
 
   public static CoordinatorCompactionConfig 
from(List<DataSourceCompactionConfig> compactionConfigs)
   {
-    return new CoordinatorCompactionConfig(compactionConfigs, null, null);
+    return new CoordinatorCompactionConfig(compactionConfigs, null, null, 
null);
   }
 
   public static CoordinatorCompactionConfig empty()
   {
-    return new CoordinatorCompactionConfig(ImmutableList.of(), null, null);
+    return new CoordinatorCompactionConfig(ImmutableList.of(), null, null, 
null);
   }
 
   public static AtomicReference<CoordinatorCompactionConfig> watch(final 
JacksonConfigManager configManager)
@@ -113,7 +118,8 @@ public class CoordinatorCompactionConfig
   public CoordinatorCompactionConfig(
       @JsonProperty("compactionConfigs") List<DataSourceCompactionConfig> 
compactionConfigs,
       @JsonProperty("compactionTaskSlotRatio") @Nullable Double 
compactionTaskSlotRatio,
-      @JsonProperty("maxCompactionTaskSlots") @Nullable Integer 
maxCompactionTaskSlots
+      @JsonProperty("maxCompactionTaskSlots") @Nullable Integer 
maxCompactionTaskSlots,
+      @JsonProperty("useAutoScaleSlots") @Nullable Boolean useAutoScaleSlots
   )
   {
     this.compactionConfigs = compactionConfigs;
@@ -123,6 +129,9 @@ public class CoordinatorCompactionConfig
     this.maxCompactionTaskSlots = maxCompactionTaskSlots == null ?
                                   DEFAILT_MAX_COMPACTION_TASK_SLOTS :
                                   maxCompactionTaskSlots;
+    this.useAutoScaleSlots = useAutoScaleSlots == null ?
+                             DEFAULT_USE_AUTO_SCALE_SLOTS :
+                             useAutoScaleSlots;
   }
 
   @JsonProperty
@@ -143,20 +152,10 @@ public class CoordinatorCompactionConfig
     return maxCompactionTaskSlots;
   }
 
-  @Override
-  public String toString()
-  {
-    return "CoordinatorCompactionConfig{" +
-           ", compactionConfigs=" + compactionConfigs +
-           ", compactionTaskSlotRatio=" + compactionTaskSlotRatio +
-           ", maxCompactionTaskSlots=" + maxCompactionTaskSlots +
-           '}';
-  }
-
-  @Override
-  public int hashCode()
+  @JsonProperty
+  public boolean isUseAutoScaleSlots()
   {
-    return Objects.hash(compactionConfigs, compactionTaskSlotRatio, 
maxCompactionTaskSlots);
+    return useAutoScaleSlots;
   }
 
   @Override
@@ -168,16 +167,27 @@ public class CoordinatorCompactionConfig
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-
     CoordinatorCompactionConfig that = (CoordinatorCompactionConfig) o;
+    return Double.compare(that.compactionTaskSlotRatio, 
compactionTaskSlotRatio) == 0 &&
+           maxCompactionTaskSlots == that.maxCompactionTaskSlots &&
+           useAutoScaleSlots == that.useAutoScaleSlots &&
+           Objects.equals(compactionConfigs, that.compactionConfigs);
+  }
 
-    if (!Objects.equals(compactionConfigs, that.compactionConfigs)) {
-      return false;
-    }
-    if (compactionTaskSlotRatio != that.compactionTaskSlotRatio) {
-      return false;
-    }
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(compactionConfigs, compactionTaskSlotRatio, 
maxCompactionTaskSlots, useAutoScaleSlots);
+  }
 
-    return maxCompactionTaskSlots == that.maxCompactionTaskSlots;
+  @Override
+  public String toString()
+  {
+    return "CoordinatorCompactionConfig{" +
+           "compactionConfigs=" + compactionConfigs +
+           ", compactionTaskSlotRatio=" + compactionTaskSlotRatio +
+           ", maxCompactionTaskSlots=" + maxCompactionTaskSlots +
+           ", useAutoScaleSlots=" + useAutoScaleSlots +
+           '}';
   }
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index ad6566f..6598a6e 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -174,8 +174,21 @@ public class CompactSegments implements CoordinatorDuty
         final CompactionSegmentIterator iterator =
             policy.reset(compactionConfigs, dataSources, 
intervalsToSkipCompaction);
 
+        int totalCapacity;
+        if (dynamicConfig.isUseAutoScaleSlots()) {
+          try {
+            totalCapacity = 
indexingServiceClient.getTotalWorkerCapacityWithAutoScale();
+          }
+          catch (Exception e) {
+            LOG.warn("Failed to get total worker capacity with auto scale 
slots. Falling back to current capacity count");
+            totalCapacity = indexingServiceClient.getTotalWorkerCapacity();
+          }
+        } else {
+          totalCapacity = indexingServiceClient.getTotalWorkerCapacity();
+        }
+
         final int compactionTaskCapacity = (int) Math.min(
-            indexingServiceClient.getTotalWorkerCapacity() * 
dynamicConfig.getCompactionTaskSlotRatio(),
+            totalCapacity * dynamicConfig.getCompactionTaskSlotRatio(),
             dynamicConfig.getMaxCompactionTaskSlots()
         );
         final int numAvailableCompactionTaskSlots;
diff --git 
a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java
 
b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java
index cb17c45..e791b6a 100644
--- 
a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java
+++ 
b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java
@@ -93,6 +93,7 @@ public class CoordinatorCompactionConfigsResource
   public Response setCompactionTaskLimit(
       @QueryParam("ratio") Double compactionTaskSlotRatio,
       @QueryParam("max") Integer maxCompactionTaskSlots,
+      @QueryParam("useAutoScaleSlots") Boolean useAutoScaleSlots,
       @HeaderParam(AuditManager.X_DRUID_AUTHOR) @DefaultValue("") final String 
author,
       @HeaderParam(AuditManager.X_DRUID_COMMENT) @DefaultValue("") final 
String comment,
       @Context HttpServletRequest req
@@ -104,7 +105,8 @@ public class CoordinatorCompactionConfigsResource
       final CoordinatorCompactionConfig newCompactionConfig = 
CoordinatorCompactionConfig.from(
           current,
           compactionTaskSlotRatio,
-          maxCompactionTaskSlots
+          maxCompactionTaskSlots,
+          useAutoScaleSlots
       );
 
       return manager.set(
diff --git 
a/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java
 
b/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java
index 0886e12..445d129 100644
--- 
a/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java
+++ 
b/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java
@@ -342,5 +342,36 @@ public class HttpIndexingServiceClientTest
     ClientCompactionTaskQuery taskQuery = (ClientCompactionTaskQuery) 
captureTask.getValue();
     
Assert.assertNull(taskQuery.getIoConfig().getInputSpec().getSha256OfSortedSegmentIds());
   }
+
+  @Test
+  public void testGetTotalWorkerCapacityWithAutoScale() throws Exception
+  {
+    int currentClusterCapacity = 5;
+    int maximumCapacityWithAutoScale = 10;
+    // Mock response for /druid/indexer/v1/totalWorkerCapacity
+    HttpResponse totalWorkerCapacityResponse = 
EasyMock.createMock(HttpResponse.class);
+    
EasyMock.expect(totalWorkerCapacityResponse.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
+    EasyMock.expect(totalWorkerCapacityResponse.getContent()).andReturn(new 
BigEndianHeapChannelBuffer(0));
+    EasyMock.replay(totalWorkerCapacityResponse);
+    IndexingTotalWorkerCapacityInfo indexingTotalWorkerCapacityInfo = new 
IndexingTotalWorkerCapacityInfo(currentClusterCapacity, 
maximumCapacityWithAutoScale);
+    StringFullResponseHolder autoScaleResponseHolder = new 
StringFullResponseHolder(
+        totalWorkerCapacityResponse,
+        StandardCharsets.UTF_8
+    ).addChunk(jsonMapper.writeValueAsString(indexingTotalWorkerCapacityInfo));
+    EasyMock.expect(druidLeaderClient.go(EasyMock.anyObject(Request.class)))
+            .andReturn(autoScaleResponseHolder)
+            .once();
+    EasyMock.expect(druidLeaderClient.makeRequest(HttpMethod.GET, 
"/druid/indexer/v1/totalWorkerCapacity"))
+            .andReturn(new Request(
+                HttpMethod.GET,
+                new 
URL("http://localhost:8090/druid/indexer/v1/totalWorkerCapacity";)
+            ))
+            .once();
+    EasyMock.replay(druidLeaderClient);
+
+    final int actualResponse = 
httpIndexingServiceClient.getTotalWorkerCapacityWithAutoScale();
+    Assert.assertEquals(maximumCapacityWithAutoScale, actualResponse);
+    EasyMock.verify(druidLeaderClient);
+  }
 }
 
diff --git 
a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
 
b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
index 17cbc41..447a32d 100644
--- 
a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
+++ 
b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
@@ -69,6 +69,12 @@ public class NoopIndexingServiceClient implements 
IndexingServiceClient
   }
 
   @Override
+  public int getTotalWorkerCapacityWithAutoScale()
+  {
+    return 0;
+  }
+
+  @Override
   public String runTask(String taskId, Object taskObject)
   {
     return null;
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index df81886..2892d44 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -38,6 +38,7 @@ import 
org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
 import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
 import org.apache.druid.client.indexing.ClientTaskQuery;
 import org.apache.druid.client.indexing.HttpIndexingServiceClient;
+import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo;
 import org.apache.druid.client.indexing.IndexingWorker;
 import org.apache.druid.client.indexing.IndexingWorkerInfo;
 import org.apache.druid.client.indexing.TaskPayloadResponse;
@@ -133,6 +134,7 @@ public class CompactSegmentsTest
   private static final int TOTAL_BYTE_PER_DATASOURCE = 440;
   private static final int TOTAL_SEGMENT_PER_DATASOURCE = 44;
   private static final int TOTAL_INTERVAL_PER_DATASOURCE = 11;
+  private static final int MAXIMUM_CAPACITY_WITH_AUTO_SCALE = 10;
 
   @Parameterized.Parameters(name = "{0}")
   public static Collection<Object[]> constructorFeeder()
@@ -648,6 +650,36 @@ public class CompactSegmentsTest
   }
 
   @Test
+  public void 
testRunMultipleCompactionTaskSlotsWithUseAutoScaleSlotsOverMaxSlot()
+  {
+    int maxCompactionSlot = 3;
+    Assert.assertTrue(maxCompactionSlot < MAXIMUM_CAPACITY_WITH_AUTO_SCALE);
+    final TestDruidLeaderClient leaderClient = new 
TestDruidLeaderClient(JSON_MAPPER);
+    leaderClient.start();
+    final HttpIndexingServiceClient indexingServiceClient = new 
HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
+    final CompactSegments compactSegments = new 
CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
+    final CoordinatorStats stats = doCompactSegments(compactSegments, 
createCompactionConfigs(), maxCompactionSlot, true);
+    Assert.assertEquals(maxCompactionSlot, 
stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT));
+    Assert.assertEquals(maxCompactionSlot, 
stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT));
+    Assert.assertEquals(maxCompactionSlot, 
stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT));
+  }
+
+  @Test
+  public void 
testRunMultipleCompactionTaskSlotsWithUseAutoScaleSlotsUnderMaxSlot()
+  {
+    int maxCompactionSlot = 100;
+    Assert.assertFalse(maxCompactionSlot < MAXIMUM_CAPACITY_WITH_AUTO_SCALE);
+    final TestDruidLeaderClient leaderClient = new 
TestDruidLeaderClient(JSON_MAPPER);
+    leaderClient.start();
+    final HttpIndexingServiceClient indexingServiceClient = new 
HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
+    final CompactSegments compactSegments = new 
CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
+    final CoordinatorStats stats = doCompactSegments(compactSegments, 
createCompactionConfigs(), maxCompactionSlot, true);
+    Assert.assertEquals(MAXIMUM_CAPACITY_WITH_AUTO_SCALE, 
stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT));
+    Assert.assertEquals(MAXIMUM_CAPACITY_WITH_AUTO_SCALE, 
stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT));
+    Assert.assertEquals(MAXIMUM_CAPACITY_WITH_AUTO_SCALE, 
stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT));
+  }
+
+  @Test
   public void testCompactWithoutGranularitySpec()
   {
     final HttpIndexingServiceClient mockIndexingServiceClient = 
Mockito.mock(HttpIndexingServiceClient.class);
@@ -1795,14 +1827,25 @@ public class CompactSegmentsTest
       @Nullable Integer numCompactionTaskSlots
   )
   {
+    return doCompactSegments(compactSegments, compactionConfigs, 
numCompactionTaskSlots, false);
+  }
+
+  private CoordinatorStats doCompactSegments(
+      CompactSegments compactSegments,
+      List<DataSourceCompactionConfig> compactionConfigs,
+      @Nullable Integer numCompactionTaskSlots,
+      boolean useAutoScaleSlots
+  )
+  {
     DruidCoordinatorRuntimeParams params = CoordinatorRuntimeParamsTestHelpers
         .newBuilder()
         .withUsedSegmentsTimelinesPerDataSourceInTest(dataSources)
         .withCompactionConfig(
             new CoordinatorCompactionConfig(
                 compactionConfigs,
-                numCompactionTaskSlots == null ? null : 100., // 100% when 
numCompactionTaskSlots is not null
-                numCompactionTaskSlots
+                numCompactionTaskSlots == null ? null : 1., // 100% when 
numCompactionTaskSlots is not null
+                numCompactionTaskSlots,
+                useAutoScaleSlots
             )
         )
         .build();
@@ -1994,6 +2037,8 @@ public class CompactSegmentsTest
         return handleTask(request);
       } else if (urlString.contains("/druid/indexer/v1/workers")) {
         return handleWorkers();
+      } else if (urlString.contains("/druid/indexer/v1/totalWorkerCapacity")) {
+        return handleTotalWorkerCapacity();
       } else if (urlString.contains("/druid/indexer/v1/waitingTasks")
                  || urlString.contains("/druid/indexer/v1/pendingTasks")
                  || urlString.contains("/druid/indexer/v1/runningTasks")) {
@@ -2035,6 +2080,12 @@ public class CompactSegmentsTest
       return 
createStringFullResponseHolder(jsonMapper.writeValueAsString(workerInfos));
     }
 
+    private StringFullResponseHolder handleTotalWorkerCapacity() throws 
JsonProcessingException
+    {
+      IndexingTotalWorkerCapacityInfo info = new 
IndexingTotalWorkerCapacityInfo(5, 10);
+      return 
createStringFullResponseHolder(jsonMapper.writeValueAsString(info));
+    }
+
     private StringFullResponseHolder handleTask(Request request) throws 
IOException
     {
       final ClientTaskQuery taskQuery = 
jsonMapper.readValue(request.getContent().array(), ClientTaskQuery.class);
diff --git 
a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
 
b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
index 4f75bf7..6a38d5b 100644
--- 
a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
@@ -122,6 +122,7 @@ public class CoordinatorCompactionConfigsResourceTest
     Response result = 
coordinatorCompactionConfigsResource.setCompactionTaskLimit(
         compactionTaskSlotRatio,
         maxCompactionTaskSlots,
+        true,
         author,
         comment,
         mockHttpServletRequest
@@ -131,6 +132,7 @@ public class CoordinatorCompactionConfigsResourceTest
     Assert.assertEquals(oldConfigCaptor.getValue(), OLD_CONFIG_IN_BYTES);
     Assert.assertNotNull(newConfigCaptor.getValue());
     
Assert.assertEquals(newConfigCaptor.getValue().getMaxCompactionTaskSlots(), 
maxCompactionTaskSlots);
+    Assert.assertTrue(newConfigCaptor.getValue().isUseAutoScaleSlots());
     Assert.assertEquals(compactionTaskSlotRatio, 
newConfigCaptor.getValue().getCompactionTaskSlotRatio(), 0);
   }
 
@@ -279,6 +281,7 @@ public class CoordinatorCompactionConfigsResourceTest
     Response result = 
coordinatorCompactionConfigsResource.setCompactionTaskLimit(
         compactionTaskSlotRatio,
         maxCompactionTaskSlots,
+        true,
         author,
         comment,
         mockHttpServletRequest
@@ -287,6 +290,7 @@ public class CoordinatorCompactionConfigsResourceTest
     Assert.assertNull(oldConfigCaptor.getValue());
     Assert.assertNotNull(newConfigCaptor.getValue());
     
Assert.assertEquals(newConfigCaptor.getValue().getMaxCompactionTaskSlots(), 
maxCompactionTaskSlots);
+    Assert.assertTrue(newConfigCaptor.getValue().isUseAutoScaleSlots());
     Assert.assertEquals(compactionTaskSlotRatio, 
newConfigCaptor.getValue().getCompactionTaskSlotRatio(), 0);
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to