This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f269fe065a   Returning correct Response Code HTTP 429 when taskQueue 
reached maxSize (#15409)
2f269fe065a is described below

commit 2f269fe065a77cbf1e2fb2a1a49775f1875abe65
Author: Sachidananda Maharana <[email protected]>
AuthorDate: Wed Nov 22 14:27:20 2023 +0530

      Returning correct Response Code HTTP 429 when taskQueue reached maxSize 
(#15409)
    
    Currently when we submit a task to druid and number of currently active 
tasks has already reached (druid.indexer.queue.maxSize) then 500 ISE is thrown 
as per shown in the screenshot in #15380.
    
    This fix will return HTTP 429 Too Many Requests(with proper error message) 
instead of 500 ISE, when we submit a task and queueSize has reached.
---
 .../apache/druid/indexing/overlord/TaskQueue.java  | 14 +++++++++++-
 .../indexing/overlord/http/OverlordResource.java   | 13 ++++++++---
 .../druid/indexing/overlord/TaskQueueTest.java     | 26 ++++++++++++++++++++++
 .../SeekableStreamSupervisorStateTest.java         |  1 -
 4 files changed, 49 insertions(+), 5 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index 69762ba7190..3a06af69d1e 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -31,6 +31,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
 import org.apache.druid.annotations.SuppressFBWarnings;
 import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.Counters;
@@ -520,7 +521,18 @@ public class TaskQueue
     try {
       Preconditions.checkState(active, "Queue is not active!");
       Preconditions.checkNotNull(task, "task");
-      Preconditions.checkState(tasks.size() < config.getMaxSize(), "Too many 
tasks (max = %s)", config.getMaxSize());
+      if (tasks.size() >= config.getMaxSize()) {
+        throw DruidException.forPersona(DruidException.Persona.ADMIN)
+                .ofCategory(DruidException.Category.CAPACITY_EXCEEDED)
+                .build(
+                        StringUtils.format(
+                                "Too many tasks are in the queue (Limit = %d), 
" +
+                                        "(Current active tasks = %d). Retry 
later or increase the druid.indexer.queue.maxSize",
+                                config.getMaxSize(),
+                                tasks.size()
+                        )
+                );
+      }
 
       // If this throws with any sort of exception, including 
TaskExistsException, we don't want to
       // insert the task into our queue. So don't catch it.
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
index f9604492dd4..01af55eadb3 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
@@ -34,7 +34,8 @@ import org.apache.druid.audit.AuditManager;
 import org.apache.druid.client.indexing.ClientTaskQuery;
 import org.apache.druid.common.config.ConfigManager.SetResult;
 import org.apache.druid.common.config.JacksonConfigManager;
-import org.apache.druid.common.exception.DruidException;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.ErrorResponse;
 import org.apache.druid.indexer.RunnerTaskState;
 import org.apache.druid.indexer.TaskInfo;
 import org.apache.druid.indexer.TaskLocation;
@@ -227,9 +228,15 @@ public class OverlordResource
             return Response.ok(ImmutableMap.of("task", task.getId())).build();
           }
           catch (DruidException e) {
+            return Response
+                    .status(e.getStatusCode())
+                    .entity(new ErrorResponse(e))
+                    .build();
+          }
+          catch (org.apache.druid.common.exception.DruidException e) {
             return Response.status(e.getResponseCode())
-                           .entity(ImmutableMap.of("error", e.getMessage()))
-                           .build();
+                    .entity(ImmutableMap.of("error", e.getMessage()))
+                    .build();
           }
         }
     );
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
index a175b239e44..a1a93e29cbf 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
@@ -28,6 +28,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.druid.common.guava.DSuppliers;
 import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
 import org.apache.druid.discovery.WorkerNodeService;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.indexer.RunnerTaskState;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskState;
@@ -182,6 +183,31 @@ public class TaskQueueTest extends IngestionTestBase
     Assert.assertEquals("Shutdown Task test", 
statusOptional.get().getErrorMsg());
   }
 
+  @Test(expected = DruidException.class)
+  public void testTaskErrorWhenExceptionIsThrownDueToQueueSize()
+  {
+    final TaskActionClientFactory actionClientFactory = 
createActionClientFactory();
+    final TaskQueue taskQueue = new TaskQueue(
+            new TaskLockConfig(),
+            new TaskQueueConfig(1, null, null, null, null),
+            new DefaultTaskConfig(),
+            getTaskStorage(),
+            new SimpleTaskRunner(actionClientFactory),
+            actionClientFactory,
+            getLockbox(),
+            new NoopServiceEmitter()
+    );
+    taskQueue.setActive(true);
+
+    // Create a Task and add it to the TaskQueue
+    final TestTask task1 = new TestTask("t1", Intervals.of("2021-01/P1M"));
+    final TestTask task2 = new TestTask("t2", Intervals.of("2021-01/P1M"));
+    taskQueue.add(task1);
+
+    // we will get exception here as taskQueue size is 1 
druid.indexer.queue.maxSize is already 1
+    taskQueue.add(task2);
+  }
+
   @Test
   public void testSetUseLineageBasedSegmentAllocationByDefault() throws 
EntryExistsException
   {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 7a587bb196e..23ff4038633 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -66,7 +66,6 @@ import 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
 import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
-import 
org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata;
 import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.indexing.seekablestream.common.StreamException;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to