github-advanced-security[bot] commented on code in PR #18585:
URL: https://github.com/apache/druid/pull/18585#discussion_r2502071390


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerV2.java:
##########
@@ -0,0 +1,1925 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.hrtr;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Supplier;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableScheduledFuture;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.druid.concurrent.LifecycleLock;
+import org.apache.druid.discovery.DiscoveryDruidNode;
+import org.apache.druid.discovery.DruidNodeDiscovery;
+import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
+import org.apache.druid.discovery.WorkerNodeService;
+import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.task.IndexTaskUtils;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
+import org.apache.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
+import org.apache.druid.indexing.overlord.TaskRunnerListener;
+import org.apache.druid.indexing.overlord.TaskRunnerUtils;
+import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
+import org.apache.druid.indexing.overlord.TaskStorage;
+import org.apache.druid.indexing.overlord.WorkerTaskRunner;
+import org.apache.druid.indexing.overlord.autoscaling.ProvisioningService;
+import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
+import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
+import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig;
+import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
+import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
+import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
+import org.apache.druid.indexing.overlord.setup.WorkerSelectStrategy;
+import org.apache.druid.indexing.worker.TaskAnnouncement;
+import org.apache.druid.indexing.worker.Worker;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.tasklogs.TaskLogStreamer;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link WorkerTaskRunner} implementation that discovers workers via {@link DruidNodeDiscoveryProvider}
+ * and assigns pending tasks to them over HTTP.
+ */
+public class HttpRemoteTaskRunnerV2 implements WorkerTaskRunner, TaskLogStreamer, WorkerHolder.Listener
+{
+  private static final EmittingLogger log = new EmittingLogger(HttpRemoteTaskRunnerV2.class);
+
+  public static final String TASK_DISCOVERED_COUNT = "task/discovered/count";
+
+  private final LifecycleLock lifecycleLock = new LifecycleLock();
+
+  // Executor for assigning pending tasks to workers.
+  private final ExecutorService pendingTasksExec;
+
+  // All known tasks, TaskID -> HttpRemoteTaskRunnerWorkItem
+  // This is a ConcurrentMap as some of the reads are done without holding the lock.
+  private final ConcurrentHashMap<String, HttpRemoteTaskRunnerWorkItem> tasks = new ConcurrentHashMap<>();
+
+  private final PriorityBlockingQueue<PendingTaskQueueItem> pendingTasks = new PriorityBlockingQueue<>();
+
+  // All discovered workers, "host:port" -> WorkerHolder
+  private final ConcurrentHashMap<String, WorkerHolder> workers = new ConcurrentHashMap<>();
+
+  // Executor for syncing state of each worker.
+  private final ScheduledExecutorService workersSyncExec;
+
+  // Internal worker state counters
+  private final AtomicLong blackListedWorkersCount = new AtomicLong(0);
+
+  // Executor to complete cleanup of workers which have disappeared.
+  private final ListeningScheduledExecutorService cleanupExec;
+  private final ConcurrentMap<String, ScheduledFuture> removedWorkerCleanups = new ConcurrentHashMap<>();
+
+  private final ReadWriteLock workerStateLock = new ReentrantReadWriteLock();
+  private final ReadWriteLock taskStateLock = new ReentrantReadWriteLock();
+
+  // task runner listeners
+  private final CopyOnWriteArrayList<Pair<TaskRunnerListener, Executor>> listeners = new CopyOnWriteArrayList<>();
+
+  private final ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy;
+  private ProvisioningService provisioningService;
+
+  private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider;
+  private final HttpClient httpClient;
+  private final ObjectMapper smileMapper;
+
+  private final Supplier<WorkerBehaviorConfig> workerConfigRef;
+  private final HttpRemoteTaskRunnerConfig config;
+
+  private final TaskStorage taskStorage;
+  private final ServiceEmitter emitter;
+
+  private volatile DruidNodeDiscovery.Listener nodeDiscoveryListener;
+
+  public HttpRemoteTaskRunnerV2(
+      ObjectMapper smileMapper,
+      HttpRemoteTaskRunnerConfig config,
+      HttpClient httpClient,
+      Supplier<WorkerBehaviorConfig> workerConfigRef,
+      ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy,
+      DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
+      TaskStorage taskStorage,
+      ServiceEmitter emitter
+  )
+  {
+    this.smileMapper = smileMapper;
+    this.config = config;
+    this.httpClient = httpClient;
+    this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider;
+    this.taskStorage = taskStorage;
+    this.workerConfigRef = workerConfigRef;
+    this.emitter = emitter;
+
+    this.pendingTasksExec = Execs.multiThreaded(
+        config.getPendingTasksRunnerNumThreads(),
+        "hrtr-pending-tasks-runner-%d"
+    );
+
+    this.workersSyncExec = ScheduledExecutors.fixed(
+        config.getWorkerSyncNumThreads(),
+        "HttpRemoteTaskRunner-worker-sync-%d"
+    );
+
+    this.cleanupExec = MoreExecutors.listeningDecorator(
+        ScheduledExecutors.fixed(1, "HttpRemoteTaskRunner-Worker-Cleanup-%d")
+    );
+
+    this.provisioningStrategy = provisioningStrategy;
+  }
+
+  @Override
+  @LifecycleStart
+  public void start()
+  {
+    if (!lifecycleLock.canStart()) {
+      return;
+    }
+
+    try {
+      log.info("Starting...");
+
+      startWorkersHandling();
+
+      ScheduledExecutors.scheduleAtFixedRate(
+          cleanupExec,
+          Period.ZERO.toStandardDuration(),
+          config.getWorkerBlackListCleanupPeriod().toStandardDuration(),
+          this::checkAndRemoveWorkersFromBlackList
+      );
+
+      provisioningService = provisioningStrategy.makeProvisioningService(this);
+
+      scheduleSyncMonitoring();
+
+      startPendingTaskHandling();
+
+      lifecycleLock.started();
+
+      log.info("Started.");
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    finally {
+      lifecycleLock.exitStart();
+    }
+  }
+
+  /**
+   * Must not be used outside of this class and {@link HttpRemoteTaskRunnerResource}
+   */
+  Map<String, ImmutableWorkerInfo> getWorkersEligibleToRunTasks()
+  {
+    return Maps.transformEntries(
+        Maps.filterEntries(
+            workers,
+            input -> input.getValue().getState() == WorkerHolder.State.READY &&
+                     input.getValue().isInitialized() &&
+                     input.getValue().isEnabled()
+        ),
+        (String key, WorkerHolder value) -> value.toImmutable()
+    );
+  }
+
+  @GuardedBy("workerStateLock")
+  private ImmutableWorkerInfo findWorkerToRunTask(Task task)
+  {
+    WorkerBehaviorConfig workerConfig = workerConfigRef.get();
+    WorkerSelectStrategy strategy;
+    if (workerConfig == null || workerConfig.getSelectStrategy() == null) {
+      strategy = WorkerBehaviorConfig.DEFAULT_STRATEGY;
+      log.debug("No worker selection strategy set. Using default of [%s]", 
strategy.getClass().getSimpleName());
+    } else {
+      strategy = workerConfig.getSelectStrategy();
+    }
+
+    return strategy.findWorkerForTask(
+        config,
+        ImmutableMap.copyOf(getWorkersEligibleToRunTasks()),
+        task
+    );
+  }
+
+  private boolean runTaskOnWorker(
+      final String taskId,
+      final String workerHost
+  ) throws InterruptedException
+  {
+    final HttpRemoteTaskRunnerWorkItem workItem = tasks.get(taskId);
+    final WorkerHolder workerHolder = workers.get(workerHost);
+
+    Preconditions.checkState(workItem != null, "No task item found for task[%s]", taskId);
+    Preconditions.checkState(workerHolder != null, "No worker found for host[%s]", workerHost);
+    Preconditions.checkState(
+        workerHolder.getState() == WorkerHolder.State.PENDING_ASSIGN,
+        "Found invalid state[%s] for worker[%s], expected state[%s]",
+        workerHolder.getState(),
+        workerHost,
+        WorkerHolder.State.PENDING_ASSIGN
+    );
+
+    log.info("Assigning task[%s] to worker[%s]", taskId, workerHost);
+
+    if (workerHolder.assignTask(workItem.getTask())) {
+      // Don't assign new tasks until the task we just assigned is actually running
+      // on a worker - this avoids overflowing a worker with tasks
+      long waitMs = config.getTaskAssignmentTimeout().toStandardDuration().getMillis();
+      long waitStart = System.currentTimeMillis();
+      boolean isTaskAssignmentTimedOut = false;
+
+      final AtomicBoolean taskStartedOnWorker = new AtomicBoolean(false);
+      synchronized (taskStateLock) {
+        while (!taskStartedOnWorker.get()) {
+          tasks.compute(
+              taskId,
+              (key, taskEntry) -> {
+                if (taskEntry != null && taskEntry.isRunningOnWorker(workerHolder.getWorker())) {
+                  taskStartedOnWorker.set(true);
+                }
+                return taskEntry;
+              }
+          );
+
+          long remaining = waitMs - (System.currentTimeMillis() - waitStart);
+          if (remaining > 0) {
+            taskStateLock.wait(remaining);
+          } else {
+            isTaskAssignmentTimedOut = true;
+            break;
+          }
+        }
+      }
+
+      if (isTaskAssignmentTimedOut) {
+        log.makeAlert(
+            "Task assignment timed out on worker[%s], never ran task[%s] in 
timeout[%s]!",
+            workerHost,
+            taskId,
+            config.getTaskAssignmentTimeout()
+        ).emit();
+        // taskComplete(..) must be called outside of workerStateLock/taskStateLock, see comments on method.
+        taskComplete(
+            taskId,
+            workerHost,
+            TaskStatus.failure(
+                taskId,
+                StringUtils.format(
+                    "The worker that this task is assigned did not start it in 
timeout[%s]. "
+                    + "See overlord and middleManager/indexer logs for more 
details.",
+                    config.getTaskAssignmentTimeout()
+                )
+            )
+        );
+      }
+
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  // CAUTION: This method calls RemoteTaskRunnerWorkItem.setResult(..) which results in TaskQueue.notifyStatus() being called
+  // because that is attached by TaskQueue to the task result future. So, this method must not be called while holding
+  // workerStateLock or taskStateLock. See https://github.com/apache/druid/issues/6201
+  private void taskComplete(
+      String taskId,
+      String workerHost,
+      TaskStatus taskStatus
+  )
+  {
+    Preconditions.checkState(!Thread.holdsLock(workerStateLock), "Current thread must not hold workerStateLock.");
+    Preconditions.checkState(!Thread.holdsLock(taskStateLock), "Current thread must not hold taskStateLock.");
+
+    AtomicBoolean taskCompleted = new AtomicBoolean(false);
+
+    tasks.compute(
+        taskId,
+        (key, taskEntry) -> {
+          Preconditions.checkState(taskEntry != null, "Expected task[%s] to exist", taskId);
+          if (taskEntry.getResult().isDone()) {
+            // This is not the first complete event.
+            try {
+              TaskState lastKnownState = taskEntry.getResult().get().getStatusCode();
+              if (taskStatus.getStatusCode() != lastKnownState) {
+                log.warn(
+                    "The state of the new task complete event is different 
from its last known state. "
+                    + "New state[%s], last known state[%s]",
+                    taskStatus.getStatusCode(),
+                    lastKnownState
+                );
+              }
+            }
+            catch (InterruptedException e) {
+              log.warn(e, "Interrupted while getting the last known task 
status.");
+              Thread.currentThread().interrupt();
+            }
+            catch (ExecutionException e) {
+              // This case should not really happen.
+              log.warn(e, "Failed to get the last known task status. Ignoring 
this failure.");
+            }
+          } else {
+            // Notify interested parties
+            taskEntry.setResult(taskStatus);
+            taskCompleted.set(true);
+          }
+
+          return taskEntry;
+        }
+    );
+
+    if (workerHost != null) {
+      workers.compute(
+          workerHost,
+          (key, workerHolder) -> {
+            if (workerHolder != null) {
+              log.info(
+                  "Worker[%s] completed task[%s] with status[%s]",
+                  workerHolder.getWorker().getHost(),
+                  taskStatus.getId(),
+                  taskStatus.getStatusCode()
+              );
+              // Worker is done with this task
+              workerHolder.setLastCompletedTaskTime(DateTimes.nowUtc());
+              blacklistWorkerIfNeeded(taskStatus, workerHolder);
+            } else {
+              log.warn("Could not find worker for host[%s]", workerHost);
+            }
+            return workerHolder;
+          }
+      );
+    }
+
+    // Notify listeners outside both tasks/workers critical sections to avoid deadlock
+    if (taskCompleted.get()) {
+      TaskRunnerUtils.notifyStatusChanged(listeners, taskStatus.getId(), taskStatus);
+    }
+
+    // Notify interested parties that a worker is potentially free and/or a task status updated
+    notifyWatchers();
+  }
+
+  private void startWorkersHandling() throws InterruptedException
+  {
+    final CountDownLatch workerViewInitialized = new CountDownLatch(1);
+    DruidNodeDiscovery druidNodeDiscovery =
+        druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY);
+    this.nodeDiscoveryListener = new DruidNodeDiscovery.Listener()
+    {
+      @Override
+      public void nodesAdded(Collection<DiscoveryDruidNode> nodes)
+      {
+        nodes.forEach(node -> addWorker(toWorker(node)));
+      }
+
+      @Override
+      public void nodesRemoved(Collection<DiscoveryDruidNode> nodes)
+      {
+        nodes.forEach(node -> removeWorker(toWorker(node)));
+      }
+
+      @Override
+      public void nodeViewInitialized()
+      {
+        //CountDownLatch.countDown() does nothing when count has already reached 0.
+        workerViewInitialized.countDown();
+      }
+
+      @Override
+      public void nodeViewInitializedTimedOut()
+      {
+        nodeViewInitialized();
+      }
+    };
+
+    druidNodeDiscovery.registerListener(nodeDiscoveryListener);
+
+    long workerDiscoveryStartTime = System.currentTimeMillis();
+    while (!workerViewInitialized.await(30, TimeUnit.SECONDS)) {
+      if (System.currentTimeMillis() - workerDiscoveryStartTime > TimeUnit.MINUTES.toMillis(5)) {
+        throw new ISE("Couldn't discover workers.");
+      } else {
+        log.info("Waiting for worker discovery...");
+      }
+    }
+    log.info("Discovered [%d] workers.", workers.size());
+
+    // Wait till all worker state is synced so that we know which tasks each worker is running or has completed;
+    // otherwise we would start assigning tasks that discovered workers are about to report as already running.
+    workers.forEach((workerHost, workerEntry) -> {
+      try {
+        workerEntry.waitForInitialization();
+      }
+      catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    });
+    log.info("Workers have synced state successfully.");
+  }
+
+  private Worker toWorker(DiscoveryDruidNode node)
+  {
+    final WorkerNodeService workerNodeService = node.getService(
+        WorkerNodeService.DISCOVERY_SERVICE_KEY,
+        WorkerNodeService.class
+    );
+    if (workerNodeService == null) {
+      // this shouldn't typically happen, but just in case it does, make a dummy worker to allow the callbacks to
+      // continue since addWorker/removeWorker only need worker.getHost()
+      return new Worker(
+          node.getDruidNode().getServiceScheme(),
+          node.getDruidNode().getHostAndPortToUse(),
+          null,
+          0,
+          "",
+          WorkerConfig.DEFAULT_CATEGORY
+      );
+    }
+    return new Worker(
+        node.getDruidNode().getServiceScheme(),
+        node.getDruidNode().getHostAndPortToUse(),
+        workerNodeService.getIp(),
+        workerNodeService.getCapacity(),
+        workerNodeService.getVersion(),
+        workerNodeService.getCategory()
+    );
+  }
+
+  @VisibleForTesting
+  void addWorker(final Worker worker)
+  {
+    log.info("Adding worker[%s]", worker.getHost());
+    synchronized (workerStateLock) {
+      workers.compute(
+          worker.getHost(), (key, workerEntry) -> {
+            cancelWorkerCleanup(worker.getHost());
+
+            // There cannot be any new tasks assigned to this worker as the entry has not been published yet.
+            // That being said, there can be callbacks in taskAddedOrUpdated() where some task suddenly begins running
+            // on this worker. That method still blocks on this key lock, so it will occur strictly before/after this insertion.
+            if (workerEntry == null) {
+              log.info("Unrecognized worker[%s], rebuilding task mapping", 
worker.getHost());
+              final List<TaskAnnouncement> expectedAnnouncements = new 
ArrayList<>();
+              // It might be a worker that existed before, temporarily went away and came back. We might have a set of
+              // tasks that we think are running on this worker. Provide that information to WorkerHolder that
+              // manages the task syncing with that worker.
+              tasks.forEach((taskId, taskEntry) -> {
+                if (taskEntry.isRunningOnWorker(worker)) {
+                  // This announcement is only used to notify when a task has disappeared on the worker
+                  // So it is okay to set the dataSource and taskResource to null as they will not be used
+                  expectedAnnouncements.add(
+                      TaskAnnouncement.create(
+                          taskEntry.getTaskId(),
+                          taskEntry.getTaskType(),
+                          null,
+                          TaskStatus.running(taskEntry.getTaskId()),
+                          taskEntry.getLocation(),
+                          null
+                      )
+                  );
+                }
+              });
+
+              workerEntry = createWorkerHolder(
+                  smileMapper,
+                  httpClient,
+                  config,
+                  workersSyncExec,
+                  this,
+                  worker,
+                  expectedAnnouncements
+              );
+              workerEntry.start();
+            } else {
+              log.info("Worker[%s] already exists", worker.getHost());
+            }
+            return workerEntry;
+          }
+      );
+
+      // Notify any waiters that there is a new worker available
+      workerStateLock.notifyAll();
+    }
+  }
+
+  protected WorkerHolder createWorkerHolder(
+      ObjectMapper smileMapper,
+      HttpClient httpClient,
+      HttpRemoteTaskRunnerConfig config,
+      ScheduledExecutorService workersSyncExec,
+      WorkerHolder.Listener listener,
+      Worker worker,
+      List<TaskAnnouncement> knownAnnouncements
+  )
+  {
+    return new WorkerHolder(smileMapper, httpClient, config, workersSyncExec, listener, worker, knownAnnouncements);
+  }
+
+  private void removeWorker(final Worker worker)
+  {
+    // Acquire workerStateLock to ensure atomicity between worker removal and competing scheduling routines
+    final WorkerHolder workerEntry;
+    synchronized (workerStateLock) {
+      workerEntry = workers.remove(worker.getHost());
+    }
+
+    // Perform the cleanup operations outside the lock to avoid excessive locking/deadlock
+    if (workerEntry != null) {
+      log.info("Removing worker[%s]", worker.getHost());
+      try {
+        workerEntry.stop();
+        scheduleTasksCleanupForWorker(worker.getHost());
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    } else {
+      log.warn("Asked to remove a non-existent worker[%s]", worker.getHost());
+    }
+  }
+
+  private void cancelWorkerCleanup(String workerHost)
+  {
+    ScheduledFuture previousCleanup = removedWorkerCleanups.remove(workerHost);
+    if (previousCleanup != null) {
+      log.info("Cancelling worker[%s] scheduled task cleanup", workerHost);
+      previousCleanup.cancel(false);
+    }
+  }
+
+  private void scheduleTasksCleanupForWorker(final String workerHostAndPort)
+  {
+    cancelWorkerCleanup(workerHostAndPort);
+
+    final ListenableScheduledFuture<?> cleanupTask = cleanupExec.schedule(
+        () -> {
+          log.info("Running scheduled cleanup for worker[%s]", 
workerHostAndPort);
+          try {
+            final Set<HttpRemoteTaskRunnerWorkItem> tasksToFail = new 
HashSet<>();
+            tasks.forEach((taskId, taskEntry) -> {
+              if (taskEntry.getState().inProgress()) {
+                if (taskEntry.getWorker() != null && taskEntry.getWorker().getHost().equals(workerHostAndPort)) {
+                  tasksToFail.add(taskEntry);
+                }
+              }
+            });
+
+            for (HttpRemoteTaskRunnerWorkItem taskItem : tasksToFail) {
+              if (!taskItem.getResult().isDone()) {
+                log.warn(
+                    "Failing task[%s] because worker[%s] disappeared and did 
not report within cleanup timeout[%s]",
+                    taskItem.getTaskId(),
+                    workerHostAndPort,
+                    config.getTaskCleanupTimeout()
+                );
+                // taskComplete(..) must be called outside of workerStateLock/taskStateLock, see comments on method.
+                taskComplete(
+                    taskItem.getTaskId(),
+                    null,
+                    TaskStatus.failure(
+                        taskItem.getTaskId(),
+                        StringUtils.format(
+                            "The worker that this task was assigned 
disappeared and "
+                            + "did not report cleanup within timeout[%s]. "
+                            + "See overlord and middleManager/indexer logs for 
more details.",
+                            config.getTaskCleanupTimeout()
+                        )
+                    )
+                );
+              }
+            }
+          }
+          catch (Exception e) {
+            log.makeAlert(e, "Exception while cleaning up worker[%s]", 
workerHostAndPort).emit();
+            throw new RuntimeException(e);
+          }
+        },
+        config.getTaskCleanupTimeout().toStandardDuration().getMillis(),
+        TimeUnit.MILLISECONDS
+    );
+
+    removedWorkerCleanups.put(workerHostAndPort, cleanupTask);
+
+    // Remove this entry from removedWorkerCleanups when done, if it's actually the one in there.
+    Futures.addCallback(
+        cleanupTask,
+        new FutureCallback<Object>()
+        {
+          @Override
+          public void onSuccess(Object result)
+          {
+            removedWorkerCleanups.remove(workerHostAndPort, cleanupTask);
+          }
+
+          @Override
+          public void onFailure(Throwable t)
+          {
+            removedWorkerCleanups.remove(workerHostAndPort, cleanupTask);
+          }
+        },
+        MoreExecutors.directExecutor()
+    );
+  }
+
+  private void scheduleSyncMonitoring()
+  {
+    workersSyncExec.scheduleAtFixedRate(
+        () -> {
+          log.debug("Running worker sync monitoring");
+
+          try {
+            syncMonitoring();
+          }
+          catch (Exception ex) {
+            log.makeAlert(ex, "Exception in worker sync monitoring").emit();
+          }
+        },
+        1,
+        5,
+        TimeUnit.MINUTES
+    );
+  }
+
+  @VisibleForTesting
+  void syncMonitoring()
+  {
+    // Ensure that the collection is not being modified during iteration. Iterate over a copy.
+    final Set<Map.Entry<String, WorkerHolder>> workerEntrySet = ImmutableSet.copyOf(workers.entrySet());
+    for (Map.Entry<String, WorkerHolder> e : workerEntrySet) {
+      WorkerHolder workerHolder = e.getValue();
+      if (workerHolder.getUnderlyingSyncer().needsReset()) {
+        // TODO: do we want to make this atomic (e.g. acquire workerStateLock before)
+        if (workers.containsKey(e.getKey())) {
+          log.makeAlert(
+              "Worker[%s] is not syncing properly. Current state is [%s]. 
Resetting it.",
+              workerHolder.getWorker().getHost(),
+              workerHolder.getUnderlyingSyncer().getDebugInfo()
+          ).emit();
+          removeWorker(workerHolder.getWorker());
+          addWorker(workerHolder.getWorker());
+        }
+      }
+    }
+  }
+
+  /**
+   * This method returns the debugging information exposed by {@link HttpRemoteTaskRunnerResource} and meant
+   * for that use only. It must not be used for any other purpose.
+   */
+  Map<String, Object> getWorkerSyncerDebugInfo()
+  {
+    Preconditions.checkArgument(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
+
+    Map<String, Object> result = Maps.newHashMapWithExpectedSize(workers.size());
+    for (Map.Entry<String, WorkerHolder> e : ImmutableSet.copyOf(workers.entrySet())) {
+      WorkerHolder serverHolder = e.getValue();
+      result.put(
+          e.getKey(),
+          serverHolder.getUnderlyingSyncer().getDebugInfo()
+      );
+    }
+    return result;
+  }
+
+  private void checkAndRemoveWorkersFromBlackList()
+  {
+    final AtomicBoolean shouldRunPendingTasks = new AtomicBoolean(false);
+
+    synchronized (workerStateLock) {
+      for (final String workerHost : workers.keySet()) {
+        workers.computeIfPresent(
+            workerHost,
+            (workerHostKey, workerEntry) -> {
+              if (workerEntry.getState() == WorkerHolder.State.BLACKLISTED) {
+                if (shouldRemoveNodeFromBlackList(workerEntry)) {
+                  log.debug("Removing worker[%s] from blacklist", workerHost);
+                  workerEntry.resetContinuouslyFailedTasksCount();
+                  workerEntry.setBlacklistedUntil(null);
+                  workerEntry.setState(WorkerHolder.State.READY);
+                  shouldRunPendingTasks.set(true);
+                } else {
+                  log.debug("Skipping removal of worker[%s] from blacklist", 
workerHost);
+                }
+              }
+              return workerEntry;
+            }
+        );
+      }
+
+      if (shouldRunPendingTasks.get()) {
+        workerStateLock.notifyAll();
+      }
+    }
+  }
+
+  /**
+   * This method should be called under the corresponding worker key lock.
+   */
+  private boolean shouldRemoveNodeFromBlackList(WorkerHolder workerHolder)
+  {
+    if (blackListedWorkersCount.get() > workers.size() * (config.getMaxPercentageBlacklistWorkers() / 100.0)) {
+      log.info(
+          "Removing [%s] from blacklist because percentage of blacklisted workers exceeds [%d]",
+          workerHolder.getWorker(),
+          config.getMaxPercentageBlacklistWorkers()
+      );
+
+      return true;
+    }
+
+    long remainingMillis = workerHolder.getBlacklistedUntil().getMillis() - System.currentTimeMillis();
+    if (remainingMillis <= 0) {
+      log.info("Removing [%s] from blacklist because backoff time elapsed", 
workerHolder.getWorker());
+      return true;
+    }
+
+    log.info("[%s] still blacklisted for [%,ds]", workerHolder.getWorker(), 
remainingMillis / 1000);
+    return false;
+  }
+
+  /**
+   * This method should be called under the corresponding worker key lock.
+   */
+  private void blacklistWorkerIfNeeded(TaskStatus taskStatus, WorkerHolder workerHolder)
+  {
+    if (taskStatus.isSuccess()) {
+      workerHolder.resetContinuouslyFailedTasksCount();
+      workerHolder.setBlacklistedUntil(null);
+      blackListedWorkersCount.decrementAndGet();
+      log.info("[%s] removed from blacklist because a task finished with 
SUCCESS", workerHolder.getWorker());
+    } else if (taskStatus.isFailure()) {
+      workerHolder.incrementContinuouslyFailedTasksCount();
+    }
+
+    if (workerHolder.getContinuouslyFailedTasksCount() > config.getMaxRetriesBeforeBlacklist() &&
+        blackListedWorkersCount.get() <= workers.size() * (config.getMaxPercentageBlacklistWorkers() / 100.0) - 1) {
+      workerHolder.setBlacklistedUntil(DateTimes.nowUtc().plus(config.getWorkerBlackListBackoffTime()));
+      blackListedWorkersCount.incrementAndGet();
+      log.info(
+          "Blacklisting [%s] until [%s] after [%,d] failed tasks in a row.",
+          workerHolder.getWorker(),
+          workerHolder.getBlacklistedUntil(),
+          workerHolder.getContinuouslyFailedTasksCount()
+      );
+    }
+  }
+
+  @Override
+  public Collection<ImmutableWorkerInfo> getWorkers()
+  {
+    return workers.values().stream().map(WorkerHolder::toImmutable).collect(Collectors.toList());
+  }
+
+  @VisibleForTesting
+  ConcurrentMap<String, WorkerHolder> getWorkersForTestingReadOnly()

Review Comment:
   ## Exposing internal representation
   
   getWorkersForTestingReadOnly exposes the internal representation stored in field workers. The value may be modified [after this call to getWorkersForTestingReadOnly](1).
   getWorkersForTestingReadOnly exposes the internal representation stored in field workers. The value may be modified [after this call to getWorkersForTestingReadOnly](2).
   getWorkersForTestingReadOnly exposes the internal representation stored in field workers. The value may be modified [after this call to getWorkersForTestingReadOnly](3).
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/10536)



##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerV2Test.java:
##########
@@ -0,0 +1,2287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.hrtr;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.Futures;
+import org.apache.druid.common.guava.DSuppliers;
+import org.apache.druid.concurrent.LifecycleLock;
+import org.apache.druid.discovery.DiscoveryDruidNode;
+import org.apache.druid.discovery.DruidNodeDiscovery;
+import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.discovery.WorkerNodeService;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
+import org.apache.druid.indexing.overlord.TaskRunnerListener;
+import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
+import org.apache.druid.indexing.overlord.TaskStorage;
+import org.apache.druid.indexing.overlord.TestProvisioningStrategy;
+import org.apache.druid.indexing.overlord.autoscaling.NoopProvisioningStrategy;
+import org.apache.druid.indexing.overlord.autoscaling.ProvisioningService;
+import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
+import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig;
+import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
+import org.apache.druid.indexing.overlord.setup.EqualDistributionWorkerSelectStrategy;
+import org.apache.druid.indexing.worker.TaskAnnouncement;
+import org.apache.druid.indexing.worker.Worker;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.coordination.ChangeRequestHttpSyncer;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.easymock.EasyMock;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import static org.easymock.EasyMock.isA;
+
+/**
+ *
+ */
+public class HttpRemoteTaskRunnerV2Test
+{
+  @Before
+  public void setup()
+  {
+    EmittingLogger.registerEmitter(new NoopServiceEmitter());
+  }
+
+  /*
+  Simulates startup of Overlord and Workers being discovered with no previously known tasks. Fresh tasks are given
+  and expected to be completed.
+   */
+  @Test(timeout = 60_000L)
+  public void testFreshStart() throws Exception
+  {
+    TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
+    DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+    EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
+            .andReturn(druidNodeDiscovery);
+    EasyMock.replay(druidNodeDiscoveryProvider);
+
+    HttpRemoteTaskRunnerV2 taskRunner = newHttpTaskRunnerInstance(
+        druidNodeDiscoveryProvider,
+        new NoopProvisioningStrategy<>()
+    );
+
+    taskRunner.start();
+
+    DiscoveryDruidNode druidNode1 = new DiscoveryDruidNode(
+        new DruidNode("service", "host1", false, 8080, null, true, false),
+        NodeRole.MIDDLE_MANAGER,
+        ImmutableMap.of(
+            WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
+        )
+    );
+
+    DiscoveryDruidNode druidNode2 = new DiscoveryDruidNode(
+        new DruidNode("service", "host2", false, 8080, null, true, false),
+        NodeRole.MIDDLE_MANAGER,
+        ImmutableMap.of(
+            WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
+        )
+    );
+
+    druidNodeDiscovery.getListeners().get(0).nodesAdded(ImmutableList.of(druidNode1, druidNode2));
+
+    int numTasks = 8;
+    List<Future<TaskStatus>> futures = new ArrayList<>();
+    for (int i = 0; i < numTasks; i++) {
+      futures.add(taskRunner.run(NoopTask.create()));
+    }
+
+    for (Future<TaskStatus> future : futures) {
+      Assert.assertTrue(future.get().isSuccess());
+    }
+
+    Assert.assertEquals(numTasks, taskRunner.getKnownTasks().size());
+    Assert.assertEquals(numTasks, taskRunner.getCompletedTasks().size());
+    Assert.assertEquals(4, taskRunner.getTotalCapacity());
+    Assert.assertEquals(-1, taskRunner.getMaximumCapacityWithAutoscale());
+    Assert.assertEquals(0, taskRunner.getUsedCapacity());
+  }
+
+  @Test(timeout = 60_000L)
+  public void testFreshStart_nodeDiscoveryTimedOut() throws Exception
+  {
+    TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery(true);
+    DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+    EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
+            .andReturn(druidNodeDiscovery);
+    EasyMock.replay(druidNodeDiscoveryProvider);
+
+    HttpRemoteTaskRunnerV2 taskRunner = newHttpTaskRunnerInstance(
+        druidNodeDiscoveryProvider,
+        new NoopProvisioningStrategy<>()
+    );
+
+    taskRunner.start();
+
+    DiscoveryDruidNode druidNode1 = new DiscoveryDruidNode(
+        new DruidNode("service", "host1", false, 8080, null, true, false),
+        NodeRole.MIDDLE_MANAGER,
+        ImmutableMap.of(
+            WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
+        )
+    );
+
+    DiscoveryDruidNode druidNode2 = new DiscoveryDruidNode(
+        new DruidNode("service", "host2", false, 8080, null, true, false),
+        NodeRole.MIDDLE_MANAGER,
+        ImmutableMap.of(
+            WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
+        )
+    );
+
+    druidNodeDiscovery.getListeners().get(0).nodesAdded(ImmutableList.of(druidNode1, druidNode2));
+
+    int numTasks = 8;
+    List<Future<TaskStatus>> futures = new ArrayList<>();
+    for (int i = 0; i < numTasks; i++) {
+      futures.add(taskRunner.run(NoopTask.create()));
+    }
+
+    for (Future<TaskStatus> future : futures) {
+      Assert.assertTrue(future.get().isSuccess());
+    }
+
+    Assert.assertEquals(numTasks, taskRunner.getKnownTasks().size());
+    Assert.assertEquals(numTasks, taskRunner.getCompletedTasks().size());
+    Assert.assertEquals(4, taskRunner.getTotalCapacity());
+    Assert.assertEquals(0, taskRunner.getUsedCapacity());
+  }
+
+  /*
+  Simulates startup of Overlord. Overlord is then stopped and is expected to close down certain things.
+   */
+  @Test(timeout = 60_000L)
+  public void testFreshStartAndStop()
+  {
+    TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
+    DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+    EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
+            .andReturn(druidNodeDiscovery).times(2);
+    ProvisioningStrategy provisioningStrategy = EasyMock.createMock(ProvisioningStrategy.class);
+    ProvisioningService provisioningService = EasyMock.createNiceMock(ProvisioningService.class);
+    EasyMock.expect(provisioningStrategy.makeProvisioningService(isA(HttpRemoteTaskRunnerV2.class)))
+            .andReturn(provisioningService);
+    provisioningService.close();
+    EasyMock.expectLastCall();
+    EasyMock.replay(druidNodeDiscoveryProvider, provisioningStrategy, provisioningService);
+
+    DiscoveryDruidNode druidNode1 = new DiscoveryDruidNode(
+        new DruidNode("service", "host1", false, 8080, null, true, false),
+        NodeRole.MIDDLE_MANAGER,
+        ImmutableMap.of(
+            WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
+        )
+    );
+
+    DiscoveryDruidNode druidNode2 = new DiscoveryDruidNode(
+        new DruidNode("service", "host2", false, 8080, null, true, false),
+        NodeRole.MIDDLE_MANAGER,
+        ImmutableMap.of(
+            WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
+        )
+    );
+
+    HttpRemoteTaskRunnerV2 taskRunner = newHttpTaskRunnerInstance(
+        druidNodeDiscoveryProvider,
+        provisioningStrategy
+    );
+
+    taskRunner.start();
+    druidNodeDiscovery.getListeners().get(0).nodesAdded(ImmutableList.of(druidNode1, druidNode2));
+    ConcurrentMap<String, WorkerHolder> workers = taskRunner.getWorkersForTestingReadOnly();
+    Assert.assertEquals(2, workers.size());
+    Assert.assertTrue(workers.values().stream().noneMatch(w -> w.getUnderlyingSyncer().isExecutorShutdown()));
+    workers.values().iterator().next().stop();
+    taskRunner.stop();
+    Assert.assertTrue(druidNodeDiscovery.getListeners().isEmpty());
+    Assert.assertEquals(2, workers.size());
+    Assert.assertTrue(workers.values().stream().allMatch(w -> w.getUnderlyingSyncer().isExecutorShutdown()));
+    EasyMock.verify(druidNodeDiscoveryProvider, provisioningStrategy, provisioningService);
+  }
+
+  /*
+  Simulates startup of Overlord with no provisioner. Overlord is then stopped and is expected to close down certain
+  things.
+   */
+  @Test(timeout = 60_000L)
+  public void testFreshStartAndStopNoProvisioner()
+  {
+    TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
+    ProvisioningStrategy provisioningStrategy = EasyMock.createMock(ProvisioningStrategy.class);
+
+    DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+    EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
+            .andReturn(druidNodeDiscovery).times(2);
+    EasyMock.expect(provisioningStrategy.makeProvisioningService(isA(HttpRemoteTaskRunnerV2.class)))
+            .andReturn(null);
+    EasyMock.expectLastCall();
+    EasyMock.replay(druidNodeDiscoveryProvider, provisioningStrategy);
+
+    HttpRemoteTaskRunnerV2 taskRunner = new HttpRemoteTaskRunnerV2(
+        TestHelper.makeJsonMapper(),
+        new HttpRemoteTaskRunnerConfig()
+        {
+          @Override
+          public int getPendingTasksRunnerNumThreads()
+          {
+            return 3;
+          }
+        },
+        EasyMock.createNiceMock(HttpClient.class),
+        DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
+        provisioningStrategy,
+        druidNodeDiscoveryProvider,
+        EasyMock.createNiceMock(TaskStorage.class),
+        new NoopServiceEmitter()
+    )
+    {
+      @Override
+      protected WorkerHolder createWorkerHolder(
+          ObjectMapper smileMapper,
+          HttpClient httpClient,
+          HttpRemoteTaskRunnerConfig config,
+          ScheduledExecutorService workersSyncExec,
+          WorkerHolder.Listener listener,
+          Worker worker,
+          List<TaskAnnouncement> knownAnnouncements
+      )
+      {
+        return HttpRemoteTaskRunnerV2Test.createWorkerHolder(
+            smileMapper,
+            httpClient,
+            config,
+            workersSyncExec,
+            listener,
+            worker,
+            ImmutableList.of(),
+            ImmutableList.of(),
+            ImmutableMap.of(),
+            new AtomicInteger(),
+            ImmutableSet.of()
+        );
+      }
+    };
+
+    taskRunner.start();
+    taskRunner.stop();
+    EasyMock.verify(druidNodeDiscoveryProvider, provisioningStrategy);
+  }
+
+  /*
+  Simulates one task not getting acknowledged as running after being assigned to a worker, while other tasks are
+  successfully assigned to another worker and get completed.
+   */
+  @Test(timeout = 60_000L)
+  public void testOneStuckTaskAssignmentDoesntBlockOthers() throws Exception
+  {
+    TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
+    DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+    EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
+            .andReturn(druidNodeDiscovery);
+    EasyMock.replay(druidNodeDiscoveryProvider);
+
+    Task task1 = NoopTask.create();
+    Task task2 = NoopTask.create();
+    Task task3 = NoopTask.create();
+
+    HttpRemoteTaskRunnerV2 taskRunner = new HttpRemoteTaskRunnerV2(
+        TestHelper.makeJsonMapper(),
+        new HttpRemoteTaskRunnerConfig()
+        {
+          @Override
+          public int getPendingTasksRunnerNumThreads()
+          {
+            return 3;
+          }
+        },
+        EasyMock.createNiceMock(HttpClient.class),
+        DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
+        new NoopProvisioningStrategy<>(),
+        druidNodeDiscoveryProvider,
+        EasyMock.createNiceMock(TaskStorage.class),
+        new NoopServiceEmitter()
+    )
+    {
+      @Override
+      protected WorkerHolder createWorkerHolder(
+          ObjectMapper smileMapper,
+          HttpClient httpClient,
+          HttpRemoteTaskRunnerConfig config,
+          ScheduledExecutorService workersSyncExec,
+          WorkerHolder.Listener listener,
+          Worker worker,
+          List<TaskAnnouncement> knownAnnouncements
+      )
+      {
+        return HttpRemoteTaskRunnerV2Test.createWorkerHolder(
+            smileMapper,
+            httpClient,
+            config,
+            workersSyncExec,
+            listener,
+            worker,
+            ImmutableList.of(),
+            ImmutableList.of(),
+            ImmutableMap.of(task1, ImmutableList.of()), // no announcements would be received for task1
+            new AtomicInteger(),
+            ImmutableSet.of()
+        );
+      }
+    };
+
+    taskRunner.start();
+
+    DiscoveryDruidNode druidNode1 = new DiscoveryDruidNode(
+        new DruidNode("service", "host1", false, 8080, null, true, false),
+        NodeRole.MIDDLE_MANAGER,
+        ImmutableMap.of(
+            WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
+        )
+    );
+
+    DiscoveryDruidNode druidNode2 = new DiscoveryDruidNode(
+        new DruidNode("service", "host2", false, 8080, null, true, false),
+        NodeRole.MIDDLE_MANAGER,
+        ImmutableMap.of(
+            WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
+        )
+    );
+
+    druidNodeDiscovery.getListeners().get(0).nodesAdded(ImmutableList.of(druidNode1, druidNode2));
+
+    taskRunner.run(task1);
+    Future<TaskStatus> future2 = taskRunner.run(task2);
+    Future<TaskStatus> future3 = taskRunner.run(task3);
+
+    Assert.assertTrue(future2.get().isSuccess());
+    Assert.assertTrue(future3.get().isSuccess());
+
+    Assert.assertEquals(task1.getId(), Iterables.getOnlyElement(taskRunner.getPendingTasks()).getTaskId());
+  }
+
+  /*
+  Simulates restart of the Overlord where taskRunner, on start, discovers workers with preexisting tasks.
+   */
+  @Test(timeout = 60_000L)
+  public void testTaskRunnerRestart() throws Exception
+  {
+    TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
+    DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+    EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
+            .andReturn(druidNodeDiscovery);
+    EasyMock.replay(druidNodeDiscoveryProvider);
+
+    ConcurrentMap<String, CustomFunction> workerHolders = new ConcurrentHashMap<>();
+
+    Task task1 = NoopTask.create();
+    Task task2 = NoopTask.create();
+    Task task3 = NoopTask.create();
+    Task task4 = NoopTask.create();
+    Task task5 = NoopTask.create();
+
+    TaskStorage taskStorageMock = EasyMock.createStrictMock(TaskStorage.class);
+    EasyMock.expect(taskStorageMock.getStatus(task1.getId())).andReturn(Optional.absent());
+    EasyMock.expect(taskStorageMock.getStatus(task2.getId())).andReturn(Optional.absent()).times(2);
+    EasyMock.expect(taskStorageMock.getStatus(task3.getId())).andReturn(Optional.of(TaskStatus.running(task3.getId())));
+    EasyMock.expect(taskStorageMock.getStatus(task4.getId())).andReturn(Optional.of(TaskStatus.running(task4.getId())));
+    EasyMock.expect(taskStorageMock.getStatus(task5.getId())).andReturn(Optional.of(TaskStatus.success(task5.getId())));
+    EasyMock.replay(taskStorageMock);
+
+    HttpRemoteTaskRunnerV2 taskRunner = new HttpRemoteTaskRunnerV2(
+        TestHelper.makeJsonMapper(),
+        new HttpRemoteTaskRunnerConfig()
+        {
+          @Override
+          public int getPendingTasksRunnerNumThreads()
+          {
+            return 3;
+          }
+        },
+        EasyMock.createNiceMock(HttpClient.class),
+        DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
+        new NoopProvisioningStrategy<>(),
+        druidNodeDiscoveryProvider,
+        taskStorageMock,
+        new NoopServiceEmitter()
+    )
+    {
+      @Override
+      protected WorkerHolder createWorkerHolder(
+          ObjectMapper smileMapper,
+          HttpClient httpClient,
+          HttpRemoteTaskRunnerConfig config,
+          ScheduledExecutorService workersSyncExec,
+          WorkerHolder.Listener listener,
+          Worker worker,
+          List<TaskAnnouncement> knownAnnouncements
+      )
+      {
+        if (workerHolders.containsKey(worker.getHost())) {
+          return workerHolders.get(worker.getHost()).apply(
+              smileMapper,
+              httpClient,
+              config,
+              workersSyncExec,
+              listener,
+              worker,
+              knownAnnouncements
+          );
+        } else {
+          throw new ISE("No WorkerHolder for [%s].", worker.getHost());
+        }
+      }
+    };
+
+    taskRunner.start();
+
+    DiscoveryDruidNode druidNode = new DiscoveryDruidNode(
+        new DruidNode("service", "host", false, 1234, null, true, false),
+        NodeRole.MIDDLE_MANAGER,
+        ImmutableMap.of(
+            WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
+        )
+    );
+
+    AtomicInteger ticks = new AtomicInteger();
+    Set<String> taskShutdowns = new HashSet<>();
+
+    workerHolders.put(
+        "host:1234",
+        (mapper, httpClient, config, exec, listener, worker, knownAnnouncements) -> createWorkerHolder(
+            mapper,
+            httpClient,
+            config,
+            exec,
+            listener,
+            worker,
+            knownAnnouncements,
+            ImmutableList.of(
+                TaskAnnouncement.create(
+                    task1,
+                    TaskStatus.success(task1.getId()),
+                    TaskLocation.create("host", 1234, 1235)
+                ),
+                TaskAnnouncement.create(
+                    task2,
+                    TaskStatus.running(task2.getId()),
+                    TaskLocation.create("host", 1234, 1235)
+                ),
+                TaskAnnouncement.create(
+                    task2,
+                    TaskStatus.success(task2.getId()),
+                    TaskLocation.create("host", 1234, 1235)
+                ),
+                TaskAnnouncement.create(
+                    task3,
+                    TaskStatus.success(task3.getId()),
+                    TaskLocation.create("host", 1234, 1235)
+                ),
+                TaskAnnouncement.create(
+                    task4,
+                    TaskStatus.running(task4.getId()),
+                    TaskLocation.create("host", 1234, 1235)
+                ),
+                TaskAnnouncement.create(
+                    task5,
+                    TaskStatus.running(task5.getId()),
+                    TaskLocation.create("host", 1234, 1235)
+                )
+            ),
+            ImmutableMap.of(),
+            ticks,
+            taskShutdowns
+        )
+    );
+
+    druidNodeDiscovery.getListeners().get(0).nodesAdded(
+        ImmutableList.of(
+            druidNode
+        )
+    );
+
+    while (ticks.get() < 1) {
+      Thread.sleep(100);
+    }
+
+    EasyMock.verify(taskStorageMock);
+
+    Assert.assertEquals(ImmutableSet.of(task2.getId(), task5.getId()), taskShutdowns);
+    Assert.assertTrue(taskRunner.getPendingTasks().isEmpty());
+
+    TaskRunnerWorkItem item = Iterables.getOnlyElement(taskRunner.getRunningTasks());
+    Assert.assertEquals(task4.getId(), item.getTaskId());
+
+    Assert.assertTrue(taskRunner.run(task3).get().isSuccess());
+
+    Assert.assertEquals(2, taskRunner.getKnownTasks().size());
+  }
+
+  @Test(timeout = 60_000L)
+  public void testWorkerDisappearAndReappearBeforeItsCleanup() throws Exception
+  {
+    TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
+    DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+    EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
+            .andReturn(druidNodeDiscovery);
+    EasyMock.replay(druidNodeDiscoveryProvider);
+
+    ConcurrentMap<String, CustomFunction> workerHolders = new ConcurrentHashMap<>();
+
+    HttpRemoteTaskRunnerV2 taskRunner = new HttpRemoteTaskRunnerV2(
+        TestHelper.makeJsonMapper(),
+        new HttpRemoteTaskRunnerConfig()
+        {
+          @Override
+          public int getPendingTasksRunnerNumThreads()
+          {
+            return 3;
+          }
+        },
+        EasyMock.createNiceMock(HttpClient.class),
+        DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
+        new NoopProvisioningStrategy<>(),
+        druidNodeDiscoveryProvider,
+        EasyMock.createNiceMock(TaskStorage.class),
+        new NoopServiceEmitter()
+    )
+    {
+      @Override
+      protected WorkerHolder createWorkerHolder(
+          ObjectMapper smileMapper,
+          HttpClient httpClient,
+          HttpRemoteTaskRunnerConfig config,
+          ScheduledExecutorService workersSyncExec,
+          WorkerHolder.Listener listener,
+          Worker worker,
+          List<TaskAnnouncement> knownAnnouncements
+      )
+      {
+        if (workerHolders.containsKey(worker.getHost())) {
+          return workerHolders.get(worker.getHost()).apply(
+              smileMapper,
+              httpClient,
+              config,
+              workersSyncExec,
+              listener,
+              worker,
+              knownAnnouncements
+          );
+        } else {
+          throw new ISE("No WorkerHolder for [%s].", worker.getHost());
+        }
+      }
+    };
+
+    taskRunner.start();
+
+    Task task1 = NoopTask.create();
+    Task task2 = NoopTask.create();
+
+    DiscoveryDruidNode druidNode = new DiscoveryDruidNode(
+        new DruidNode("service", "host", false, 1234, null, true, false),
+        NodeRole.MIDDLE_MANAGER,
+        ImmutableMap.of(
+            WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
+        )
+    );
+
+    workerHolders.put(
+        "host:1234",
+        (mapper, httpClient, config, exec, listener, worker, knownAnnouncements) -> createWorkerHolder(
+            mapper,
+            httpClient,
+            config,
+            exec,
+            listener,
+            worker,
+            knownAnnouncements,
+            ImmutableList.of(),
+            ImmutableMap.of(
+                task1, ImmutableList.of(
+                    TaskAnnouncement.create(
+                        task1,
+                        TaskStatus.running(task1.getId()),
+                        TaskLocation.unknown()
+                    ),
+                    TaskAnnouncement.create(
+                        task1,
+                        TaskStatus.running(task1.getId()),
+                        TaskLocation.create("host", 1234, 1235)
+                    ),
+                    TaskAnnouncement.create(
+                        task1,
+                        TaskStatus.success(task1.getId()),
+                        TaskLocation.create("host", 1234, 1235)
+                    )
+                ),
+                task2, ImmutableList.of(
+                    TaskAnnouncement.create(
+                        task2,
+                        TaskStatus.running(task2.getId()),
+                        TaskLocation.unknown()
+                    ),
+                    TaskAnnouncement.create(
+                        task2,
+                        TaskStatus.running(task2.getId()),
+                        TaskLocation.create("host", 1234, 1235)
+                    )
+                )
+            ),
+            new AtomicInteger(),
+            ImmutableSet.of()
+        )
+    );
+
+    druidNodeDiscovery.getListeners().get(0).nodesAdded(
+        ImmutableList.of(
+            druidNode
+        )
+    );
+
+    Future<TaskStatus> future1 = taskRunner.run(task1);
+    Future<TaskStatus> future2 = taskRunner.run(task2);
+
+    while (taskRunner.getPendingTasks().size() > 0) {
+      Thread.sleep(100);
+    }
+
+    druidNodeDiscovery.getListeners().get(0).nodesRemoved(
+        ImmutableList.of(
+            druidNode
+        )
+    );
+
+    workerHolders.put(
+        "host:1234",
+        (mapper, httpClient, config, exec, listener, worker, knownAnnouncements) -> createWorkerHolder(
+            mapper,
+            httpClient,
+            config,
+            exec,
+            listener,
+            worker,
+            knownAnnouncements,
+            ImmutableList.of(
+                TaskAnnouncement.create(
+                    task2,
+                    TaskStatus.running(task2.getId()),
+                    TaskLocation.create("host", 1234, 1235)
+                ),
+                TaskAnnouncement.create(
+                    task2,
+                    TaskStatus.success(task2.getId()),
+                    TaskLocation.create("host", 1234, 1235)
+                )
+
+            ),
+            ImmutableMap.of(),
+            new AtomicInteger(),
+            ImmutableSet.of()
+        )
+    );
+
+    druidNodeDiscovery.getListeners().get(0).nodesAdded(
+        ImmutableList.of(
+            druidNode
+        )
+    );
+
+    Assert.assertTrue(future1.get().isSuccess());
+    Assert.assertTrue(future2.get().isSuccess());
+  }
+
+  @Test(timeout = 60_000L)
+  public void testWorkerDisappearAndReappearAfterItsCleanup() throws Exception
+  {
+    TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
+    DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+    EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
+            .andReturn(druidNodeDiscovery);
+    EasyMock.replay(druidNodeDiscoveryProvider);
+
+    ConcurrentMap<String, CustomFunction> workerHolders = new ConcurrentHashMap<>();
+
+    HttpRemoteTaskRunnerV2 taskRunner = new HttpRemoteTaskRunnerV2(
+        TestHelper.makeJsonMapper(),
+        new HttpRemoteTaskRunnerConfig()
+        {
+          @Override
+          public Period getTaskCleanupTimeout()
+          {
+            return Period.millis(1);
+          }
+        },
+        EasyMock.createNiceMock(HttpClient.class),
+        DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
+        new NoopProvisioningStrategy<>(),
+        druidNodeDiscoveryProvider,
+        EasyMock.createNiceMock(TaskStorage.class),
+        new NoopServiceEmitter()
+    )
+    {
+      @Override
+      protected WorkerHolder createWorkerHolder(
+          ObjectMapper smileMapper,
+          HttpClient httpClient,
+          HttpRemoteTaskRunnerConfig config,
+          ScheduledExecutorService workersSyncExec,
+          WorkerHolder.Listener listener,
+          Worker worker,
+          List<TaskAnnouncement> knownAnnouncements
+      )
+      {
+        if (workerHolders.containsKey(worker.getHost())) {
+          return workerHolders.get(worker.getHost()).apply(
+              smileMapper,
+              httpClient,
+              config,
+              workersSyncExec,
+              listener,
+              worker,
+              knownAnnouncements
+          );
+        } else {
+          throw new ISE("No WorkerHolder for [%s].", worker.getHost());
+        }
+      }
+    };
+
+    taskRunner.start();
+
+    Task task1 = NoopTask.create();
+    Task task2 = NoopTask.create();
+
+    DiscoveryDruidNode druidNode = new DiscoveryDruidNode(
+        new DruidNode("service", "host", false, 1234, null, true, false),
+        NodeRole.MIDDLE_MANAGER,
+        ImmutableMap.of(
+            WorkerNodeService.DISCOVERY_SERVICE_KEY,
+            new WorkerNodeService("ip1", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
+        )
+    );
+
+    workerHolders.put(
+        "host:1234",
+        (mapper, httpClient, config, exec, listener, worker, knownAnnouncements) -> createWorkerHolder(
+            mapper,
+            httpClient,
+            config,
+            exec,
+            listener,
+            worker,
+            knownAnnouncements,
+            ImmutableList.of(),
+            ImmutableMap.of(
+                task1, ImmutableList.of(
+                    TaskAnnouncement.create(
+                        task1,
+                        TaskStatus.running(task1.getId()),
+                        TaskLocation.unknown()
+                    ),
+                    TaskAnnouncement.create(
+                        task1,
+                        TaskStatus.running(task1.getId()),
+                        TaskLocation.create("host", 1234, 1235)
+                    )
+                ),
+                task2, ImmutableList.of(
+                    TaskAnnouncement.create(
+                        task2,
+                        TaskStatus.running(task2.getId()),
+                        TaskLocation.unknown()
+                    ),
+                    TaskAnnouncement.create(
+                        task2,
+                        TaskStatus.running(task2.getId()),
+                        TaskLocation.create("host", 1234, 1235)
+                    )
+                )
+            ),
+            new AtomicInteger(),
+            ImmutableSet.of()
+        )
+    );
+
+    druidNodeDiscovery.getListeners().get(0).nodesAdded(
+        ImmutableList.of(
+            druidNode
+        )
+    );
+
+    Future<TaskStatus> future1 = taskRunner.run(task1);
+    Future<TaskStatus> future2 = taskRunner.run(task2);
+
+    while (taskRunner.getPendingTasks().size() > 0) {
+      Thread.sleep(100);
+    }
+
+    druidNodeDiscovery.getListeners().get(0).nodesRemoved(
+        ImmutableList.of(
+            druidNode
+        )
+    );
+
+    Assert.assertTrue(future1.get().isFailure());
+    Assert.assertTrue(future2.get().isFailure());
+    Assert.assertNotNull(future1.get().getErrorMsg());
+    Assert.assertNotNull(future2.get().getErrorMsg());
+    Assert.assertTrue(
+        future1.get().getErrorMsg().startsWith(
+            "The worker that this task was assigned disappeared and did not 
report cleanup within timeout"
+        )
+    );
+    Assert.assertTrue(
+        future2.get().getErrorMsg().startsWith(
+            "The worker that this task was assigned disappeared and did not 
report cleanup within timeout"
+        )
+    );
+
+    AtomicInteger ticks = new AtomicInteger();
+    Set<String> actualShutdowns = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+    workerHolders.put(
+        "host:1234",
+        (mapper, httpClient, config, exec, listener, worker, knownAnnouncements) -> createWorkerHolder(
+            mapper,
+            httpClient,
+            config,
+            exec,
+            listener,
+            worker,
+            knownAnnouncements,
+            ImmutableList.of(
+                TaskAnnouncement.create(
+                    task1,
+                    TaskStatus.success(task1.getId()),
+                    TaskLocation.create("host", 1234, 1235)
+                ),
+                TaskAnnouncement.create(
+                    task2,
+                    TaskStatus.running(task2.getId()),
+                    TaskLocation.create("host", 1234, 1235)
+                )
+            ),
+            ImmutableMap.of(),
+            ticks,
+            actualShutdowns
+        )
+    );
+
+    druidNodeDiscovery.getListeners().get(0).nodesAdded(
+        ImmutableList.of(
+            druidNode
+        )
+    );
+
+    while (ticks.get() < 1) {
+      Thread.sleep(100);
+    }
+
+    Assert.assertEquals(ImmutableSet.of(task2.getId()), actualShutdowns);
+    Assert.assertTrue(taskRunner.run(task1).get().isFailure());
+    Assert.assertTrue(taskRunner.run(task2).get().isFailure());
+  }
+
+  @Test(timeout = 60_000L)
+  public void testMarkWorkersLazy() throws Exception
+  {
+    TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
+    DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+    EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
+            .andReturn(druidNodeDiscovery);
+    EasyMock.replay(druidNodeDiscoveryProvider);
+
+    Task task1 = NoopTask.create();
+    Task task2 = NoopTask.create();
+    String additionalWorkerCategory = "category2";
+
+    ConcurrentMap<String, CustomFunction> workerHolders = new ConcurrentHashMap<>();
+
+    HttpRemoteTaskRunnerV2 taskRunner = new HttpRemoteTaskRunnerV2(
+        TestHelper.makeJsonMapper(),
+        new HttpRemoteTaskRunnerConfig()
+        {
+          @Override
+          public int getPendingTasksRunnerNumThreads()
+          {
+            return 3;
+          }
+        },
+        EasyMock.createNiceMock(HttpClient.class),
+        DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
+        new NoopProvisioningStrategy<>(),
+        druidNodeDiscoveryProvider,
+        EasyMock.createNiceMock(TaskStorage.class),
+        new NoopServiceEmitter()
+    )
+    {
+      @Override
+      protected WorkerHolder createWorkerHolder(
+          ObjectMapper smileMapper,
+          HttpClient httpClient,
+          HttpRemoteTaskRunnerConfig config,
+          ScheduledExecutorService workersSyncExec,
+          WorkerHolder.Listener listener,
+          Worker worker,
+          List<TaskAnnouncement> knownAnnouncements
+      )
+      {
+        if (workerHolders.containsKey(worker.getHost())) {
+          return workerHolders.get(worker.getHost()).apply(
+              smileMapper,
+              httpClient,
+              config,
+              workersSyncExec,
+              listener,
+              worker,
+              knownAnnouncements
+          );
+        } else {
+          throw new ISE("No WorkerHolder for [%s].", worker.getHost());
+        }
+      }
+    };
+
+    taskRunner.start();
+
+    Assert.assertTrue(taskRunner.getTotalTaskSlotCount().isEmpty());
+    Assert.assertTrue(taskRunner.getIdleTaskSlotCount().isEmpty());
+    Assert.assertTrue(taskRunner.getUsedTaskSlotCount().isEmpty());
+
+    AtomicInteger ticks = new AtomicInteger();
+
+    DiscoveryDruidNode druidNode1 = new DiscoveryDruidNode(
+        new DruidNode("service", "host1", false, 8080, null, true, false),
+        NodeRole.MIDDLE_MANAGER,
+        ImmutableMap.of(
+            WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0", WorkerConfig.DEFAULT_CATEGORY)
+        )
+    );
+
+    workerHolders.put(
+        "host1:8080",
+        (mapper, httpClient, config, exec, listener, worker, knownAnnouncements) -> createWorkerHolder(
+            mapper,
+            httpClient,
+            config,
+            exec,
+            listener,
+            worker,
+            knownAnnouncements,
+            ImmutableList.of(),
+            ImmutableMap.of(
+                task1, ImmutableList.of(
+                    TaskAnnouncement.create(
+                        task1,
+                        TaskStatus.running(task1.getId()),
+                        TaskLocation.unknown()
+                    ),
+                    TaskAnnouncement.create(
+                        task1,
+                        TaskStatus.running(task1.getId()),
+                        TaskLocation.create("host1", 8080, -1)
+                    )
+                )
+            ),
+            ticks,
+            ImmutableSet.of()
+        )
+    );
+
+    druidNodeDiscovery.getListeners().get(0).nodesAdded(ImmutableList.of(druidNode1));
+
+    Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+
+    taskRunner.run(task1);
+
+    while (ticks.get() < 1) {
+      Thread.sleep(100);
+    }
+
+    Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+
+    DiscoveryDruidNode druidNode2 = new DiscoveryDruidNode(
+        new DruidNode("service", "host2", false, 8080, null, true, false),
+        NodeRole.MIDDLE_MANAGER,
+        ImmutableMap.of(
+            WorkerNodeService.DISCOVERY_SERVICE_KEY,
+            new WorkerNodeService("ip2", 1, "0", additionalWorkerCategory)
+        )
+    );
+
+    workerHolders.put(
+        "host2:8080",
+        (mapper, httpClient, config, exec, listener, worker, knownAnnouncements) -> createWorkerHolder(
+            mapper,
+            httpClient,
+            config,
+            exec,
+            listener,
+            worker,
+            knownAnnouncements,
+            ImmutableList.of(),
+            ImmutableMap.of(task2, ImmutableList.of()),
+            ticks,
+            ImmutableSet.of()
+        )
+    );
+
+    druidNodeDiscovery.getListeners().get(0).nodesAdded(ImmutableList.of(druidNode2));
+
+    Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(additionalWorkerCategory).longValue());
+    Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount().get(additionalWorkerCategory).longValue());
+    Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().get(additionalWorkerCategory).longValue());
+
+    taskRunner.run(task2);
+
+    while (ticks.get() < 2) {
+      Thread.sleep(100);
+    }
+
+    Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(additionalWorkerCategory).longValue());
+    Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertFalse(taskRunner.getIdleTaskSlotCount().containsKey(additionalWorkerCategory));
+    Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().get(additionalWorkerCategory).longValue());
+
+    DiscoveryDruidNode druidNode3 = new DiscoveryDruidNode(
+        new DruidNode("service", "host3", false, 8080, null, true, false),
+        NodeRole.MIDDLE_MANAGER,
+        ImmutableMap.of(
+            WorkerNodeService.DISCOVERY_SERVICE_KEY,
+            new WorkerNodeService("ip2", 1, "0", WorkerConfig.DEFAULT_CATEGORY)
+        )
+    );
+
+    workerHolders.put(
+        "host3:8080",
+        (mapper, httpClient, config, exec, listener, worker, knownAnnouncements) -> createWorkerHolder(
+            mapper,
+            httpClient,
+            config,
+            exec,
+            listener,
+            worker,
+            knownAnnouncements,
+            ImmutableList.of(),
+            ImmutableMap.of(),
+            new AtomicInteger(),
+            ImmutableSet.of()
+        )
+    );
+
+    druidNodeDiscovery.getListeners().get(0).nodesAdded(ImmutableList.of(druidNode3));
+
+    Assert.assertEquals(2, taskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(additionalWorkerCategory).longValue());
+    Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertFalse(taskRunner.getIdleTaskSlotCount().containsKey(additionalWorkerCategory));
+    Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().get(additionalWorkerCategory).longValue());
+    Assert.assertFalse(taskRunner.getLazyTaskSlotCount().containsKey(WorkerConfig.DEFAULT_CATEGORY));
+    Assert.assertFalse(taskRunner.getLazyTaskSlotCount().containsKey(additionalWorkerCategory));
+
+    Assert.assertEquals(task1.getId(), Iterables.getOnlyElement(taskRunner.getRunningTasks()).getTaskId());
+    Assert.assertEquals(task2.getId(), Iterables.getOnlyElement(taskRunner.getPendingTasks()).getTaskId());
+
+    Assert.assertEquals(
+        Collections.emptyList(),
+        taskRunner.markWorkersLazy(Predicates.alwaysTrue(), 0)
+    );
+
+    Assert.assertEquals(
+        "host3:8080",
+        Iterables.getOnlyElement(taskRunner.markWorkersLazy(Predicates.alwaysTrue(), 1))
+                 .getHost()
+    );
+
+    Assert.assertEquals(
+        "host3:8080",
+        Iterables.getOnlyElement(taskRunner.markWorkersLazy(Predicates.alwaysTrue(), Integer.MAX_VALUE))
+                 .getHost()
+    );
+
+    Assert.assertEquals(2, taskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(additionalWorkerCategory).longValue());
+    Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertFalse(taskRunner.getIdleTaskSlotCount().containsKey(additionalWorkerCategory));
+    Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().get(additionalWorkerCategory).longValue());
+    Assert.assertEquals(1, taskRunner.getLazyTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertFalse(taskRunner.getLazyTaskSlotCount().containsKey(additionalWorkerCategory));
+  }
+
+  /*
+   * Task goes PENDING -> RUNNING -> SUCCESS, with a few redundant notifications in between.
+   */
+  @Test(timeout = 60_000L)
+  public void testTaskAddedOrUpdated1() throws Exception
+  {
+    Task task = NoopTask.create();
+    List<Object> listenerNotificationsAccumulator = new ArrayList<>();
+    HttpRemoteTaskRunnerV2 taskRunner = createTaskRunnerForTestTaskAddedOrUpdated(
+        EasyMock.createStrictMock(TaskStorage.class),
+        listenerNotificationsAccumulator
+    );
+
+    Worker worker = new Worker("http", "worker", "127.0.0.1", 1, "v1", 
WorkerConfig.DEFAULT_CATEGORY);
+    WorkerHolder workerHolder = EasyMock.createMock(WorkerHolder.class);
+    EasyMock.expect(workerHolder.getWorker())
+            .andReturn(worker)
+            .anyTimes();
+    EasyMock.expect(workerHolder.getState()).andReturn(WorkerHolder.State.READY).anyTimes();
+    EasyMock.expect(workerHolder.isInitialized()).andReturn(true).anyTimes();
+    EasyMock.expect(workerHolder.isEnabled()).andReturn(true).anyTimes();
+    EasyMock.expect(workerHolder.toImmutable())
+            .andReturn(new ImmutableWorkerInfo(worker, 0, ImmutableSet.of(), ImmutableSet.of(), DateTimes.nowUtc()))
+            .anyTimes();
+    workerHolder.setState(EasyMock.anyObject());
+    EasyMock.expectLastCall().anyTimes();
+    workerHolder.setLastCompletedTaskTime(EasyMock.anyObject());
+    workerHolder.resetContinuouslyFailedTasksCount();
+    workerHolder.setBlacklistedUntil(null);
+    EasyMock.expect(workerHolder.getContinuouslyFailedTasksCount()).andReturn(0);
+    EasyMock.replay(workerHolder);
+
+    // Register the mock worker in the workers map so taskComplete() can find it
+    taskRunner.getWorkersForTestingReadOnly().put(worker.getHost(), workerHolder);
+
+    Future<TaskStatus> future = taskRunner.run(task);
+    Assert.assertEquals(task.getId(), Iterables.getOnlyElement(taskRunner.getPendingTasks()).getTaskId());
+
+    // RUNNING notification from worker
+    taskRunner.taskAddedOrUpdated(
+        TaskAnnouncement.create(
+            task,
+            TaskStatus.running(task.getId()),
+            TaskLocation.create("worker", 1000, 1001)
+        ), workerHolder
+    );
+    Assert.assertEquals(task.getId(), Iterables.getOnlyElement(taskRunner.getRunningTasks()).getTaskId());
+
+    // Another RUNNING notification from worker, notifying change in location
+    taskRunner.taskAddedOrUpdated(
+        TaskAnnouncement.create(
+            task,
+            TaskStatus.running(task.getId()),
+            TaskLocation.create("worker", 1, 2)
+        ), workerHolder
+    );
+    Assert.assertEquals(task.getId(), Iterables.getOnlyElement(taskRunner.getRunningTasks()).getTaskId());
+
+    // Redundant RUNNING notification from worker, ignored
+    taskRunner.taskAddedOrUpdated(
+        TaskAnnouncement.create(
+            task,
+            TaskStatus.running(task.getId()),
+            TaskLocation.create("worker", 1, 2)
+        ), workerHolder
+    );
+    Assert.assertEquals(task.getId(), Iterables.getOnlyElement(taskRunner.getRunningTasks()).getTaskId());
+
+    // Another "rogue-worker" reports running it, and gets asked to shutdown 
the task
+    WorkerHolder rogueWorkerHolder = EasyMock.createMock(WorkerHolder.class);
+    EasyMock.expect(rogueWorkerHolder.getWorker())
+            .andReturn(new Worker("http", "rogue-worker", "127.0.0.1", 5, 
"v1", WorkerConfig.DEFAULT_CATEGORY))
+            .anyTimes();
+    rogueWorkerHolder.shutdownTask(task.getId());
+    EasyMock.replay(rogueWorkerHolder);
+    taskRunner.taskAddedOrUpdated(
+        TaskAnnouncement.create(
+            task,
+            TaskStatus.running(task.getId()),
+            TaskLocation.create("rogue-worker", 1, 2)
+        ), rogueWorkerHolder
+    );
+    Assert.assertEquals(task.getId(), Iterables.getOnlyElement(taskRunner.getRunningTasks()).getTaskId());
+    EasyMock.verify(rogueWorkerHolder);
+
+    // "rogue-worker" reports FAILURE for the task, ignored
+    rogueWorkerHolder = EasyMock.createMock(WorkerHolder.class);
+    EasyMock.expect(rogueWorkerHolder.getWorker())
+            .andReturn(new Worker("http", "rogue-worker", "127.0.0.1", 5, 
"v1", WorkerConfig.DEFAULT_CATEGORY))
+            .anyTimes();
+    EasyMock.replay(rogueWorkerHolder);
+    taskRunner.taskAddedOrUpdated(
+        TaskAnnouncement.create(
+            task,
+            TaskStatus.failure(task.getId(), "Dummy task status failure err 
message"),
+            TaskLocation.create("rogue-worker", 1, 2)
+        ), rogueWorkerHolder
+    );
+    Assert.assertEquals(task.getId(), Iterables.getOnlyElement(taskRunner.getRunningTasks()).getTaskId());
+    EasyMock.verify(rogueWorkerHolder);
+
+    // Worker sends SUCCESS notification; task is marked SUCCESS now.
+    taskRunner.taskAddedOrUpdated(
+        TaskAnnouncement.create(
+            task,
+            TaskStatus.success(task.getId()),
+            TaskLocation.create("worker", 1, 2)
+        ), workerHolder
+    );
+    Assert.assertEquals(task.getId(), Iterables.getOnlyElement(taskRunner.getCompletedTasks()).getTaskId());
+    Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
+
+    // "rogue-worker" reports running it, and gets asked to shutdown the task
+    rogueWorkerHolder = EasyMock.createMock(WorkerHolder.class);
+    EasyMock.expect(rogueWorkerHolder.getWorker())
+            .andReturn(new Worker("http", "rogue-worker", "127.0.0.1", 5, 
"v1", WorkerConfig.DEFAULT_CATEGORY))
+            .anyTimes();
+    rogueWorkerHolder.shutdownTask(task.getId());
+    EasyMock.replay(rogueWorkerHolder);
+    taskRunner.taskAddedOrUpdated(
+        TaskAnnouncement.create(
+            task,
+            TaskStatus.running(task.getId()),
+            TaskLocation.create("rogue-worker", 1, 2)
+        ), rogueWorkerHolder
+    );
+    Assert.assertEquals(task.getId(), Iterables.getOnlyElement(taskRunner.getCompletedTasks()).getTaskId());
+    EasyMock.verify(rogueWorkerHolder);
+
+    // "rogue-worker" reports FAILURE for the tasks, ignored
+    rogueWorkerHolder = EasyMock.createMock(WorkerHolder.class);
+    EasyMock.expect(rogueWorkerHolder.getWorker())
+            .andReturn(new Worker("http", "rogue-worker", "127.0.0.1", 5, 
"v1", WorkerConfig.DEFAULT_CATEGORY))
+            .anyTimes();
+    EasyMock.replay(rogueWorkerHolder);
+    taskRunner.taskAddedOrUpdated(
+        TaskAnnouncement.create(
+            task,
+            TaskStatus.failure(task.getId(), "Dummy task status failure for 
testing"),
+            TaskLocation.create("rogue-worker", 1, 2)
+        ), rogueWorkerHolder
+    );
+    Assert.assertEquals(task.getId(), Iterables.getOnlyElement(taskRunner.getCompletedTasks()).getTaskId());
+    EasyMock.verify(rogueWorkerHolder);
+
+    Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
+
+    EasyMock.verify(workerHolder);
+
+    Assert.assertEquals(
+        ImmutableList.of(
+            ImmutableList.of(task.getId(), TaskLocation.create("worker", 1000, 
1001)),
+            ImmutableList.of(task.getId(), TaskLocation.create("worker", 1, 
2)),
+            ImmutableList.of(task.getId(), TaskStatus.success(task.getId()))
+        ),
+        listenerNotificationsAccumulator
+    );
+  }
+
+  /*
+   * Task goes from PENDING -> SUCCESS. Happens when TaskRunner is given a task that a worker already reported as
+   * completed with SUCCESS.
+   */
+  @Test(timeout = 60_000L)
+  public void testTaskAddedOrUpdated2() throws Exception
+  {
+    Task task = NoopTask.create();
+    List<Object> listenerNotificationsAccumulator = new ArrayList<>();
+    HttpRemoteTaskRunnerV2 taskRunner = createTaskRunnerForTestTaskAddedOrUpdated(
+        EasyMock.createStrictMock(TaskStorage.class),
+        listenerNotificationsAccumulator
+    );
+
+    Worker worker = new Worker("http", "localhost", "127.0.0.1", 1, "v1", 
WorkerConfig.DEFAULT_CATEGORY);
+
+    WorkerHolder workerHolder = EasyMock.createMock(WorkerHolder.class);
+    EasyMock.expect(workerHolder.getWorker()).andReturn(worker).anyTimes();
+    EasyMock.expect(workerHolder.getState()).andReturn(WorkerHolder.State.READY).anyTimes();
+    EasyMock.expect(workerHolder.isInitialized()).andReturn(true).anyTimes();
+    EasyMock.expect(workerHolder.isEnabled()).andReturn(true).anyTimes();
+    EasyMock.expect(workerHolder.toImmutable())
+            .andReturn(new ImmutableWorkerInfo(worker, 0, ImmutableSet.of(), ImmutableSet.of(), DateTimes.nowUtc()))
+            .anyTimes();
+    workerHolder.setState(EasyMock.anyObject());
+    EasyMock.expectLastCall().anyTimes();
+    workerHolder.setLastCompletedTaskTime(EasyMock.anyObject());
+    workerHolder.resetContinuouslyFailedTasksCount();
+    workerHolder.setBlacklistedUntil(null);
+    EasyMock.expect(workerHolder.getContinuouslyFailedTasksCount()).andReturn(0);
+    EasyMock.replay(workerHolder);
+
+    // Register the mock worker in the workers map so taskComplete() can find it
+    taskRunner.getWorkersForTestingReadOnly().put(worker.getHost(), workerHolder);
+
+    Future<TaskStatus> future = taskRunner.run(task);
+    Assert.assertEquals(task.getId(), Iterables.getOnlyElement(taskRunner.getPendingTasks()).getTaskId());
+
+    taskRunner.taskAddedOrUpdated(
+        TaskAnnouncement.create(
+            task,
+            TaskStatus.success(task.getId()),
+            TaskLocation.create("worker", 1, 2)
+        ), workerHolder
+    );
+    Assert.assertEquals(task.getId(), Iterables.getOnlyElement(taskRunner.getCompletedTasks()).getTaskId());
+
+    Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
+
+    EasyMock.verify(workerHolder);
+
+    Assert.assertEquals(
+        ImmutableList.of(
+            ImmutableList.of(task.getId(), TaskLocation.create("worker", 1, 
2)),
+            ImmutableList.of(task.getId(), TaskStatus.success(task.getId()))
+        ),
+        listenerNotificationsAccumulator
+    );
+  }
+
+  /*
+   * Notifications received for tasks not known to TaskRunner may be known to TaskStorage.
+   * This could happen when TaskRunner starts and workers report running/completed tasks on them.
+   */
+  @Test(timeout = 60_000L)
+  public void testTaskAddedOrUpdated3()
+  {
+    Task task1 = NoopTask.create();
+    Task task2 = NoopTask.create();
+    Task task3 = NoopTask.create();
+    Task task4 = NoopTask.create();
+    Task task5 = NoopTask.create();
+    Task task6 = NoopTask.create();
+
+    TaskStorage taskStorage = EasyMock.createMock(TaskStorage.class);
+    EasyMock.expect(taskStorage.getStatus(task1.getId())).andReturn(Optional.of(TaskStatus.running(task1.getId())));
+    EasyMock.expect(taskStorage.getStatus(task2.getId())).andReturn(Optional.of(TaskStatus.running(task2.getId())));
+    EasyMock.expect(taskStorage.getStatus(task3.getId())).andReturn(Optional.of(TaskStatus.success(task3.getId())));
+    EasyMock.expect(taskStorage.getStatus(task4.getId())).andReturn(Optional.of(TaskStatus.success(task4.getId())));
+    EasyMock.expect(taskStorage.getStatus(task5.getId())).andReturn(Optional.absent());
+    EasyMock.expect(taskStorage.getStatus(task6.getId())).andReturn(Optional.absent());
+    EasyMock.replay(taskStorage);
+
+    List<Object> listenerNotificationsAccumulator = new ArrayList<>();
+    HttpRemoteTaskRunnerV2 taskRunner =
+        createTaskRunnerForTestTaskAddedOrUpdated(taskStorage, listenerNotificationsAccumulator);
+
+    Worker worker = new Worker("http", "localhost", "127.0.0.1", 1, "v1", 
WorkerConfig.DEFAULT_CATEGORY);
+
+    WorkerHolder workerHolder = EasyMock.createMock(WorkerHolder.class);
+    EasyMock.expect(workerHolder.getWorker()).andReturn(worker).anyTimes();
+    EasyMock.expect(workerHolder.getState()).andReturn(WorkerHolder.State.READY).anyTimes();
+    EasyMock.expect(workerHolder.isInitialized()).andReturn(true).anyTimes();
+    EasyMock.expect(workerHolder.isEnabled()).andReturn(true).anyTimes();
+    EasyMock.expect(workerHolder.toImmutable())
+            .andReturn(new ImmutableWorkerInfo(worker, 0, ImmutableSet.of(), ImmutableSet.of(), DateTimes.nowUtc()))
+            .anyTimes();
+    workerHolder.setState(EasyMock.anyObject());
+    EasyMock.expectLastCall().anyTimes();
+    workerHolder.setLastCompletedTaskTime(EasyMock.anyObject());
+    workerHolder.resetContinuouslyFailedTasksCount();
+    workerHolder.setBlacklistedUntil(null);
+    EasyMock.expect(workerHolder.getContinuouslyFailedTasksCount()).andReturn(0);
+    workerHolder.shutdownTask(task3.getId());
+    workerHolder.shutdownTask(task5.getId());
+    EasyMock.replay(workerHolder);
+
+    // Register the mock worker in the workers map so taskComplete() can find it
+    taskRunner.getWorkersForTestingReadOnly().put(worker.getHost(), workerHolder);
+
+    Assert.assertEquals(0, taskRunner.getKnownTasks().size());
+
+    taskRunner.taskAddedOrUpdated(
+        TaskAnnouncement.create(
+            task1,
+            TaskStatus.running(task1.getId()),
+            TaskLocation.create("worker", 1, 2)
+        ), workerHolder
+    );
+
+    taskRunner.taskAddedOrUpdated(
+        TaskAnnouncement.create(
+            task2,
+            TaskStatus.success(task2.getId()),
+            TaskLocation.create("worker", 3, 4)
+        ), workerHolder
+    );
+
+    taskRunner.taskAddedOrUpdated(
+        TaskAnnouncement.create(
+            task3,
+            TaskStatus.running(task3.getId()),
+            TaskLocation.create("worker", 5, 6)
+        ), workerHolder
+    );
+
+    taskRunner.taskAddedOrUpdated(
+        TaskAnnouncement.create(
+            task4,
+            TaskStatus.success(task4.getId()),
+            TaskLocation.create("worker", 7, 8)
+        ), workerHolder
+    );
+
+    taskRunner.taskAddedOrUpdated(
+        TaskAnnouncement.create(
+            task5,
+            TaskStatus.running(task5.getId()),
+            TaskLocation.create("worker", 9, 10)
+        ), workerHolder
+    );
+
+    taskRunner.taskAddedOrUpdated(
+        TaskAnnouncement.create(
+            task6,
+            TaskStatus.success(task6.getId()),
+            TaskLocation.create("worker", 11, 12)
+        ), workerHolder
+    );
+
+    EasyMock.verify(workerHolder, taskStorage);
+
+    Assert.assertEquals(
+        ImmutableList.of(
+            ImmutableList.of(task1.getId(), TaskLocation.create("worker", 1, 
2)),
+            ImmutableList.of(task2.getId(), TaskLocation.create("worker", 3, 
4)),
+            ImmutableList.of(task2.getId(), TaskStatus.success(task2.getId()))
+        ),
+        listenerNotificationsAccumulator
+    );
+  }
+
+  @Test(timeout = 60_000L)
+  public void testTimeoutInAssigningTasks() throws Exception
+  {
+    TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
+    DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+    EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
+            .andReturn(druidNodeDiscovery);
+    EasyMock.replay(druidNodeDiscoveryProvider);
+
+    HttpRemoteTaskRunnerV2 taskRunner = new HttpRemoteTaskRunnerV2(
+        TestHelper.makeJsonMapper(),
+        new HttpRemoteTaskRunnerConfig()
+        {
+          @Override
+          public int getPendingTasksRunnerNumThreads()
+          {
+            return 1;
+          }
+
+          @Override
+          public Period getTaskAssignmentTimeout()
+          {
+            return new Period("PT1S");
+          }
+        },
+        EasyMock.createNiceMock(HttpClient.class),
+        DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
+        new NoopProvisioningStrategy<>(),
+        druidNodeDiscoveryProvider,
+        EasyMock.createNiceMock(TaskStorage.class),
+        new NoopServiceEmitter()
+    )
+    {
+      @Override
+      protected WorkerHolder createWorkerHolder(
+          ObjectMapper smileMapper,
+          HttpClient httpClient,
+          HttpRemoteTaskRunnerConfig config,
+          ScheduledExecutorService workersSyncExec,
+          WorkerHolder.Listener listener,
+          Worker worker,
+          List<TaskAnnouncement> knownAnnouncements
+      )
+      {
+        return new WorkerHolder(
+            smileMapper,
+            httpClient,
+            config,
+            workersSyncExec,
+            listener,
+            worker,
+            ImmutableList.of()
+        )
+        {
+          @Override
+          public void start()
+          {
+            disabled.set(false);
+          }
+
+          @Override
+          public void stop()
+          {
+          }
+
+          @Override
+          public boolean isInitialized()
+          {
+            return true;
+          }
+
+          @Override
+          public void waitForInitialization()
+          {
+          }
+
+          @Override
+          public boolean assignTask(Task task)
+          {
+            // Accept the assignment but never announce the task as running, so assignment times out
+            return true;
+          }
+
+          @Override
+          public void shutdownTask(String taskId)
+          {
+          }
+        };
+      }
+    };
+
+    taskRunner.start();
+
+    DiscoveryDruidNode druidNode1 = new DiscoveryDruidNode(
+        new DruidNode("service", "host1", false, 8080, null, true, false),
+        NodeRole.MIDDLE_MANAGER,
+        ImmutableMap.of(
+            WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
+        )
+    );
+
+    druidNodeDiscovery.getListeners().get(0).nodesAdded(ImmutableList.of(druidNode1));
+
+    Future<TaskStatus> future = taskRunner.run(NoopTask.create());
+    Assert.assertTrue(future.get().isFailure());
+    Assert.assertNotNull(future.get().getErrorMsg());
+    Assert.assertTrue(
+        future.get().getErrorMsg().startsWith("The worker that this task is 
assigned did not start it in timeout")
+    );
+  }
+
+  @Test(timeout = 60_000L)
+  public void testExceptionThrownInAssigningTasks() throws Exception
+  {
+    TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
+    DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+    EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
+            .andReturn(druidNodeDiscovery);
+    EasyMock.replay(druidNodeDiscoveryProvider);
+
+    HttpRemoteTaskRunnerV2 taskRunner = new HttpRemoteTaskRunnerV2(
+        TestHelper.makeJsonMapper(),
+        new HttpRemoteTaskRunnerConfig()
+        {
+          @Override
+          public int getPendingTasksRunnerNumThreads()
+          {
+            return 1;
+          }
+
+          @Override
+          public Period getTaskAssignmentTimeout()
+          {
+            return new Period("PT1S");
+          }
+        },
+        EasyMock.createNiceMock(HttpClient.class),
+        DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
+        new NoopProvisioningStrategy<>(),
+        druidNodeDiscoveryProvider,
+        EasyMock.createNiceMock(TaskStorage.class),
+        new NoopServiceEmitter()
+    )
+    {
+      @Override
+      protected WorkerHolder createWorkerHolder(
+          ObjectMapper smileMapper,
+          HttpClient httpClient,
+          HttpRemoteTaskRunnerConfig config,
+          ScheduledExecutorService workersSyncExec,
+          WorkerHolder.Listener listener,
+          Worker worker,
+          List<TaskAnnouncement> knownAnnouncements
+      )
+      {
+        return new WorkerHolder(
+            smileMapper,
+            httpClient,
+            config,
+            workersSyncExec,
+            listener,
+            worker,
+            ImmutableList.of()
+        )
+        {
+          @Override
+          public void start()
+          {
+            disabled.set(false);
+          }
+
+          @Override
+          public void stop()
+          {
+          }
+
+          @Override
+          public boolean isInitialized()
+          {
+            return true;
+          }
+
+          @Override
+          public void waitForInitialization()
+          {
+          }
+
+          @Override
+          public boolean assignTask(Task task)
+          {
+            throw new RuntimeException("Assign failure test");
+          }
+
+          @Override
+          public void shutdownTask(String taskId)
+          {
+          }
+        };
+      }
+    };
+
+    taskRunner.start();
+
+    DiscoveryDruidNode druidNode1 = new DiscoveryDruidNode(
+        new DruidNode("service", "host1", false, 8080, null, true, false),
+        NodeRole.MIDDLE_MANAGER,
+        ImmutableMap.of(
+            WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
+        )
+    );
+
+    druidNodeDiscovery.getListeners().get(0).nodesAdded(ImmutableList.of(druidNode1));
+
+    Future<TaskStatus> future = taskRunner.run(NoopTask.create());
+    Assert.assertTrue(future.get().isFailure());
+    Assert.assertNotNull(future.get().getErrorMsg());
+    Assert.assertTrue(
+        StringUtils.format("Actual message is: %s", 
future.get().getErrorMsg()),
+        future.get().getErrorMsg().startsWith("Failed to assign this task")
+    );
+  }
+
+  /**
+   * Validate the internal state of tasks within the task runner
+   * when shutdown is called on pending / running tasks and completed tasks
+   */
+  @Test(timeout = 60_000L)
+  public void testShutdown()
+  {
+    List<Object> listenerNotificationsAccumulator = new ArrayList<>();
+    HttpRemoteTaskRunnerV2 taskRunner = createTaskRunnerForTestTaskAddedOrUpdated(
+        EasyMock.createStrictMock(TaskStorage.class),
+        listenerNotificationsAccumulator
+    );
+
+    Worker worker = new Worker("http", "localhost", "127.0.0.1", 1, "v1", 
WorkerConfig.DEFAULT_CATEGORY);
+
+    WorkerHolder workerHolder = EasyMock.createMock(WorkerHolder.class);
+    EasyMock.expect(workerHolder.getWorker()).andReturn(worker).anyTimes();
+    workerHolder.setLastCompletedTaskTime(EasyMock.anyObject());
+    workerHolder.resetContinuouslyFailedTasksCount();
+    EasyMock.expect(workerHolder.getContinuouslyFailedTasksCount()).andReturn(0);
+    EasyMock.replay(workerHolder);
+
+    taskRunner.start();
+
+    Task pendingTask = NoopTask.create();
+    taskRunner.run(pendingTask);
+    // Pending task is not cleaned up immediately
+    taskRunner.shutdown(pendingTask.getId(), "Forced shutdown");
+    Assert.assertTrue(taskRunner.getKnownTasks()
+                                .stream()
+                                .map(TaskRunnerWorkItem::getTaskId)
+                                .collect(Collectors.toSet())
+                                .contains(pendingTask.getId())
+    );
+
+    Task completedTask = NoopTask.create();
+    taskRunner.run(completedTask);
+    taskRunner.taskAddedOrUpdated(
+        TaskAnnouncement.create(
+            completedTask,
+            TaskStatus.success(completedTask.getId()),
+            TaskLocation.create("worker", 1, 2)
+        ), workerHolder
+    );
+    Assert.assertEquals(completedTask.getId(), Iterables.getOnlyElement(taskRunner.getCompletedTasks()).getTaskId());
+    TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
+    DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+    EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
+            .andReturn(druidNodeDiscovery);
+    EasyMock.replay(druidNodeDiscoveryProvider);
+
+    // Completed tasks are cleaned up when shutdown is invoked on them (by TaskQueue)
+    taskRunner.shutdown(completedTask.getId(), "Cleanup");
+    Assert.assertFalse(taskRunner.getKnownTasks()
+                                 .stream()
+                                 .map(TaskRunnerWorkItem::getTaskId)
+                                 .collect(Collectors.toSet())
+                                 .contains(completedTask.getId())
+    );
+  }
+
+  @Test(timeout = 60_000L)
+  public void testSyncMonitoring_finiteIteration()
+  {
+    TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
+    DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+    EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
+            .andReturn(druidNodeDiscovery);
+    EasyMock.replay(druidNodeDiscoveryProvider);
+
+    HttpRemoteTaskRunnerV2 taskRunner = new HttpRemoteTaskRunnerV2(
+        TestHelper.makeJsonMapper(),
+        new HttpRemoteTaskRunnerConfig(),
+        EasyMock.createNiceMock(HttpClient.class),
+        DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
+        new NoopProvisioningStrategy<>(),
+        druidNodeDiscoveryProvider,
+        EasyMock.createMock(TaskStorage.class),
+        new NoopServiceEmitter()
+    )
+    {
+      @Override
+      protected WorkerHolder createWorkerHolder(
+          ObjectMapper smileMapper,
+          HttpClient httpClient,
+          HttpRemoteTaskRunnerConfig config,
+          ScheduledExecutorService workersSyncExec,
+          WorkerHolder.Listener listener,
+          Worker worker,
+          List<TaskAnnouncement> knownAnnouncements
+      )
+      {
+        return createNonSyncingWorkerHolder(worker);
+      }
+    };
+
+    taskRunner.start();
+    taskRunner.addWorker(createWorker("abc"));
+    taskRunner.addWorker(createWorker("xyz"));
+    taskRunner.addWorker(createWorker("lol"));
+    Assert.assertEquals(3, taskRunner.getWorkerSyncerDebugInfo().size());
+    taskRunner.syncMonitoring();
+    Assert.assertEquals(3, taskRunner.getWorkerSyncerDebugInfo().size());
+  }
+
+  @Test(timeout = 60_000L)
+  public void testGetMaximumCapacity_noWorkerConfig()
+  {
+    TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
+    DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+    EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
+            .andReturn(druidNodeDiscovery);
+    EasyMock.replay(druidNodeDiscoveryProvider);
+
+    HttpRemoteTaskRunnerV2 taskRunner = new HttpRemoteTaskRunnerV2(
+        TestHelper.makeJsonMapper(),
+        new HttpRemoteTaskRunnerConfig(),
+        EasyMock.createNiceMock(HttpClient.class),
+        DSuppliers.of(new AtomicReference<>(null)),
+        new TestProvisioningStrategy<>(),
+        druidNodeDiscoveryProvider,
+        EasyMock.createMock(TaskStorage.class),
+        new NoopServiceEmitter()
+    );
+    Assert.assertEquals(-1, taskRunner.getMaximumCapacityWithAutoscale());
+  }
+
+  @Test(timeout = 60_000L)
+  public void testGetMaximumCapacity_noAutoScaler()
+  {
+    TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
+    DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+    EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
+            .andReturn(druidNodeDiscovery);
+    EasyMock.replay(druidNodeDiscoveryProvider);
+
+    HttpRemoteTaskRunnerV2 taskRunner = new HttpRemoteTaskRunnerV2(
+        TestHelper.makeJsonMapper(),
+        new HttpRemoteTaskRunnerConfig(),
+        EasyMock.createNiceMock(HttpClient.class),
+        DSuppliers.of(new AtomicReference<>(new DefaultWorkerBehaviorConfig(
+            new EqualDistributionWorkerSelectStrategy(
+                null,
+                null
+            ), null
+        ))),
+        new TestProvisioningStrategy<>(),
+        druidNodeDiscoveryProvider,
+        EasyMock.createMock(TaskStorage.class),
+        new NoopServiceEmitter()
+    );
+    Assert.assertEquals(-1, taskRunner.getMaximumCapacityWithAutoscale());
+  }
+
+  @Test(timeout = 60_000L)
+  public void testGetMaximumCapacity_withAutoScaler()
+  {
+    TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
+    DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+    EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
+            .andReturn(druidNodeDiscovery);
+    EasyMock.replay(druidNodeDiscoveryProvider);
+
+    HttpRemoteTaskRunnerV2 taskRunner = new HttpRemoteTaskRunnerV2(
+        TestHelper.makeJsonMapper(),
+        new HttpRemoteTaskRunnerConfig(),
+        EasyMock.createNiceMock(HttpClient.class),
+        DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
+        new TestProvisioningStrategy<>(),
+        druidNodeDiscoveryProvider,
+        EasyMock.createMock(TaskStorage.class),
+        new NoopServiceEmitter()
+    );
+    // Default autoscaler has max workers of 0
+    Assert.assertEquals(0, taskRunner.getMaximumCapacityWithAutoscale());
+  }
+
+  public static HttpRemoteTaskRunnerV2 createTaskRunnerForTestTaskAddedOrUpdated(
+      TaskStorage taskStorage,
+      List<Object> listenerNotificationsAccumulator
+  )
+  {
+    TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
+    DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+    EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
+            .andReturn(druidNodeDiscovery);
+    EasyMock.replay(druidNodeDiscoveryProvider);
+
+    // Create HttpClient mock that returns a successful status response for worker syncing
+    HttpClient httpClient = EasyMock.createNiceMock(HttpClient.class);
+    EasyMock.expect(httpClient.go(
+        EasyMock.anyObject(),
+        EasyMock.anyObject(),
+        EasyMock.anyObject()
+    )).andReturn(Futures.immediateFuture(
+        new StatusResponseHolder(HttpResponseStatus.OK, new StringBuilder())
+    )).anyTimes();
+    EasyMock.replay(httpClient);
+
+    HttpRemoteTaskRunnerV2 taskRunner = new HttpRemoteTaskRunnerV2(
+        TestHelper.makeJsonMapper(),
+        new HttpRemoteTaskRunnerConfig()
+        {
+          @Override
+          public int getPendingTasksRunnerNumThreads()
+          {
+            return 3;
+          }
+        },
+        httpClient,
+        DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
+        new NoopProvisioningStrategy<>(),
+        druidNodeDiscoveryProvider,
+        taskStorage,
+        new NoopServiceEmitter()
+    );
+
+    taskRunner.start();
+
+    if (listenerNotificationsAccumulator != null) {
+      taskRunner.registerListener(
+          new TaskRunnerListener()
+          {
+            @Override
+            public String getListenerId()
+            {
+              return "test-listener";
+            }
+
+            @Override
+            public void locationChanged(String taskId, TaskLocation newLocation)
+            {
+              listenerNotificationsAccumulator.add(ImmutableList.of(taskId, newLocation));
+            }
+
+            @Override
+            public void statusChanged(String taskId, TaskStatus status)
+            {
+              listenerNotificationsAccumulator.add(ImmutableList.of(taskId, status));
+            }
+          },
+          Execs.directExecutor()
+      );
+    }
+
+    return taskRunner;
+  }
+
+  private Worker createWorker(String host)
+  {
+    Worker worker = EasyMock.createMock(Worker.class);
+    EasyMock.expect(worker.getHost()).andReturn(host).anyTimes();
+    EasyMock.replay(worker);
+    return worker;
+  }
+
+  private WorkerHolder createNonSyncingWorkerHolder(Worker worker)
+  {
+    ChangeRequestHttpSyncer syncer = EasyMock.createMock(ChangeRequestHttpSyncer.class);
+    EasyMock.expect(syncer.needsReset()).andReturn(true).anyTimes();
+    EasyMock.expect(syncer.getDebugInfo()).andReturn(Collections.emptyMap()).anyTimes();
+    WorkerHolder workerHolder = EasyMock.createMock(WorkerHolder.class);
+    EasyMock.expect(workerHolder.getUnderlyingSyncer()).andReturn(syncer).anyTimes();
+    EasyMock.expect(workerHolder.getWorker()).andReturn(worker).anyTimes();
+    workerHolder.start();
+    EasyMock.expectLastCall();
+    workerHolder.stop();
+    EasyMock.expectLastCall();
+    EasyMock.replay(syncer, workerHolder);
+    return workerHolder;
+  }
+
+  private static WorkerHolder createWorkerHolder(
+      ObjectMapper smileMapper,
+      HttpClient httpClient,
+      HttpRemoteTaskRunnerConfig config,
+      ScheduledExecutorService workersSyncExec,
+      WorkerHolder.Listener listener,
+      Worker worker,
+      List<TaskAnnouncement> knownAnnouncements,
+
+      // simulates task announcements received from worker on first sync call for the tasks that are already
+      // running/completed on the worker.
+      List<TaskAnnouncement> preExistingTaskAnnouncements,
+
+      // defines behavior for what to do when a particular task is assigned
+      Map<Task, List<TaskAnnouncement>> toBeAssignedTasks,
+
+      // incremented on each runnable completion in workersSyncExec, useful for deterministically watching that some
+      // work completed
+      AtomicInteger ticks,
+
+      // Updated each time a shutdown(taskId) call is received, useful for asserting that expected shutdowns indeed
+      // happened.
+      Set<String> actualShutdowns
+  )
+  {
+    return new WorkerHolder(smileMapper, httpClient, config, workersSyncExec, listener, worker, knownAnnouncements)
+    {
+      private final String workerHost;
+      private final int workerPort;
+      private final LifecycleLock startStopLock = new LifecycleLock();
+
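+      // Instance initializer: parse the worker's "host:port" string into workerHost and workerPort.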
+      {
+        String hostAndPort = worker.getHost();
+        int colonIndex = hostAndPort.indexOf(':');
+        if (colonIndex == -1) {
+          throw new IAE("Invalid host and port: [%s]", colonIndex);
+        }
+        workerHost = hostAndPort.substring(0, colonIndex);
+        workerPort = Integer.parseInt(hostAndPort.substring(colonIndex + 1));

Review Comment:
   ## Missing catch of NumberFormatException
   
   Potential uncaught 'java.lang.NumberFormatException'.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/10535)
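   
   A minimal sketch (not the author's code) of one way to resolve this, assuming the `host:port` parsing shown in the initializer above and `IAE`'s `(Throwable, String, Object...)` constructor:
   
   ```java
   // Hypothetical fix sketch: surface a non-numeric port as an IAE instead of
   // letting NumberFormatException escape the instance initializer.
   String hostAndPort = worker.getHost();
   int colonIndex = hostAndPort.indexOf(':');
   if (colonIndex == -1) {
     throw new IAE("Invalid host and port: [%s]", hostAndPort);
   }
   workerHost = hostAndPort.substring(0, colonIndex);
   try {
     workerPort = Integer.parseInt(hostAndPort.substring(colonIndex + 1));
   }
   catch (NumberFormatException e) {
     throw new IAE(e, "Invalid port in host and port: [%s]", hostAndPort);
   }
   ```
   
   This keeps a malformed port on the same `IAE` path as a missing colon, so callers see one consistent exception type for bad worker addresses.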


