kfaraz commented on code in PR #14293:
URL: https://github.com/apache/druid/pull/14293#discussion_r1195854805


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -488,58 +426,56 @@ public boolean add(final Task task) throws EntryExistsException
     // Set forceTimeChunkLock before adding task spec to taskStorage, so that we can see always consistent task spec.
     task.addToContextIfAbsent(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockConfig.isForceTimeChunkLock());
     defaultTaskConfig.getContext().forEach(task::addToContextIfAbsent);
-    // Every task shuold use the lineage-based segment allocation protocol unless it is explicitly set to
+    // Every task should use the lineage-based segment allocation protocol unless it is explicitly set to
     // using the legacy protocol.
     task.addToContextIfAbsent(
         SinglePhaseParallelIndexTaskRunner.CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY,
         SinglePhaseParallelIndexTaskRunner.DEFAULT_USE_LINEAGE_BASED_SEGMENT_ALLOCATION
     );
 
-    giant.lock();
-
-    try {
-      Preconditions.checkState(active, "Queue is not active!");
-      Preconditions.checkNotNull(task, "task");
-      Preconditions.checkState(tasks.size() < config.getMaxSize(), "Too many tasks (max = %,d)", config.getMaxSize());
-
-      // If this throws with any sort of exception, including TaskExistsException, we don't want to
-      // insert the task into our queue. So don't catch it.
-      taskStorage.insert(task, TaskStatus.running(task.getId()));
-      addTaskInternal(task);
-      requestManagement();
-      return true;
-    }
-    finally {
-      giant.unlock();
-    }
+    // Do not add the task to queue if insert into metadata fails for any reason
+    taskStorage.insert(task, TaskStatus.running(task.getId()));
+    addTaskInternal(task);
+    requestManagement();
+    return true;
   }
 
-  @GuardedBy("giant")
+  /**
+   * Atomically adds this task to the TaskQueue.
+   */
   private void addTaskInternal(final Task task)
   {
-    final Task existingTask = tasks.putIfAbsent(task.getId(), task);
-
-    if (existingTask == null) {
-      taskLockbox.add(task);
-    } else if (!existingTask.equals(task)) {
-      throw new ISE("Cannot add task ID [%s] with same ID as task that has already been added", task.getId());
-    }
+    tasks.computeIfAbsent(
+        task.getId(),
+        taskId -> {

Review Comment:
   Yeah, it is intentional. I will add a comment calling out why it must be a 
ConcurrentHashMap.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to