This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 2f269fe065a Returning correct Response Code HTTP 429 when taskQueue
reached maxSize (#15409)
2f269fe065a is described below
commit 2f269fe065a77cbf1e2fb2a1a49775f1875abe65
Author: Sachidananda Maharana <[email protected]>
AuthorDate: Wed Nov 22 14:27:20 2023 +0530
Returning correct Response Code HTTP 429 when taskQueue reached maxSize
(#15409)
Currently, when a task is submitted to Druid and the number of currently active
tasks has already reached the limit (druid.indexer.queue.maxSize), a 500 ISE is
thrown, as shown in the screenshot in #15380.
This fix returns HTTP 429 Too Many Requests (with a proper error message)
instead of a 500 ISE when a task is submitted and the task queue has already
reached its maximum size.
---
.../apache/druid/indexing/overlord/TaskQueue.java | 14 +++++++++++-
.../indexing/overlord/http/OverlordResource.java | 13 ++++++++---
.../druid/indexing/overlord/TaskQueueTest.java | 26 ++++++++++++++++++++++
.../SeekableStreamSupervisorStateTest.java | 1 -
4 files changed, 49 insertions(+), 5 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index 69762ba7190..3a06af69d1e 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -31,6 +31,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.annotations.SuppressFBWarnings;
import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.error.DruidException;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.Counters;
@@ -520,7 +521,18 @@ public class TaskQueue
try {
Preconditions.checkState(active, "Queue is not active!");
Preconditions.checkNotNull(task, "task");
- Preconditions.checkState(tasks.size() < config.getMaxSize(), "Too many
tasks (max = %s)", config.getMaxSize());
+ if (tasks.size() >= config.getMaxSize()) {
+ throw DruidException.forPersona(DruidException.Persona.ADMIN)
+ .ofCategory(DruidException.Category.CAPACITY_EXCEEDED)
+ .build(
+ StringUtils.format(
+ "Too many tasks are in the queue (Limit = %d),
" +
+ "(Current active tasks = %d). Retry
later or increase the druid.indexer.queue.maxSize",
+ config.getMaxSize(),
+ tasks.size()
+ )
+ );
+ }
// If this throws with any sort of exception, including
TaskExistsException, we don't want to
// insert the task into our queue. So don't catch it.
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
index f9604492dd4..01af55eadb3 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
@@ -34,7 +34,8 @@ import org.apache.druid.audit.AuditManager;
import org.apache.druid.client.indexing.ClientTaskQuery;
import org.apache.druid.common.config.ConfigManager.SetResult;
import org.apache.druid.common.config.JacksonConfigManager;
-import org.apache.druid.common.exception.DruidException;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.ErrorResponse;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskLocation;
@@ -227,9 +228,15 @@ public class OverlordResource
return Response.ok(ImmutableMap.of("task", task.getId())).build();
}
catch (DruidException e) {
+ return Response
+ .status(e.getStatusCode())
+ .entity(new ErrorResponse(e))
+ .build();
+ }
+ catch (org.apache.druid.common.exception.DruidException e) {
return Response.status(e.getResponseCode())
- .entity(ImmutableMap.of("error", e.getMessage()))
- .build();
+ .entity(ImmutableMap.of("error", e.getMessage()))
+ .build();
}
}
);
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
index a175b239e44..a1a93e29cbf 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
@@ -28,6 +28,7 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.druid.common.guava.DSuppliers;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.WorkerNodeService;
+import org.apache.druid.error.DruidException;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
@@ -182,6 +183,31 @@ public class TaskQueueTest extends IngestionTestBase
Assert.assertEquals("Shutdown Task test",
statusOptional.get().getErrorMsg());
}
+ @Test(expected = DruidException.class)
+ public void testTaskErrorWhenExceptionIsThrownDueToQueueSize()
+ {
+ final TaskActionClientFactory actionClientFactory =
createActionClientFactory();
+ final TaskQueue taskQueue = new TaskQueue(
+ new TaskLockConfig(),
+ new TaskQueueConfig(1, null, null, null, null),
+ new DefaultTaskConfig(),
+ getTaskStorage(),
+ new SimpleTaskRunner(actionClientFactory),
+ actionClientFactory,
+ getLockbox(),
+ new NoopServiceEmitter()
+ );
+ taskQueue.setActive(true);
+
+ // Create a Task and add it to the TaskQueue
+ final TestTask task1 = new TestTask("t1", Intervals.of("2021-01/P1M"));
+ final TestTask task2 = new TestTask("t2", Intervals.of("2021-01/P1M"));
+ taskQueue.add(task1);
+
+ // we will get exception here as taskQueue size is 1
druid.indexer.queue.maxSize is already 1
+ taskQueue.add(task2);
+ }
+
@Test
public void testSetUseLineageBasedSegmentAllocationByDefault() throws
EntryExistsException
{
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 7a587bb196e..23ff4038633 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -66,7 +66,6 @@ import
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
import
org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
-import
org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamException;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]