This is an automated email from the ASF dual-hosted git repository.
maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 2b8e7fc Add a flag to allow auto compaction task slot ratio to
consider auto scaler slots (#12228)
2b8e7fc is described below
commit 2b8e7fc0b4ab76af836594d3b14e9213fa64cff0
Author: Maytas Monsereenusorn <[email protected]>
AuthorDate: Sun Feb 6 20:46:05 2022 -0800
Add a flag to allow auto compaction task slot ratio to consider auto scaler
slots (#12228)
* add impl
* fix checkstyle
* add unit tests
* checkstyle
* add IT
* fix IT
* add comments
* fix checkstyle
---
...PendingTaskBasedWorkerProvisioningStrategy.java | 15 +-
.../overlord/autoscaling/ProvisioningStrategy.java | 15 ++
.../indexing/overlord/http/OverlordResource.java | 71 +++++++-
.../overlord/http/TotalWorkerCapacityResponse.java | 62 +++++++
.../PendingTaskBasedProvisioningStrategyTest.java | 102 ++++++++++-
.../overlord/http/OverlordResourceTest.java | 196 ++++++++++++++++++++-
.../druid/indexing/overlord/http/OverlordTest.java | 3 +-
.../clients/CompactionResourceTestClient.java | 19 +-
.../duty/ITAutoCompactionLockContentionTest.java | 2 +-
.../coordinator/duty/ITAutoCompactionTest.java | 31 +++-
.../client/indexing/HttpIndexingServiceClient.java | 26 +++
.../client/indexing/IndexingServiceClient.java | 9 +
.../indexing/IndexingTotalWorkerCapacityInfo.java | 62 +++++++
.../coordinator/CoordinatorCompactionConfig.java | 64 ++++---
.../server/coordinator/duty/CompactSegments.java | 15 +-
.../http/CoordinatorCompactionConfigsResource.java | 4 +-
.../indexing/HttpIndexingServiceClientTest.java | 31 ++++
.../client/indexing/NoopIndexingServiceClient.java | 6 +
.../coordinator/duty/CompactSegmentsTest.java | 55 +++++-
.../CoordinatorCompactionConfigsResourceTest.java | 4 +
20 files changed, 737 insertions(+), 55 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java
index 28d1bef..7d83b3d 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java
@@ -267,8 +267,7 @@ public class PendingTaskBasedWorkerProvisioningStrategy
extends AbstractWorkerPr
remoteTaskRunnerConfig,
workerConfig,
pendingTasks,
- workers,
- config.getWorkerCapacityHint()
+ workers
);
log.debug("More workers needed: %d", moreWorkersNeeded);
@@ -296,8 +295,7 @@ public class PendingTaskBasedWorkerProvisioningStrategy
extends AbstractWorkerPr
final WorkerTaskRunnerConfig workerTaskRunnerConfig,
final DefaultWorkerBehaviorConfig workerConfig,
final Collection<Task> pendingTasks,
- final Collection<ImmutableWorkerInfo> workers,
- final int workerCapacityHint
+ final Collection<ImmutableWorkerInfo> workers
)
{
final Collection<ImmutableWorkerInfo> validWorkers = Collections2.filter(
@@ -312,7 +310,7 @@ public class PendingTaskBasedWorkerProvisioningStrategy
extends AbstractWorkerPr
}
WorkerSelectStrategy workerSelectStrategy =
workerConfig.getSelectStrategy();
int need = 0;
- int capacity = getExpectedWorkerCapacity(workers, workerCapacityHint);
+ int capacity = getExpectedWorkerCapacity(workers);
log.info("Expected worker capacity: %d", capacity);
// Simulate assigning tasks to dummy workers using configured
workerSelectStrategy
@@ -458,14 +456,15 @@ public class PendingTaskBasedWorkerProvisioningStrategy
extends AbstractWorkerPr
return currValidWorkers;
}
- private static int getExpectedWorkerCapacity(final
Collection<ImmutableWorkerInfo> workers, final int workerCapacityHint)
+ @Override
+ public int getExpectedWorkerCapacity(final Collection<ImmutableWorkerInfo>
workers)
{
int size = workers.size();
if (size == 0) {
// No existing workers
- if (workerCapacityHint > 0) {
+ if (config.getWorkerCapacityHint() > 0) {
// Return workerCapacityHint if it is set in config
- return workerCapacityHint;
+ return config.getWorkerCapacityHint();
} else {
// Assume capacity per worker as 1
return 1;
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/ProvisioningStrategy.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/ProvisioningStrategy.java
index 1f940fd..d4e3b5b 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/ProvisioningStrategy.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/ProvisioningStrategy.java
@@ -20,8 +20,12 @@
package org.apache.druid.indexing.overlord.autoscaling;
import org.apache.druid.guice.annotations.ExtensionPoint;
+import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.TaskRunner;
+import javax.annotation.Nonnull;
+import java.util.Collection;
+
/**
* In general, the resource management is tied to the runner.
*/
@@ -35,4 +39,15 @@ public interface ProvisioningStrategy<T extends TaskRunner>
* @param runner The TaskRunner state holder this strategy should use during
execution
*/
ProvisioningService makeProvisioningService(T runner);
+
+ /**
+ * Returns the expected number of task slots available for each worker.
+ * This method can return -1 if the provisioning strategy does not support
getting the expected worker capacity.
+ *
+ * @return the expected number of task slots available for each worker if
the provisioning strategy supports getting the expected worker capacity, otherwise -1
+ */
+ default int getExpectedWorkerCapacity(@Nonnull
Collection<ImmutableWorkerInfo> workers)
+ {
+ return -1;
+ }
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
index 0076eb0..85bb3db 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
@@ -45,6 +45,7 @@ import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionHolder;
import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskQueue;
@@ -53,8 +54,10 @@ import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter;
import org.apache.druid.indexing.overlord.WorkerTaskRunner;
import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter;
+import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.http.security.TaskResourceFilter;
+import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
@@ -122,6 +125,7 @@ public class OverlordResource
private final AuditManager auditManager;
private final AuthorizerMapper authorizerMapper;
private final WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter;
+ private final ProvisioningStrategy provisioningStrategy;
private AtomicReference<WorkerBehaviorConfig> workerConfigRef = null;
private static final List API_TASK_STATES = ImmutableList.of("pending",
"waiting", "running", "complete");
@@ -135,7 +139,8 @@ public class OverlordResource
JacksonConfigManager configManager,
AuditManager auditManager,
AuthorizerMapper authorizerMapper,
- WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter
+ WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter,
+ ProvisioningStrategy provisioningStrategy
)
{
this.taskMaster = taskMaster;
@@ -146,6 +151,7 @@ public class OverlordResource
this.auditManager = auditManager;
this.authorizerMapper = authorizerMapper;
this.workerTaskRunnerQueryAdapter = workerTaskRunnerQueryAdapter;
+ this.provisioningStrategy = provisioningStrategy;
}
/**
@@ -422,6 +428,69 @@ public class OverlordResource
return Response.ok(workerConfigRef.get()).build();
}
+ /**
+ * Gets the total worker capacity of various states of the cluster.
+ */
+ @GET
+ @Path("/totalWorkerCapacity")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ResourceFilters(ConfigResourceFilter.class)
+ public Response getTotalWorkerCapacity()
+ {
+ // Calculate current cluster capacity
+ int currentCapacity;
+ Optional<TaskRunner> taskRunnerOptional = taskMaster.getTaskRunner();
+ if (!taskRunnerOptional.isPresent()) {
+ // Cannot serve call as not leader
+ return Response.status(Response.Status.SERVICE_UNAVAILABLE).build();
+ }
+ TaskRunner taskRunner = taskRunnerOptional.get();
+ Collection<ImmutableWorkerInfo> workers;
+ if (taskRunner instanceof WorkerTaskRunner) {
+ workers = ((WorkerTaskRunner) taskRunner).getWorkers();
+ currentCapacity = workers.stream().mapToInt(workerInfo ->
workerInfo.getWorker().getCapacity()).sum();
+ } else {
+ log.debug(
+ "Cannot calculate capacity as task runner [%s] of type [%s] does not
support listing workers",
+ taskRunner,
+ taskRunner.getClass().getName()
+ );
+ workers = ImmutableList.of();
+ currentCapacity = -1;
+ }
+
+ // Calculate maximum capacity with auto scale
+ int maximumCapacity;
+ if (workerConfigRef == null) {
+ workerConfigRef = configManager.watch(WorkerBehaviorConfig.CONFIG_KEY,
WorkerBehaviorConfig.class);
+ }
+ WorkerBehaviorConfig workerBehaviorConfig = workerConfigRef.get();
+ if (workerBehaviorConfig == null) {
+ // Auto scale not set up
+ log.debug("Cannot calculate maximum worker capacity as worker behavior
config is not configured");
+ maximumCapacity = -1;
+ } else if (workerBehaviorConfig instanceof DefaultWorkerBehaviorConfig) {
+ DefaultWorkerBehaviorConfig defaultWorkerBehaviorConfig =
(DefaultWorkerBehaviorConfig) workerBehaviorConfig;
+ if (defaultWorkerBehaviorConfig.getAutoScaler() == null) {
+ // Auto scale not set up
+ log.debug("Cannot calculate maximum worker capacity as auto scaler not
configured");
+ maximumCapacity = -1;
+ } else {
+ int maxWorker =
defaultWorkerBehaviorConfig.getAutoScaler().getMaxNumWorkers();
+ int expectedWorkerCapacity =
provisioningStrategy.getExpectedWorkerCapacity(workers);
+ maximumCapacity = expectedWorkerCapacity == -1 ? -1 : maxWorker *
expectedWorkerCapacity;
+ }
+ } else {
+ // Auto scale is not using DefaultWorkerBehaviorConfig
+ log.debug("Cannot calculate maximum worker capacity as
WorkerBehaviorConfig [%s] of type [%s] does not support getting max capacity",
+ workerBehaviorConfig,
+ workerBehaviorConfig.getClass().getSimpleName()
+ );
+ maximumCapacity = -1;
+ }
+ return Response.ok(new TotalWorkerCapacityResponse(currentCapacity,
maximumCapacity)).build();
+ }
+
// default value is used for backwards compatibility
@POST
@Path("/worker")
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/TotalWorkerCapacityResponse.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/TotalWorkerCapacityResponse.java
new file mode 100644
index 0000000..f40a8f2
--- /dev/null
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/TotalWorkerCapacityResponse.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.http;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Should be synchronized with
org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo
+ */
+public class TotalWorkerCapacityResponse
+{
+ /**
+ * The total worker capacity of the current state of the cluster. This can
be -1 if
+ * it cannot be determined.
+ */
+ private final int currentClusterCapacity;
+ /**
+ * The total worker capacity of the cluster including auto scaling
capability (scaling to max workers).
+ * This can be -1 if it cannot be determined or if auto scaling is not
configured.
+ */
+ private final int maximumCapacityWithAutoScale;
+
+ @JsonCreator
+ public TotalWorkerCapacityResponse(
+ @JsonProperty("currentClusterCapacity") int currentClusterCapacity,
+ @JsonProperty("maximumCapacityWithAutoScale") int
maximumCapacityWithAutoScale
+ )
+ {
+ this.currentClusterCapacity = currentClusterCapacity;
+ this.maximumCapacityWithAutoScale = maximumCapacityWithAutoScale;
+ }
+
+ @JsonProperty
+ public int getCurrentClusterCapacity()
+ {
+ return currentClusterCapacity;
+ }
+
+ @JsonProperty
+ public int getMaximumCapacityWithAutoScale()
+ {
+ return maximumCapacityWithAutoScale;
+ }
+}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java
index 34af0ca..249f510 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java
@@ -28,6 +28,7 @@ import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TestTasks;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.RemoteTaskRunner;
import org.apache.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.ZkWorker;
@@ -54,8 +55,10 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
@@ -66,6 +69,7 @@ public class PendingTaskBasedProvisioningStrategyTest
{
private AutoScaler autoScaler;
private Task testTask;
+ private PendingTaskBasedWorkerProvisioningConfig config;
private PendingTaskBasedWorkerProvisioningStrategy strategy;
private AtomicReference<WorkerBehaviorConfig> workerConfig;
private ScheduledExecutorService executorService =
Execs.scheduledSingleThreaded("test service");
@@ -79,7 +83,7 @@ public class PendingTaskBasedProvisioningStrategyTest
testTask = TestTasks.immediateSuccess("task1");
- PendingTaskBasedWorkerProvisioningConfig config = new
PendingTaskBasedWorkerProvisioningConfig()
+ config = new PendingTaskBasedWorkerProvisioningConfig()
.setMaxScalingDuration(new Period(1000))
.setNumEventsToTrack(10)
.setPendingTaskTimeout(new Period(0))
@@ -109,6 +113,102 @@ public class PendingTaskBasedProvisioningStrategyTest
}
@Test
+ public void testGetExpectedWorkerCapacityWithNoWorkerAndHintIsValid()
+ {
+ int capacityHint = 10;
+ config = new PendingTaskBasedWorkerProvisioningConfig()
+ .setMaxScalingDuration(new Period(1000))
+ .setNumEventsToTrack(10)
+ .setPendingTaskTimeout(new Period(0))
+ .setWorkerVersion(MIN_VERSION)
+ .setMaxScalingStep(2)
+ .setWorkerCapacityHint(capacityHint);
+ strategy = new PendingTaskBasedWorkerProvisioningStrategy(
+ config,
+ DSuppliers.of(workerConfig),
+ new ProvisioningSchedulerConfig(),
+ new Supplier<ScheduledExecutorService>()
+ {
+ @Override
+ public ScheduledExecutorService get()
+ {
+ return executorService;
+ }
+ }
+ );
+ int expectedWorkerCapacity =
strategy.getExpectedWorkerCapacity(ImmutableList.of());
+ Assert.assertEquals(capacityHint, expectedWorkerCapacity);
+ }
+
+ @Test
+ public void testGetExpectedWorkerCapacityWithNoWorkerAndHintIsNotValid()
+ {
+ int capacityHint = -1;
+ config = new PendingTaskBasedWorkerProvisioningConfig()
+ .setMaxScalingDuration(new Period(1000))
+ .setNumEventsToTrack(10)
+ .setPendingTaskTimeout(new Period(0))
+ .setWorkerVersion(MIN_VERSION)
+ .setMaxScalingStep(2)
+ .setWorkerCapacityHint(capacityHint);
+ strategy = new PendingTaskBasedWorkerProvisioningStrategy(
+ config,
+ DSuppliers.of(workerConfig),
+ new ProvisioningSchedulerConfig(),
+ new Supplier<ScheduledExecutorService>()
+ {
+ @Override
+ public ScheduledExecutorService get()
+ {
+ return executorService;
+ }
+ }
+ );
+ int expectedWorkerCapacity =
strategy.getExpectedWorkerCapacity(ImmutableList.of());
+ Assert.assertEquals(1, expectedWorkerCapacity);
+ }
+
+ @Test
+ public void testGetExpectedWorkerCapacityWithSingleWorker()
+ {
+ int workerCapacity = 3;
+ Collection<ImmutableWorkerInfo> workerInfoCollection = ImmutableList.of(
+ new ImmutableWorkerInfo(
+ new Worker("http", "localhost0", "localhost0", workerCapacity,
"v1", WorkerConfig.DEFAULT_CATEGORY), 0,
+ new HashSet<>(),
+ new HashSet<>(),
+ DateTimes.nowUtc()
+ )
+ );
+ int expectedWorkerCapacity =
strategy.getExpectedWorkerCapacity(workerInfoCollection);
+ Assert.assertEquals(workerCapacity, expectedWorkerCapacity);
+ }
+
+ @Test
+ public void testGetExpectedWorkerCapacityWithMultipleWorker()
+ {
+ int workerOneCapacity = 3;
+ int workerTwoCapacity = 6;
+ Collection<ImmutableWorkerInfo> workerInfoCollection = ImmutableList.of(
+ new ImmutableWorkerInfo(
+ new Worker("http", "localhost0", "localhost0", workerOneCapacity,
"v1", WorkerConfig.DEFAULT_CATEGORY), 0,
+ new HashSet<>(),
+ new HashSet<>(),
+ DateTimes.nowUtc()
+ ),
+ new ImmutableWorkerInfo(
+ new Worker("http", "localhost0", "localhost0", workerTwoCapacity +
3, "v1", WorkerConfig.DEFAULT_CATEGORY), 0,
+ new HashSet<>(),
+ new HashSet<>(),
+ DateTimes.nowUtc()
+ )
+ );
+ int expectedWorkerCapacity =
strategy.getExpectedWorkerCapacity(workerInfoCollection);
+ // Use capacity of the first worker in the list
+ Assert.assertEquals(workerOneCapacity, expectedWorkerCapacity);
+ }
+
+ @Test
public void testFailIfMinWorkerIsZeroAndWorkerHintNotSet()
{
EmittingLogger mockLogger = EasyMock.createMock(EmittingLogger.class);
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
index b919706..638a33d 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
@@ -24,7 +24,9 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskLocation;
@@ -37,13 +39,21 @@ import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskQueue;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter;
+import org.apache.druid.indexing.overlord.WorkerTaskRunner;
import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter;
+import org.apache.druid.indexing.overlord.autoscaling.AutoScaler;
+import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
+import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
+import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
+import org.apache.druid.indexing.worker.Worker;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.RE;
@@ -79,11 +89,14 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
public class OverlordResourceTest
{
private OverlordResource overlordResource;
private TaskMaster taskMaster;
+ private JacksonConfigManager configManager;
+ private ProvisioningStrategy provisioningStrategy;
private TaskStorageQueryAdapter taskStorageQueryAdapter;
private IndexerMetadataStorageAdapter indexerMetadataStorageAdapter;
private HttpServletRequest req;
@@ -97,6 +110,8 @@ public class OverlordResourceTest
public void setUp()
{
taskRunner = EasyMock.createMock(TaskRunner.class);
+ configManager = EasyMock.createMock(JacksonConfigManager.class);
+ provisioningStrategy = EasyMock.createMock(ProvisioningStrategy.class);
taskMaster = EasyMock.createStrictMock(TaskMaster.class);
taskStorageQueryAdapter =
EasyMock.createStrictMock(TaskStorageQueryAdapter.class);
indexerMetadataStorageAdapter =
EasyMock.createStrictMock(IndexerMetadataStorageAdapter.class);
@@ -145,10 +160,11 @@ public class OverlordResourceTest
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
null,
- null,
+ configManager,
null,
authMapper,
- workerTaskRunnerQueryAdapter
+ workerTaskRunnerQueryAdapter,
+ provisioningStrategy
);
}
@@ -1447,6 +1463,182 @@ public class OverlordResourceTest
Assert.assertEquals(ImmutableMap.of("error", "Worker API returns error!"),
response.getEntity());
}
+ @Test
+ public void testGetTotalWorkerCapacityNotLeader()
+ {
+ EasyMock.reset(taskMaster);
+ EasyMock.expect(taskMaster.getTaskRunner()).andReturn(
+ Optional.absent()
+ ).anyTimes();
+ EasyMock.replay(
+ taskRunner,
+ taskMaster,
+ taskStorageQueryAdapter,
+ indexerMetadataStorageAdapter,
+ req,
+ workerTaskRunnerQueryAdapter,
+ configManager
+ );
+ final Response response = overlordResource.getTotalWorkerCapacity();
+ Assert.assertEquals(HttpResponseStatus.SERVICE_UNAVAILABLE.getCode(),
response.getStatus());
+ }
+
+ @Test
+ public void testGetTotalWorkerCapacityWithUnknown()
+ {
+ WorkerBehaviorConfig workerBehaviorConfig =
EasyMock.createMock(WorkerBehaviorConfig.class);
+ AtomicReference<WorkerBehaviorConfig> workerBehaviorConfigAtomicReference
= new AtomicReference<>(workerBehaviorConfig);
+ EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY,
WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
+ EasyMock.replay(
+ taskRunner,
+ taskMaster,
+ taskStorageQueryAdapter,
+ indexerMetadataStorageAdapter,
+ req,
+ workerTaskRunnerQueryAdapter,
+ configManager
+ );
+ final Response response = overlordResource.getTotalWorkerCapacity();
+ Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
+ Assert.assertEquals(-1, ((TotalWorkerCapacityResponse)
response.getEntity()).getCurrentClusterCapacity());
+ Assert.assertEquals(-1, ((TotalWorkerCapacityResponse)
response.getEntity()).getMaximumCapacityWithAutoScale());
+ }
+
+ @Test
+ public void
testGetTotalWorkerCapacityWithWorkerTaskRunnerButWorkerBehaviorConfigNotConfigured()
+ {
+ AtomicReference<WorkerBehaviorConfig> workerBehaviorConfigAtomicReference
= new AtomicReference<>(null);
+ EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY,
WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
+ EasyMock.replay(
+ taskRunner,
+ taskMaster,
+ taskStorageQueryAdapter,
+ indexerMetadataStorageAdapter,
+ req,
+ workerTaskRunnerQueryAdapter,
+ configManager
+ );
+ final Response response = overlordResource.getTotalWorkerCapacity();
+ Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
+ Assert.assertEquals(-1, ((TotalWorkerCapacityResponse)
response.getEntity()).getCurrentClusterCapacity());
+ Assert.assertEquals(-1, ((TotalWorkerCapacityResponse)
response.getEntity()).getMaximumCapacityWithAutoScale());
+ }
+
+ @Test
+ public void
testGetTotalWorkerCapacityWithWorkerTaskRunnerButAutoScaleNotConfigured()
+ {
+ DefaultWorkerBehaviorConfig workerBehaviorConfig = new
DefaultWorkerBehaviorConfig(null, null);
+ AtomicReference<WorkerBehaviorConfig> workerBehaviorConfigAtomicReference
= new AtomicReference<>(workerBehaviorConfig);
+ EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY,
WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
+ EasyMock.replay(
+ taskRunner,
+ taskMaster,
+ taskStorageQueryAdapter,
+ indexerMetadataStorageAdapter,
+ req,
+ workerTaskRunnerQueryAdapter,
+ configManager
+ );
+ final Response response = overlordResource.getTotalWorkerCapacity();
+ Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
+ Assert.assertEquals(-1, ((TotalWorkerCapacityResponse)
response.getEntity()).getCurrentClusterCapacity());
+ Assert.assertEquals(-1, ((TotalWorkerCapacityResponse)
response.getEntity()).getMaximumCapacityWithAutoScale());
+ }
+
+ @Test
+ public void
testGetTotalWorkerCapacityWithAutoScaleConfiguredAndProvisioningStrategySupportExpectedWorkerCapacity()
+ {
+ int expectedWorkerCapacity = 3;
+ int maxNumWorkers = 2;
+ WorkerTaskRunner workerTaskRunner =
EasyMock.createMock(WorkerTaskRunner.class);
+ Collection<ImmutableWorkerInfo> workerInfos = ImmutableList.of(
+ new ImmutableWorkerInfo(
+ new Worker(
+ "http", "testWorker", "192.0.0.1", expectedWorkerCapacity,
"v1", WorkerConfig.DEFAULT_CATEGORY
+ ),
+ 2,
+ ImmutableSet.of("grp1", "grp2"),
+ ImmutableSet.of("task1", "task2"),
+ DateTimes.of("2015-01-01T01:01:01Z")
+ )
+ );
+ EasyMock.expect(workerTaskRunner.getWorkers()).andReturn(workerInfos);
+ EasyMock.reset(taskMaster);
+ EasyMock.expect(taskMaster.getTaskRunner()).andReturn(
+ Optional.of(workerTaskRunner)
+ ).anyTimes();
+
EasyMock.expect(provisioningStrategy.getExpectedWorkerCapacity(workerInfos)).andReturn(expectedWorkerCapacity).anyTimes();
+ AutoScaler autoScaler = EasyMock.createMock(AutoScaler.class);
+ EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
+ EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(maxNumWorkers);
+ DefaultWorkerBehaviorConfig workerBehaviorConfig = new
DefaultWorkerBehaviorConfig(null, autoScaler);
+ AtomicReference<WorkerBehaviorConfig> workerBehaviorConfigAtomicReference
= new AtomicReference<>(workerBehaviorConfig);
+ EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY,
WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
+ EasyMock.replay(
+ workerTaskRunner,
+ autoScaler,
+ taskRunner,
+ taskMaster,
+ taskStorageQueryAdapter,
+ indexerMetadataStorageAdapter,
+ req,
+ workerTaskRunnerQueryAdapter,
+ configManager,
+ provisioningStrategy
+ );
+ final Response response = overlordResource.getTotalWorkerCapacity();
+ Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
+ Assert.assertEquals(expectedWorkerCapacity, ((TotalWorkerCapacityResponse)
response.getEntity()).getCurrentClusterCapacity());
+ Assert.assertEquals(expectedWorkerCapacity * maxNumWorkers,
((TotalWorkerCapacityResponse)
response.getEntity()).getMaximumCapacityWithAutoScale());
+ }
+
+ @Test
+ public void
testGetTotalWorkerCapacityWithAutoScaleConfiguredAndProvisioningStrategyNotSupportExpectedWorkerCapacity()
+ {
+ int invalidExpectedCapacity = -1;
+ int maxNumWorkers = 2;
+ WorkerTaskRunner workerTaskRunner =
EasyMock.createMock(WorkerTaskRunner.class);
+ Collection<ImmutableWorkerInfo> workerInfos = ImmutableList.of(
+ new ImmutableWorkerInfo(
+ new Worker(
+ "http", "testWorker", "192.0.0.1", 3, "v1",
WorkerConfig.DEFAULT_CATEGORY
+ ),
+ 2,
+ ImmutableSet.of("grp1", "grp2"),
+ ImmutableSet.of("task1", "task2"),
+ DateTimes.of("2015-01-01T01:01:01Z")
+ )
+ );
+ EasyMock.expect(workerTaskRunner.getWorkers()).andReturn(workerInfos);
+ EasyMock.reset(taskMaster);
+ EasyMock.expect(taskMaster.getTaskRunner()).andReturn(
+ Optional.of(workerTaskRunner)
+ ).anyTimes();
+
EasyMock.expect(provisioningStrategy.getExpectedWorkerCapacity(workerInfos)).andReturn(invalidExpectedCapacity).anyTimes();
+ AutoScaler autoScaler = EasyMock.createMock(AutoScaler.class);
+ EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
+ EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(maxNumWorkers);
+ DefaultWorkerBehaviorConfig workerBehaviorConfig = new
DefaultWorkerBehaviorConfig(null, autoScaler);
+ AtomicReference<WorkerBehaviorConfig> workerBehaviorConfigAtomicReference
= new AtomicReference<>(workerBehaviorConfig);
+ EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY,
WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
+ EasyMock.replay(
+ workerTaskRunner,
+ autoScaler,
+ taskRunner,
+ taskMaster,
+ taskStorageQueryAdapter,
+ indexerMetadataStorageAdapter,
+ req,
+ workerTaskRunnerQueryAdapter,
+ configManager,
+ provisioningStrategy
+ );
+ final Response response = overlordResource.getTotalWorkerCapacity();
+ Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
+
Assert.assertEquals(workerInfos.stream().findFirst().get().getWorker().getCapacity(),
((TotalWorkerCapacityResponse)
response.getEntity()).getCurrentClusterCapacity());
+ Assert.assertEquals(invalidExpectedCapacity,
((TotalWorkerCapacityResponse)
response.getEntity()).getMaximumCapacityWithAutoScale());
+ }
+
private void expectAuthorizationTokenCheck()
{
expectAuthorizationTokenCheck(Users.DRUID);
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
index 5655a13..40ed6e0 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
@@ -229,7 +229,8 @@ public class OverlordTest
null,
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
- workerTaskRunnerQueryAdapter
+ workerTaskRunnerQueryAdapter,
+ null
);
Response response = overlordResource.getLeader();
Assert.assertEquals(druidNode.getHostAndPort(), response.getEntity());
diff --git
a/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java
b/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java
index 2fd1a5e..8e5ee73 100644
---
a/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java
+++
b/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java
@@ -145,12 +145,21 @@ public class CompactionResourceTestClient
}
}
- public void updateCompactionTaskSlot(Double compactionTaskSlotRatio, Integer
maxCompactionTaskSlots) throws Exception
+ public void updateCompactionTaskSlot(Double compactionTaskSlotRatio, Integer
maxCompactionTaskSlots, Boolean useAutoScaleSlots) throws Exception
{
- String url =
StringUtils.format("%sconfig/compaction/taskslots?ratio=%s&max=%s",
- getCoordinatorURL(),
-
StringUtils.urlEncode(compactionTaskSlotRatio.toString()),
-
StringUtils.urlEncode(maxCompactionTaskSlots.toString()));
+ String url;
+ if (useAutoScaleSlots == null) {
+ url = StringUtils.format("%sconfig/compaction/taskslots?ratio=%s&max=%s",
+ getCoordinatorURL(),
+
StringUtils.urlEncode(compactionTaskSlotRatio.toString()),
+
StringUtils.urlEncode(maxCompactionTaskSlots.toString()));
+ } else {
+ url =
StringUtils.format("%sconfig/compaction/taskslots?ratio=%s&max=%s&useAutoScaleSlots=%s",
+ getCoordinatorURL(),
+
StringUtils.urlEncode(compactionTaskSlotRatio.toString()),
+
StringUtils.urlEncode(maxCompactionTaskSlots.toString()),
+
StringUtils.urlEncode(useAutoScaleSlots.toString()));
+ }
StatusResponseHolder response = httpClient.go(new Request(HttpMethod.POST,
new URL(url)), responseHandler).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java
index 84c1bf5..8d980d7 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java
@@ -306,7 +306,7 @@ public class ITAutoCompactionLockContentionTest extends
AbstractKafkaIndexingSer
{
final DataSourceCompactionConfig compactionConfig = CompactionUtil
.createCompactionConfig(fullDatasourceName,
Specs.MAX_ROWS_PER_SEGMENT, Period.ZERO);
- compactionResource.updateCompactionTaskSlot(0.5, 10);
+ compactionResource.updateCompactionTaskSlot(0.5, 10, null);
compactionResource.submitCompactionConfig(compactionConfig);
// Wait for compaction config to persist
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
index a79fe88..f9c462a 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
@@ -102,7 +102,7 @@ public class ITAutoCompactionTest extends
AbstractIndexerTest
public void setup() throws Exception
{
// Set comapction slot to 5
- updateCompactionTaskSlot(0.5, 10);
+ updateCompactionTaskSlot(0.5, 10, null);
fullDatasourceName = "wikipedia_index_test_" + UUID.randomUUID() +
config.getExtraDatasourceNameSuffix();
}
@@ -235,7 +235,7 @@ public class ITAutoCompactionTest extends
AbstractIndexerTest
public void testAutoCompactionDutyCanUpdateTaskSlots() throws Exception
{
// Set compactionTaskSlotRatio to 0 to prevent any compaction
- updateCompactionTaskSlot(0, 0);
+ updateCompactionTaskSlot(0, 0, null);
loadData(INDEX_TASK);
try (final Closeable ignored = unloader(fullDatasourceName)) {
final List<String> intervalsBeforeCompaction =
coordinator.getSegmentIntervals(fullDatasourceName);
@@ -252,7 +252,7 @@ public class ITAutoCompactionTest extends
AbstractIndexerTest
checkCompactionIntervals(intervalsBeforeCompaction);
Assert.assertNull(compactionResource.getCompactionStatus(fullDatasourceName));
// Update compaction slots to be 1
- updateCompactionTaskSlot(1, 1);
+ updateCompactionTaskSlot(1, 1, null);
// One day compacted (1 new segment) and one day remains uncompacted. (3 total)
forceTriggerAutoCompaction(3);
verifyQuery(INDEX_QUERIES_RESOURCE);
@@ -898,6 +898,24 @@ public class ITAutoCompactionTest extends
AbstractIndexerTest
}
}
+ @Test
+ public void testUpdateCompactionTaskSlotWithUseAutoScaleSlots() throws
Exception
+ {
+ // First try update without useAutoScaleSlots
+ updateCompactionTaskSlot(3, 5, null);
+ CoordinatorCompactionConfig coordinatorCompactionConfig =
compactionResource.getCoordinatorCompactionConfigs();
+ // Should be default value which is false
+ Assert.assertFalse(coordinatorCompactionConfig.isUseAutoScaleSlots());
+ // Now try update from default value to useAutoScaleSlots=true
+ updateCompactionTaskSlot(3, 5, true);
+ coordinatorCompactionConfig =
compactionResource.getCoordinatorCompactionConfigs();
+ Assert.assertTrue(coordinatorCompactionConfig.isUseAutoScaleSlots());
+ // Now try update from useAutoScaleSlots=true to useAutoScaleSlots=false
+ updateCompactionTaskSlot(3, 5, false);
+ coordinatorCompactionConfig =
compactionResource.getCoordinatorCompactionConfigs();
+ Assert.assertFalse(coordinatorCompactionConfig.isUseAutoScaleSlots());
+ }
+
private void loadData(String indexTask) throws Exception
{
loadData(indexTask, ImmutableMap.of());
@@ -1124,13 +1142,16 @@ public class ITAutoCompactionTest extends
AbstractIndexerTest
}
}
- private void updateCompactionTaskSlot(double compactionTaskSlotRatio, int
maxCompactionTaskSlots) throws Exception
+ private void updateCompactionTaskSlot(double compactionTaskSlotRatio, int
maxCompactionTaskSlots, Boolean useAutoScaleSlots) throws Exception
{
- compactionResource.updateCompactionTaskSlot(compactionTaskSlotRatio,
maxCompactionTaskSlots);
+ compactionResource.updateCompactionTaskSlot(compactionTaskSlotRatio,
maxCompactionTaskSlots, useAutoScaleSlots);
// Verify that the compaction config is updated correctly.
CoordinatorCompactionConfig coordinatorCompactionConfig =
compactionResource.getCoordinatorCompactionConfigs();
Assert.assertEquals(coordinatorCompactionConfig.getCompactionTaskSlotRatio(),
compactionTaskSlotRatio);
Assert.assertEquals(coordinatorCompactionConfig.getMaxCompactionTaskSlots(),
maxCompactionTaskSlots);
+ if (useAutoScaleSlots != null) {
+ Assert.assertEquals(coordinatorCompactionConfig.isUseAutoScaleSlots(),
useAutoScaleSlots.booleanValue());
+ }
}
private void getAndAssertCompactionStatus(
diff --git
a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
index ee21ad0..b63f3f0 100644
---
a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
+++
b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
@@ -221,6 +221,32 @@ public class HttpIndexingServiceClient implements
IndexingServiceClient
}
@Override
+ public int getTotalWorkerCapacityWithAutoScale()
+ {
+ try {
+ final StringFullResponseHolder response = druidLeaderClient.go(
+ druidLeaderClient.makeRequest(HttpMethod.GET,
"/druid/indexer/v1/totalWorkerCapacity")
+ .setHeader("Content-Type",
MediaType.APPLICATION_JSON)
+ );
+ if (!response.getStatus().equals(HttpResponseStatus.OK)) {
+ throw new ISE(
+ "Error while getting total worker capacity. status[%s]
content[%s]",
+ response.getStatus(),
+ response.getContent()
+ );
+ }
+ final IndexingTotalWorkerCapacityInfo indexingTotalWorkerCapacityInfo =
jsonMapper.readValue(
+ response.getContent(),
+ new TypeReference<IndexingTotalWorkerCapacityInfo>() {}
+ );
+ return indexingTotalWorkerCapacityInfo.getMaximumCapacityWithAutoScale();
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
public List<TaskStatusPlus> getActiveTasks()
{
// Must retrieve waiting, then pending, then running, so if tasks move from one state to the next between
diff --git
a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
index 7429c55..000f132 100644
---
a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
+++
b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
@@ -49,8 +49,17 @@ public interface IndexingServiceClient
@Nullable Map<String, Object> context
);
+ /**
+ * Gets the total worker capacity of the current state of the cluster. This can be -1 if it cannot be determined.
+ */
int getTotalWorkerCapacity();
+ /**
+ * Gets the total worker capacity of the cluster including auto scaling capability (scaling to max workers).
+ * This can be -1 if it cannot be determined or if auto scaling is not configured.
+ */
+ int getTotalWorkerCapacityWithAutoScale();
+
String runTask(String taskId, Object taskObject);
String cancelTask(String taskId);
diff --git
a/server/src/main/java/org/apache/druid/client/indexing/IndexingTotalWorkerCapacityInfo.java
b/server/src/main/java/org/apache/druid/client/indexing/IndexingTotalWorkerCapacityInfo.java
new file mode 100644
index 0000000..bd3194d
--- /dev/null
+++
b/server/src/main/java/org/apache/druid/client/indexing/IndexingTotalWorkerCapacityInfo.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client.indexing;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Should be synchronized with org.apache.druid.indexing.overlord.http.TotalWorkerCapacityResponse
+ */
+public class IndexingTotalWorkerCapacityInfo
+{
+ /**
+ * The total worker capacity of the current state of the cluster. This can be -1 if
+ * it cannot be determined.
+ */
+ private final int currentClusterCapacity;
+ /**
+ * The total worker capacity of the cluster including auto scaling capability (scaling to max workers).
+ * This can be -1 if it cannot be determined or if auto scaling is not configured.
+ */
+ private final int maximumCapacityWithAutoScale;
+
+ @JsonCreator
+ public IndexingTotalWorkerCapacityInfo(
+ @JsonProperty("currentClusterCapacity") int currentClusterCapacity,
+ @JsonProperty("maximumCapacityWithAutoScale") int
maximumCapacityWithAutoScale
+ )
+ {
+ this.currentClusterCapacity = currentClusterCapacity;
+ this.maximumCapacityWithAutoScale = maximumCapacityWithAutoScale;
+ }
+
+ @JsonProperty
+ public int getCurrentClusterCapacity()
+ {
+ return currentClusterCapacity;
+ }
+
+ @JsonProperty
+ public int getMaximumCapacityWithAutoScale()
+ {
+ return maximumCapacityWithAutoScale;
+ }
+}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java
b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java
index 409a813..1c4ab4e 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java
@@ -39,10 +39,12 @@ public class CoordinatorCompactionConfig
private static final double DEFAULT_COMPACTION_TASK_RATIO = 0.1;
private static final int DEFAILT_MAX_COMPACTION_TASK_SLOTS =
Integer.MAX_VALUE;
+ private static final boolean DEFAULT_USE_AUTO_SCALE_SLOTS = false;
private final List<DataSourceCompactionConfig> compactionConfigs;
private final double compactionTaskSlotRatio;
private final int maxCompactionTaskSlots;
+ private final boolean useAutoScaleSlots;
public static CoordinatorCompactionConfig from(
CoordinatorCompactionConfig baseConfig,
@@ -52,31 +54,34 @@ public class CoordinatorCompactionConfig
return new CoordinatorCompactionConfig(
compactionConfigs,
baseConfig.compactionTaskSlotRatio,
- baseConfig.maxCompactionTaskSlots
+ baseConfig.maxCompactionTaskSlots,
+ baseConfig.useAutoScaleSlots
);
}
public static CoordinatorCompactionConfig from(
CoordinatorCompactionConfig baseConfig,
@Nullable Double compactionTaskSlotRatio,
- @Nullable Integer maxCompactionTaskSlots
+ @Nullable Integer maxCompactionTaskSlots,
+ @Nullable Boolean useAutoScaleSlots
)
{
return new CoordinatorCompactionConfig(
baseConfig.compactionConfigs,
compactionTaskSlotRatio == null ? baseConfig.compactionTaskSlotRatio :
compactionTaskSlotRatio,
- maxCompactionTaskSlots == null ? baseConfig.maxCompactionTaskSlots :
maxCompactionTaskSlots
+ maxCompactionTaskSlots == null ? baseConfig.maxCompactionTaskSlots :
maxCompactionTaskSlots,
+ useAutoScaleSlots == null ? baseConfig.useAutoScaleSlots :
useAutoScaleSlots
);
}
public static CoordinatorCompactionConfig
from(List<DataSourceCompactionConfig> compactionConfigs)
{
- return new CoordinatorCompactionConfig(compactionConfigs, null, null);
+ return new CoordinatorCompactionConfig(compactionConfigs, null, null,
null);
}
public static CoordinatorCompactionConfig empty()
{
- return new CoordinatorCompactionConfig(ImmutableList.of(), null, null);
+ return new CoordinatorCompactionConfig(ImmutableList.of(), null, null,
null);
}
public static AtomicReference<CoordinatorCompactionConfig> watch(final
JacksonConfigManager configManager)
@@ -113,7 +118,8 @@ public class CoordinatorCompactionConfig
public CoordinatorCompactionConfig(
@JsonProperty("compactionConfigs") List<DataSourceCompactionConfig>
compactionConfigs,
@JsonProperty("compactionTaskSlotRatio") @Nullable Double
compactionTaskSlotRatio,
- @JsonProperty("maxCompactionTaskSlots") @Nullable Integer
maxCompactionTaskSlots
+ @JsonProperty("maxCompactionTaskSlots") @Nullable Integer
maxCompactionTaskSlots,
+ @JsonProperty("useAutoScaleSlots") @Nullable Boolean useAutoScaleSlots
)
{
this.compactionConfigs = compactionConfigs;
@@ -123,6 +129,9 @@ public class CoordinatorCompactionConfig
this.maxCompactionTaskSlots = maxCompactionTaskSlots == null ?
DEFAILT_MAX_COMPACTION_TASK_SLOTS :
maxCompactionTaskSlots;
+ this.useAutoScaleSlots = useAutoScaleSlots == null ?
+ DEFAULT_USE_AUTO_SCALE_SLOTS :
+ useAutoScaleSlots;
}
@JsonProperty
@@ -143,20 +152,10 @@ public class CoordinatorCompactionConfig
return maxCompactionTaskSlots;
}
- @Override
- public String toString()
- {
- return "CoordinatorCompactionConfig{" +
- ", compactionConfigs=" + compactionConfigs +
- ", compactionTaskSlotRatio=" + compactionTaskSlotRatio +
- ", maxCompactionTaskSlots=" + maxCompactionTaskSlots +
- '}';
- }
-
- @Override
- public int hashCode()
+ @JsonProperty
+ public boolean isUseAutoScaleSlots()
{
- return Objects.hash(compactionConfigs, compactionTaskSlotRatio,
maxCompactionTaskSlots);
+ return useAutoScaleSlots;
}
@Override
@@ -168,16 +167,27 @@ public class CoordinatorCompactionConfig
if (o == null || getClass() != o.getClass()) {
return false;
}
-
CoordinatorCompactionConfig that = (CoordinatorCompactionConfig) o;
+ return Double.compare(that.compactionTaskSlotRatio,
compactionTaskSlotRatio) == 0 &&
+ maxCompactionTaskSlots == that.maxCompactionTaskSlots &&
+ useAutoScaleSlots == that.useAutoScaleSlots &&
+ Objects.equals(compactionConfigs, that.compactionConfigs);
+ }
- if (!Objects.equals(compactionConfigs, that.compactionConfigs)) {
- return false;
- }
- if (compactionTaskSlotRatio != that.compactionTaskSlotRatio) {
- return false;
- }
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(compactionConfigs, compactionTaskSlotRatio,
maxCompactionTaskSlots, useAutoScaleSlots);
+ }
- return maxCompactionTaskSlots == that.maxCompactionTaskSlots;
+ @Override
+ public String toString()
+ {
+ return "CoordinatorCompactionConfig{" +
+ "compactionConfigs=" + compactionConfigs +
+ ", compactionTaskSlotRatio=" + compactionTaskSlotRatio +
+ ", maxCompactionTaskSlots=" + maxCompactionTaskSlots +
+ ", useAutoScaleSlots=" + useAutoScaleSlots +
+ '}';
}
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index ad6566f..6598a6e 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -174,8 +174,21 @@ public class CompactSegments implements CoordinatorDuty
final CompactionSegmentIterator iterator =
policy.reset(compactionConfigs, dataSources,
intervalsToSkipCompaction);
+ int totalCapacity;
+ if (dynamicConfig.isUseAutoScaleSlots()) {
+ try {
+ totalCapacity =
indexingServiceClient.getTotalWorkerCapacityWithAutoScale();
+ }
+ catch (Exception e) {
+ LOG.warn("Failed to get total worker capacity with auto scale
slots. Falling back to current capacity count");
+ totalCapacity = indexingServiceClient.getTotalWorkerCapacity();
+ }
+ } else {
+ totalCapacity = indexingServiceClient.getTotalWorkerCapacity();
+ }
+
final int compactionTaskCapacity = (int) Math.min(
- indexingServiceClient.getTotalWorkerCapacity() *
dynamicConfig.getCompactionTaskSlotRatio(),
+ totalCapacity * dynamicConfig.getCompactionTaskSlotRatio(),
dynamicConfig.getMaxCompactionTaskSlots()
);
final int numAvailableCompactionTaskSlots;
diff --git
a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java
b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java
index cb17c45..e791b6a 100644
---
a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java
+++
b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java
@@ -93,6 +93,7 @@ public class CoordinatorCompactionConfigsResource
public Response setCompactionTaskLimit(
@QueryParam("ratio") Double compactionTaskSlotRatio,
@QueryParam("max") Integer maxCompactionTaskSlots,
+ @QueryParam("useAutoScaleSlots") Boolean useAutoScaleSlots,
@HeaderParam(AuditManager.X_DRUID_AUTHOR) @DefaultValue("") final String
author,
@HeaderParam(AuditManager.X_DRUID_COMMENT) @DefaultValue("") final
String comment,
@Context HttpServletRequest req
@@ -104,7 +105,8 @@ public class CoordinatorCompactionConfigsResource
final CoordinatorCompactionConfig newCompactionConfig =
CoordinatorCompactionConfig.from(
current,
compactionTaskSlotRatio,
- maxCompactionTaskSlots
+ maxCompactionTaskSlots,
+ useAutoScaleSlots
);
return manager.set(
diff --git
a/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java
b/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java
index 0886e12..445d129 100644
---
a/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java
+++
b/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java
@@ -342,5 +342,36 @@ public class HttpIndexingServiceClientTest
ClientCompactionTaskQuery taskQuery = (ClientCompactionTaskQuery)
captureTask.getValue();
Assert.assertNull(taskQuery.getIoConfig().getInputSpec().getSha256OfSortedSegmentIds());
}
+
+ @Test
+ public void testGetTotalWorkerCapacityWithAutoScale() throws Exception
+ {
+ int currentClusterCapacity = 5;
+ int maximumCapacityWithAutoScale = 10;
+ // Mock response for /druid/indexer/v1/totalWorkerCapacity
+ HttpResponse totalWorkerCapacityResponse =
EasyMock.createMock(HttpResponse.class);
+
EasyMock.expect(totalWorkerCapacityResponse.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
+ EasyMock.expect(totalWorkerCapacityResponse.getContent()).andReturn(new
BigEndianHeapChannelBuffer(0));
+ EasyMock.replay(totalWorkerCapacityResponse);
+ IndexingTotalWorkerCapacityInfo indexingTotalWorkerCapacityInfo = new
IndexingTotalWorkerCapacityInfo(currentClusterCapacity,
maximumCapacityWithAutoScale);
+ StringFullResponseHolder autoScaleResponseHolder = new
StringFullResponseHolder(
+ totalWorkerCapacityResponse,
+ StandardCharsets.UTF_8
+ ).addChunk(jsonMapper.writeValueAsString(indexingTotalWorkerCapacityInfo));
+ EasyMock.expect(druidLeaderClient.go(EasyMock.anyObject(Request.class)))
+ .andReturn(autoScaleResponseHolder)
+ .once();
+ EasyMock.expect(druidLeaderClient.makeRequest(HttpMethod.GET,
"/druid/indexer/v1/totalWorkerCapacity"))
+ .andReturn(new Request(
+ HttpMethod.GET,
+ new
URL("http://localhost:8090/druid/indexer/v1/totalWorkerCapacity")
+ ))
+ .once();
+ EasyMock.replay(druidLeaderClient);
+
+ final int actualResponse =
httpIndexingServiceClient.getTotalWorkerCapacityWithAutoScale();
+ Assert.assertEquals(maximumCapacityWithAutoScale, actualResponse);
+ EasyMock.verify(druidLeaderClient);
+ }
}
diff --git
a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
index 17cbc41..447a32d 100644
---
a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
+++
b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
@@ -69,6 +69,12 @@ public class NoopIndexingServiceClient implements
IndexingServiceClient
}
@Override
+ public int getTotalWorkerCapacityWithAutoScale()
+ {
+ return 0;
+ }
+
+ @Override
public String runTask(String taskId, Object taskObject)
{
return null;
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index df81886..2892d44 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -38,6 +38,7 @@ import
org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
import org.apache.druid.client.indexing.ClientTaskQuery;
import org.apache.druid.client.indexing.HttpIndexingServiceClient;
+import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo;
import org.apache.druid.client.indexing.IndexingWorker;
import org.apache.druid.client.indexing.IndexingWorkerInfo;
import org.apache.druid.client.indexing.TaskPayloadResponse;
@@ -133,6 +134,7 @@ public class CompactSegmentsTest
private static final int TOTAL_BYTE_PER_DATASOURCE = 440;
private static final int TOTAL_SEGMENT_PER_DATASOURCE = 44;
private static final int TOTAL_INTERVAL_PER_DATASOURCE = 11;
+ private static final int MAXIMUM_CAPACITY_WITH_AUTO_SCALE = 10;
@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> constructorFeeder()
@@ -648,6 +650,36 @@ public class CompactSegmentsTest
}
@Test
+ public void
testRunMultipleCompactionTaskSlotsWithUseAutoScaleSlotsOverMaxSlot()
+ {
+ int maxCompactionSlot = 3;
+ Assert.assertTrue(maxCompactionSlot < MAXIMUM_CAPACITY_WITH_AUTO_SCALE);
+ final TestDruidLeaderClient leaderClient = new
TestDruidLeaderClient(JSON_MAPPER);
+ leaderClient.start();
+ final HttpIndexingServiceClient indexingServiceClient = new
HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
+ final CompactSegments compactSegments = new
CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
+ final CoordinatorStats stats = doCompactSegments(compactSegments,
createCompactionConfigs(), maxCompactionSlot, true);
+ Assert.assertEquals(maxCompactionSlot,
stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT));
+ Assert.assertEquals(maxCompactionSlot,
stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT));
+ Assert.assertEquals(maxCompactionSlot,
stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT));
+ }
+
+ @Test
+ public void
testRunMultipleCompactionTaskSlotsWithUseAutoScaleSlotsUnderMaxSlot()
+ {
+ int maxCompactionSlot = 100;
+ Assert.assertFalse(maxCompactionSlot < MAXIMUM_CAPACITY_WITH_AUTO_SCALE);
+ final TestDruidLeaderClient leaderClient = new
TestDruidLeaderClient(JSON_MAPPER);
+ leaderClient.start();
+ final HttpIndexingServiceClient indexingServiceClient = new
HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
+ final CompactSegments compactSegments = new
CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
+ final CoordinatorStats stats = doCompactSegments(compactSegments,
createCompactionConfigs(), maxCompactionSlot, true);
+ Assert.assertEquals(MAXIMUM_CAPACITY_WITH_AUTO_SCALE,
stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT));
+ Assert.assertEquals(MAXIMUM_CAPACITY_WITH_AUTO_SCALE,
stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT));
+ Assert.assertEquals(MAXIMUM_CAPACITY_WITH_AUTO_SCALE,
stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT));
+ }
+
+ @Test
public void testCompactWithoutGranularitySpec()
{
final HttpIndexingServiceClient mockIndexingServiceClient =
Mockito.mock(HttpIndexingServiceClient.class);
@@ -1795,14 +1827,25 @@ public class CompactSegmentsTest
@Nullable Integer numCompactionTaskSlots
)
{
+ return doCompactSegments(compactSegments, compactionConfigs,
numCompactionTaskSlots, false);
+ }
+
+ private CoordinatorStats doCompactSegments(
+ CompactSegments compactSegments,
+ List<DataSourceCompactionConfig> compactionConfigs,
+ @Nullable Integer numCompactionTaskSlots,
+ boolean useAutoScaleSlots
+ )
+ {
DruidCoordinatorRuntimeParams params = CoordinatorRuntimeParamsTestHelpers
.newBuilder()
.withUsedSegmentsTimelinesPerDataSourceInTest(dataSources)
.withCompactionConfig(
new CoordinatorCompactionConfig(
compactionConfigs,
- numCompactionTaskSlots == null ? null : 100., // 100% when
numCompactionTaskSlots is not null
- numCompactionTaskSlots
+ numCompactionTaskSlots == null ? null : 1., // 100% when
numCompactionTaskSlots is not null
+ numCompactionTaskSlots,
+ useAutoScaleSlots
)
)
.build();
@@ -1994,6 +2037,8 @@ public class CompactSegmentsTest
return handleTask(request);
} else if (urlString.contains("/druid/indexer/v1/workers")) {
return handleWorkers();
+ } else if (urlString.contains("/druid/indexer/v1/totalWorkerCapacity")) {
+ return handleTotalWorkerCapacity();
} else if (urlString.contains("/druid/indexer/v1/waitingTasks")
|| urlString.contains("/druid/indexer/v1/pendingTasks")
|| urlString.contains("/druid/indexer/v1/runningTasks")) {
@@ -2035,6 +2080,12 @@ public class CompactSegmentsTest
return
createStringFullResponseHolder(jsonMapper.writeValueAsString(workerInfos));
}
+ private StringFullResponseHolder handleTotalWorkerCapacity() throws
JsonProcessingException
+ {
+ IndexingTotalWorkerCapacityInfo info = new
IndexingTotalWorkerCapacityInfo(5, 10);
+ return
createStringFullResponseHolder(jsonMapper.writeValueAsString(info));
+ }
+
private StringFullResponseHolder handleTask(Request request) throws
IOException
{
final ClientTaskQuery taskQuery =
jsonMapper.readValue(request.getContent().array(), ClientTaskQuery.class);
diff --git
a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
index 4f75bf7..6a38d5b 100644
---
a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
+++
b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
@@ -122,6 +122,7 @@ public class CoordinatorCompactionConfigsResourceTest
Response result =
coordinatorCompactionConfigsResource.setCompactionTaskLimit(
compactionTaskSlotRatio,
maxCompactionTaskSlots,
+ true,
author,
comment,
mockHttpServletRequest
@@ -131,6 +132,7 @@ public class CoordinatorCompactionConfigsResourceTest
Assert.assertEquals(oldConfigCaptor.getValue(), OLD_CONFIG_IN_BYTES);
Assert.assertNotNull(newConfigCaptor.getValue());
Assert.assertEquals(newConfigCaptor.getValue().getMaxCompactionTaskSlots(),
maxCompactionTaskSlots);
+ Assert.assertTrue(newConfigCaptor.getValue().isUseAutoScaleSlots());
Assert.assertEquals(compactionTaskSlotRatio,
newConfigCaptor.getValue().getCompactionTaskSlotRatio(), 0);
}
@@ -279,6 +281,7 @@ public class CoordinatorCompactionConfigsResourceTest
Response result =
coordinatorCompactionConfigsResource.setCompactionTaskLimit(
compactionTaskSlotRatio,
maxCompactionTaskSlots,
+ true,
author,
comment,
mockHttpServletRequest
@@ -287,6 +290,7 @@ public class CoordinatorCompactionConfigsResourceTest
Assert.assertNull(oldConfigCaptor.getValue());
Assert.assertNotNull(newConfigCaptor.getValue());
Assert.assertEquals(newConfigCaptor.getValue().getMaxCompactionTaskSlots(),
maxCompactionTaskSlots);
+ Assert.assertTrue(newConfigCaptor.getValue().isUseAutoScaleSlots());
Assert.assertEquals(compactionTaskSlotRatio,
newConfigCaptor.getValue().getCompactionTaskSlotRatio(), 0);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]