This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new bb1a6def9d Task queue unblock (#12099)
bb1a6def9d is described below

commit bb1a6def9dee55528907999e99ecf1b6c0c5f741
Author: Jason Koch <[email protected]>
AuthorDate: Sat May 14 16:44:29 2022 -0700

    Task queue unblock (#12099)
    
    * concurrency: introduce GuardedBy to TaskQueue
    
    * perf: Introduce TaskQueueScaleTest to test performance of TaskQueue with large task counts
    
    This introduces a test case to measure how long it takes to launch and manage (i.e. shut down)
    a large number of tasks in the TaskQueue.
    
    h/t to @gianm for main implementation.
    
    * perf: improve scalability of TaskQueue with large task counts
    
    * linter fixes, expand test coverage
    
    * pr feedback suggestion; swap to different linter
    
    * swap to use SuppressWarnings
    
    * Fix TaskQueueScaleTest.
    
    Co-authored-by: Gian Merlino <[email protected]>
---
 .../apache/druid/indexing/overlord/TaskQueue.java  | 269 +++++++----
 .../indexing/overlord/TaskQueueScaleTest.java      | 494 +++++++++++++++++++++
 2 files changed, 688 insertions(+), 75 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index a506f33521..50d5d8b4fc 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -28,6 +28,8 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.druid.annotations.SuppressFBWarnings;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.Counters;
@@ -53,9 +55,12 @@ import org.apache.druid.utils.CollectionUtils;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -63,7 +68,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
@@ -79,8 +83,11 @@ import java.util.stream.Collectors;
 public class TaskQueue
 {
   private final long MANAGEMENT_WAIT_TIMEOUT_NANOS = 
TimeUnit.SECONDS.toNanos(60);
+  private final long MIN_WAIT_TIME_MS = 100;
 
+  @GuardedBy("giant")
   private final List<Task> tasks = new ArrayList<>();
+  @GuardedBy("giant")
   private final Map<String, ListenableFuture<TaskStatus>> taskFutures = new 
HashMap<>();
 
   private final TaskLockConfig lockConfig;
@@ -93,7 +100,8 @@ public class TaskQueue
   private final ServiceEmitter emitter;
 
   private final ReentrantLock giant = new ReentrantLock(true);
-  private final Condition managementMayBeNecessary = giant.newCondition();
+  @SuppressWarnings("MismatchedQueryAndUpdateOfCollection")
+  private final BlockingQueue<Object> managementMayBeNecessary = new 
ArrayBlockingQueue<>(8);
   private final ExecutorService managerExec = 
Executors.newSingleThreadExecutor(
       new ThreadFactoryBuilder()
           .setDaemon(false)
@@ -111,7 +119,9 @@ public class TaskQueue
 
   private final ConcurrentHashMap<String, AtomicLong> totalSuccessfulTaskCount 
= new ConcurrentHashMap<>();
   private final ConcurrentHashMap<String, AtomicLong> totalFailedTaskCount = 
new ConcurrentHashMap<>();
+  @GuardedBy("totalSuccessfulTaskCount")
   private Map<String, Long> prevTotalSuccessfulTaskCount = new HashMap<>();
+  @GuardedBy("totalFailedTaskCount")
   private Map<String, Long> prevTotalFailedTaskCount = new HashMap<>();
 
   public TaskQueue(
@@ -207,7 +217,7 @@ public class TaskQueue
             }
           }
       );
-      managementMayBeNecessary.signalAll();
+      requestManagement();
     }
     finally {
       giant.unlock();
@@ -228,7 +238,7 @@ public class TaskQueue
       active = false;
       managerExec.shutdownNow();
       storageSyncExec.shutdownNow();
-      managementMayBeNecessary.signalAll();
+      requestManagement();
     }
     finally {
       giant.unlock();
@@ -240,6 +250,52 @@ public class TaskQueue
     return active;
   }
 
+  /**
+   * Request management from the management thread. Non-blocking.
+   *
+   * Other callers (such as notifyStatus) should trigger activity on the
+   * TaskQueue thread by requesting management here.
+   */
+  void requestManagement()
+  {
+    // use a BlockingQueue since the offer/poll/wait behaviour is simple
+    // and very easy to reason about
+
+    // the request has to be offer (non blocking), since someone might request
+    // while already holding giant lock
+
+    // do not care if the item fits into the queue:
+    // if the queue is already full, request has been triggered anyway
+    managementMayBeNecessary.offer(this);
+  }
+
+  /**
+   * Await for an event to manage.
+   *
+   * This should only be called from the management thread to wait for 
activity.
+   *
+   * @param nanos
+   * @throws InterruptedException
+   */
+  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification = 
"using queue as notification mechanism, result has no value")
+  void awaitManagementNanos(long nanos) throws InterruptedException
+  {
+    // mitigate a busy loop, it can get pretty busy when there are a lot of 
start/stops
+    try {
+      Thread.sleep(MIN_WAIT_TIME_MS);
+    }
+    catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+
+    // wait for an item, if an item arrives (or is already available), 
complete immediately
+    // (does not actually matter what the item is)
+    managementMayBeNecessary.poll(nanos - 
(TimeUnit.MILLISECONDS.toNanos(MIN_WAIT_TIME_MS)), TimeUnit.NANOSECONDS);
+
+    // there may have been multiple requests, clear them all
+    managementMayBeNecessary.clear();
+  }
+
   /**
    * Main task runner management loop. Meant to run forever, or, at least 
until we're stopped.
    */
@@ -252,31 +308,54 @@ public class TaskQueue
     taskRunner.restore();
 
     while (active) {
-      giant.lock();
+      manageInternal();
 
-      try {
-        manageInternal();
-        // awaitNanos because management may become necessary without this 
condition signalling,
-        // due to e.g. tasks becoming ready when other folks mess with the 
TaskLockbox.
-        managementMayBeNecessary.awaitNanos(MANAGEMENT_WAIT_TIMEOUT_NANOS);
-      }
-      finally {
-        giant.unlock();
-      }
+      // awaitNanos because management may become necessary without this 
condition signalling,
+      // due to e.g. tasks becoming ready when other folks mess with the 
TaskLockbox.
+      awaitManagementNanos(MANAGEMENT_WAIT_TIMEOUT_NANOS);
     }
   }
 
   @VisibleForTesting
   void manageInternal()
+  {
+    Set<String> knownTaskIds = new HashSet<>();
+    Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures = new 
HashMap<>();
+
+    giant.lock();
+
+    try {
+      manageInternalCritical(knownTaskIds, runnerTaskFutures);
+    }
+    finally {
+      giant.unlock();
+    }
+
+    manageInternalPostCritical(knownTaskIds, runnerTaskFutures);
+  }
+
+
+  /**
+   * Management loop critical section tasks.
+   *
+   * @param knownTaskIds will be modified - filled with known task IDs
+   * @param runnerTaskFutures will be modified - filled with futures related 
to getting the running tasks
+   */
+  @GuardedBy("giant")
+  private void manageInternalCritical(
+      final Set<String> knownTaskIds,
+      final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures
+  )
   {
     // Task futures available from the taskRunner
-    final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures = new 
HashMap<>();
     for (final TaskRunnerWorkItem workItem : taskRunner.getKnownTasks()) {
       runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult());
     }
     // Attain futures for all active tasks (assuming they are ready to run).
     // Copy tasks list, as notifyStatus may modify it.
     for (final Task task : ImmutableList.copyOf(tasks)) {
+      knownTaskIds.add(task.getId());
+
       if (!taskFutures.containsKey(task.getId())) {
         final ListenableFuture<TaskStatus> runnerTaskFuture;
         if (runnerTaskFutures.containsKey(task.getId())) {
@@ -317,11 +396,15 @@ public class TaskQueue
         taskRunner.run(task);
       }
     }
+  }
+
+  @VisibleForTesting
+  private void manageInternalPostCritical(
+      final Set<String> knownTaskIds,
+      final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures
+  )
+  {
     // Kill tasks that shouldn't be running
-    final Set<String> knownTaskIds = tasks
-        .stream()
-        .map(Task::getId)
-        .collect(Collectors.toSet());
     final Set<String> tasksToKill = 
Sets.difference(runnerTaskFutures.keySet(), knownTaskIds);
     if (!tasksToKill.isEmpty()) {
       log.info("Asking taskRunner to clean up %,d tasks.", tasksToKill.size());
@@ -387,7 +470,7 @@ public class TaskQueue
       // insert the task into our queue. So don't catch it.
       taskStorage.insert(task, TaskStatus.running(task.getId()));
       addTaskInternal(task);
-      managementMayBeNecessary.signalAll();
+      requestManagement();
       return true;
     }
     finally {
@@ -396,6 +479,7 @@ public class TaskQueue
   }
 
   // Should always be called after taking giantLock
+  @GuardedBy("giant")
   private void addTaskInternal(final Task task)
   {
     tasks.add(task);
@@ -403,6 +487,7 @@ public class TaskQueue
   }
 
   // Should always be called after taking giantLock
+  @GuardedBy("giant")
   private void removeTaskInternal(final Task task)
   {
     taskLockbox.remove(task);
@@ -473,30 +558,33 @@ public class TaskQueue
    */
   private void notifyStatus(final Task task, final TaskStatus taskStatus, 
String reasonFormat, Object... args)
   {
-    giant.lock();
+    Preconditions.checkNotNull(task, "task");
+    Preconditions.checkNotNull(taskStatus, "status");
+    Preconditions.checkState(active, "Queue is not active!");
+    Preconditions.checkArgument(
+        task.getId().equals(taskStatus.getId()),
+        "Mismatching task ids[%s/%s]",
+        task.getId(),
+        taskStatus.getId()
+    );
 
+    // Inform taskRunner that this task can be shut down
     TaskLocation taskLocation = TaskLocation.unknown();
+    try {
+      taskLocation = taskRunner.getTaskLocation(task.getId());
+      taskRunner.shutdown(task.getId(), reasonFormat, args);
+    }
+    catch (Exception e) {
+      log.warn(e, "TaskRunner failed to cleanup task after completion: %s", 
task.getId());
+    }
+
+    int removed = 0;
 
+    ///////// critical section
+
+    giant.lock();
     try {
-      Preconditions.checkNotNull(task, "task");
-      Preconditions.checkNotNull(taskStatus, "status");
-      Preconditions.checkState(active, "Queue is not active!");
-      Preconditions.checkArgument(
-          task.getId().equals(taskStatus.getId()),
-          "Mismatching task ids[%s/%s]",
-          task.getId(),
-          taskStatus.getId()
-      );
-      // Inform taskRunner that this task can be shut down
-      try {
-        taskLocation = taskRunner.getTaskLocation(task.getId());
-        taskRunner.shutdown(task.getId(), reasonFormat, args);
-      }
-      catch (Exception e) {
-        log.warn(e, "TaskRunner failed to cleanup task after completion: %s", 
task.getId());
-      }
       // Remove from running tasks
-      int removed = 0;
       for (int i = tasks.size() - 1; i >= 0; i--) {
         if (tasks.get(i).getId().equals(task.getId())) {
           removed++;
@@ -504,36 +592,39 @@ public class TaskQueue
           break;
         }
       }
-      if (removed == 0) {
-        log.warn("Unknown task completed: %s", task.getId());
-      } else if (removed > 1) {
-        log.makeAlert("Removed multiple copies of task").addData("count", 
removed).addData("task", task.getId()).emit();
-      }
+
       // Remove from futures list
       taskFutures.remove(task.getId());
-      if (removed > 0) {
-        // If we thought this task should be running, save status to DB
-        try {
-          final Optional<TaskStatus> previousStatus = 
taskStorage.getStatus(task.getId());
-          if (!previousStatus.isPresent() || 
!previousStatus.get().isRunnable()) {
-            log.makeAlert("Ignoring notification for already-complete 
task").addData("task", task.getId()).emit();
-          } else {
-            taskStorage.setStatus(taskStatus.withLocation(taskLocation));
-            log.info("Task done: %s", task);
-            managementMayBeNecessary.signalAll();
-          }
-        }
-        catch (Exception e) {
-          log.makeAlert(e, "Failed to persist status for task")
-             .addData("task", task.getId())
-             .addData("statusCode", taskStatus.getStatusCode())
-             .emit();
-        }
-      }
     }
     finally {
       giant.unlock();
     }
+
+    ///////// end critical
+
+    if (removed == 0) {
+      log.warn("Unknown task completed: %s", task.getId());
+    }
+
+    if (removed > 0) {
+      // If we thought this task should be running, save status to DB
+      try {
+        final Optional<TaskStatus> previousStatus = 
taskStorage.getStatus(task.getId());
+        if (!previousStatus.isPresent() || !previousStatus.get().isRunnable()) 
{
+          log.makeAlert("Ignoring notification for already-complete 
task").addData("task", task.getId()).emit();
+        } else {
+          taskStorage.setStatus(taskStatus.withLocation(taskLocation));
+          log.info("Task done: %s", task);
+          requestManagement();
+        }
+      }
+      catch (Exception e) {
+        log.makeAlert(e, "Failed to persist status for task")
+            .addData("task", task.getId())
+            .addData("statusCode", taskStatus.getStatusCode())
+            .emit();
+      }
+    }
   }
 
   /**
@@ -655,7 +746,7 @@ public class TaskQueue
             addedTasks.size(),
             removedTasks.size()
         );
-        managementMayBeNecessary.signalAll();
+        requestManagement();
       } else {
         log.info("Not active. Skipping storage sync.");
       }
@@ -688,22 +779,37 @@ public class TaskQueue
   public Map<String, Long> getSuccessfulTaskCount()
   {
     Map<String, Long> total = 
CollectionUtils.mapValues(totalSuccessfulTaskCount, AtomicLong::get);
-    Map<String, Long> delta = getDeltaValues(total, 
prevTotalSuccessfulTaskCount);
-    prevTotalSuccessfulTaskCount = total;
-    return delta;
+    synchronized (totalSuccessfulTaskCount) {
+      Map<String, Long> delta = getDeltaValues(total, 
prevTotalSuccessfulTaskCount);
+      prevTotalSuccessfulTaskCount = total;
+      return delta;
+    }
   }
 
   public Map<String, Long> getFailedTaskCount()
   {
     Map<String, Long> total = CollectionUtils.mapValues(totalFailedTaskCount, 
AtomicLong::get);
-    Map<String, Long> delta = getDeltaValues(total, prevTotalFailedTaskCount);
-    prevTotalFailedTaskCount = total;
-    return delta;
+    synchronized (totalFailedTaskCount) {
+      Map<String, Long> delta = getDeltaValues(total, 
prevTotalFailedTaskCount);
+      prevTotalFailedTaskCount = total;
+      return delta;
+    }
+  }
+
+  Map<String, String> getCurrentTaskDatasources()
+  {
+    giant.lock();
+    try {
+      return tasks.stream().collect(Collectors.toMap(Task::getId, 
Task::getDataSource));
+    }
+    finally {
+      giant.unlock();
+    }
   }
 
   public Map<String, Long> getRunningTaskCount()
   {
-    Map<String, String> taskDatasources = 
tasks.stream().collect(Collectors.toMap(Task::getId, Task::getDataSource));
+    Map<String, String> taskDatasources = getCurrentTaskDatasources();
     return taskRunner.getRunningTasks()
                      .stream()
                      .collect(Collectors.toMap(
@@ -715,7 +821,7 @@ public class TaskQueue
 
   public Map<String, Long> getPendingTaskCount()
   {
-    Map<String, String> taskDatasources = 
tasks.stream().collect(Collectors.toMap(Task::getId, Task::getDataSource));
+    Map<String, String> taskDatasources = getCurrentTaskDatasources();
     return taskRunner.getPendingTasks()
                      .stream()
                      .collect(Collectors.toMap(
@@ -731,13 +837,26 @@ public class TaskQueue
                                                .stream()
                                                
.map(TaskRunnerWorkItem::getTaskId)
                                                .collect(Collectors.toSet());
-    return tasks.stream().filter(task -> 
!runnerKnownTaskIds.contains(task.getId()))
-                .collect(Collectors.toMap(Task::getDataSource, task -> 1L, 
Long::sum));
+
+    giant.lock();
+    try {
+      return tasks.stream().filter(task -> 
!runnerKnownTaskIds.contains(task.getId()))
+                  .collect(Collectors.toMap(Task::getDataSource, task -> 1L, 
Long::sum));
+    }
+    finally {
+      giant.unlock();
+    }
   }
 
   @VisibleForTesting
   List<Task> getTasks()
   {
-    return tasks;
+    giant.lock();
+    try {
+      return new ArrayList<Task>(tasks);
+    }
+    finally {
+      giant.unlock();
+    }
   }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java
new file mode 100644
index 0000000000..d305b0d6c9
--- /dev/null
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.actions.TaskAction;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
+import org.apache.druid.indexing.common.config.TaskStorageConfig;
+import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
+import org.apache.druid.indexing.overlord.config.DefaultTaskConfig;
+import org.apache.druid.indexing.overlord.config.TaskLockConfig;
+import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
+import org.apache.druid.metadata.TaskLookup;
+import org.apache.druid.metadata.TestDerbyConnector;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Tests that {@link TaskQueue} is able to handle large numbers of 
concurrently-running tasks.
+ */
+public class TaskQueueScaleTest
+{
+  private static final String DATASOURCE = "ds";
+
+  private final int numTasks = 1000;
+
+  @Rule
+  public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new 
TestDerbyConnector.DerbyConnectorRule();
+
+  private TaskQueue taskQueue;
+  private TaskStorage taskStorage;
+  private TestTaskRunner taskRunner;
+  private Closer closer;
+
+  @Before
+  public void setUp()
+  {
+    EmittingLogger.registerEmitter(new NoopServiceEmitter());
+
+    closer = Closer.create();
+
+    // Be as realistic as possible; use actual classes for storage rather than 
mocks.
+    taskStorage = new HeapMemoryTaskStorage(new 
TaskStorageConfig(Period.hours(1)));
+    taskRunner = new TestTaskRunner();
+    closer.register(taskRunner::stop);
+    final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
+
+    final IndexerSQLMetadataStorageCoordinator storageCoordinator = new 
IndexerSQLMetadataStorageCoordinator(
+        jsonMapper,
+        derbyConnectorRule.metadataTablesConfigSupplier().get(),
+        derbyConnectorRule.getConnector()
+    );
+
+    final TaskActionClientFactory unsupportedTaskActionFactory =
+        task -> new TaskActionClient()
+        {
+          @Override
+          public <RetType> RetType submit(TaskAction<RetType> taskAction)
+          {
+            throw new UnsupportedOperationException();
+          }
+        };
+
+    taskQueue = new TaskQueue(
+        new TaskLockConfig(),
+        new TaskQueueConfig(null, Period.millis(1), null, null),
+        new DefaultTaskConfig(),
+        taskStorage,
+        taskRunner,
+        unsupportedTaskActionFactory, // Not used for anything serious
+        new TaskLockbox(taskStorage, storageCoordinator),
+        new NoopServiceEmitter()
+    );
+
+    taskQueue.start();
+    closer.register(taskQueue::stop);
+  }
+
+  @After
+  public void tearDown() throws Exception
+  {
+    closer.close();
+  }
+
+  @Test(timeout = 60_000L) // more than enough time if the task queue is 
efficient
+  public void doMassLaunchAndExit() throws Exception
+  {
+    Assert.assertEquals("no tasks should be running", 0, 
taskRunner.getKnownTasks().size());
+    Assert.assertEquals("no tasks should be known", 0, 
taskQueue.getTasks().size());
+    Assert.assertEquals("no tasks should be running", 0, 
taskQueue.getRunningTaskCount().size());
+
+    // Add all tasks.
+    for (int i = 0; i < numTasks; i++) {
+      final TestTask testTask = new TestTask(i, 2000L /* runtime millis */);
+      taskQueue.add(testTask);
+    }
+
+    // in theory we can get a race here, since we fetch the counts at separate 
times
+    Assert.assertEquals("all tasks should be known", numTasks, 
taskQueue.getTasks().size());
+    long runningTasks = 
taskQueue.getRunningTaskCount().values().stream().mapToLong(Long::longValue).sum();
+    long pendingTasks = 
taskQueue.getPendingTaskCount().values().stream().mapToLong(Long::longValue).sum();
+    long waitingTasks = 
taskQueue.getWaitingTaskCount().values().stream().mapToLong(Long::longValue).sum();
+    Assert.assertEquals("all tasks should be known", numTasks, (runningTasks + 
pendingTasks + waitingTasks));
+
+    // Wait for all tasks to finish.
+    final TaskLookup.CompleteTaskLookup completeTaskLookup =
+        TaskLookup.CompleteTaskLookup.of(numTasks, Duration.standardHours(1));
+
+    while (taskStorage.getTaskInfos(completeTaskLookup, DATASOURCE).size() < 
numTasks) {
+      Thread.sleep(100);
+    }
+
+    Thread.sleep(100);
+
+    Assert.assertEquals("no tasks should be active", 0, 
taskStorage.getActiveTasks().size());
+    runningTasks = 
taskQueue.getRunningTaskCount().values().stream().mapToLong(Long::longValue).sum();
+    pendingTasks = 
taskQueue.getPendingTaskCount().values().stream().mapToLong(Long::longValue).sum();
+    waitingTasks = 
taskQueue.getWaitingTaskCount().values().stream().mapToLong(Long::longValue).sum();
+    Assert.assertEquals("no tasks should be running", 0, runningTasks);
+    Assert.assertEquals("no tasks should be pending", 0, pendingTasks);
+    Assert.assertEquals("no tasks should be waiting", 0, waitingTasks);
+  }
+
+  @Test(timeout = 60_000L) // more than enough time if the task queue is 
efficient
+  public void doMassLaunchAndShutdown() throws Exception
+  {
+    Assert.assertEquals("no tasks should be running", 0, 
taskRunner.getKnownTasks().size());
+
+    // Add all tasks.
+    final List<String> taskIds = new ArrayList<>();
+    for (int i = 0; i < numTasks; i++) {
+      final TestTask testTask = new TestTask(
+          i,
+          Duration.standardHours(1).getMillis() /* very long runtime millis, 
so we can do a shutdown */
+      );
+      taskQueue.add(testTask);
+      taskIds.add(testTask.getId());
+    }
+
+    // wait for all tasks to progress to running state
+    while (taskStorage.getActiveTasks().size() < numTasks) {
+      Thread.sleep(100);
+    }
+    Assert.assertEquals("all tasks should be running", numTasks, 
taskStorage.getActiveTasks().size());
+
+    // Shut down all tasks.
+    for (final String taskId : taskIds) {
+      taskQueue.shutdown(taskId, "test shutdown");
+    }
+
+    // Wait for all tasks to finish.
+    while (!taskStorage.getActiveTasks().isEmpty()) {
+      Thread.sleep(100);
+    }
+
+    Assert.assertEquals("no tasks should be running", 0, 
taskStorage.getActiveTasks().size());
+
+    int completed = taskStorage.getTaskInfos(
+        TaskLookup.CompleteTaskLookup.of(numTasks, Duration.standardHours(1)),
+        DATASOURCE
+    ).size();
+    Assert.assertEquals("all tasks should have completed", numTasks, 
completed);
+  }
+
+  private static class TestTask extends NoopTask
+  {
+    private final int number;
+    private final long runtime;
+
+    public TestTask(int number, long runtime)
+    {
+      super(null, null, DATASOURCE, 0, 0, null, null, Collections.emptyMap());
+      this.number = number;
+      this.runtime = runtime;
+    }
+
+    public int getNumber()
+    {
+      return number;
+    }
+
+    public long getRuntimeMillis()
+    {
+      return runtime;
+    }
+  }
+
+  private static class TestTaskRunner implements TaskRunner
+  {
+    private static final Logger log = new Logger(TestTaskRunner.class);
+    private static final Duration T_PENDING_TO_RUNNING = 
Duration.standardSeconds(2);
+    private static final Duration T_SHUTDOWN_ACK = Duration.millis(8);
+    private static final Duration T_SHUTDOWN_COMPLETE = 
Duration.standardSeconds(2);
+
+    @GuardedBy("knownTasks")
+    private final Map<String, TestTaskRunnerWorkItem> knownTasks = new 
HashMap<>();
+
+    private final ScheduledExecutorService exec = ScheduledExecutors.fixed(8, 
"TaskQueueScaleTest-%s");
+
+    @Override
+    public void start()
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ListenableFuture<TaskStatus> run(Task task)
+    {
+      // Production task runners generally do not take a long time to execute "run", but may take a long time to
+      // go from "pending" to "running". Simulate that: the work item starts PENDING, becomes RUNNING after
+      // T_PENDING_TO_RUNNING, and completes successfully after the task's configured runtime.
+      synchronized (knownTasks) {
+        final TestTaskRunnerWorkItem item = knownTasks.computeIfAbsent(task.getId(), TestTaskRunnerWorkItem::new);
+        exec.schedule(
+            () -> {
+              try {
+                synchronized (knownTasks) {
+                  final TestTaskRunnerWorkItem item2 = knownTasks.get(task.getId());
+                  // shutdown() may already have removed the task; only transition items that still exist.
+                  if (item2 != null && item2.getState() == RunnerTaskState.PENDING) {
+                    knownTasks.put(task.getId(), item2.withState(RunnerTaskState.RUNNING));
+                  }
+                }
+
+                exec.schedule(
+                    () -> {
+                      try {
+                        final TestTaskRunnerWorkItem item2;
+                        synchronized (knownTasks) {
+                          item2 = knownTasks.get(task.getId());
+                          // Guard before dereferencing: a shutdown may have removed the task in the meantime.
+                          if (item2 != null) {
+                            knownTasks.put(task.getId(), item2.withState(RunnerTaskState.NONE));
+                          }
+                        }
+                        if (item2 != null) {
+                          item2.setResult(TaskStatus.success(task.getId()));
+                        }
+                      }
+                      catch (Throwable e) {
+                        log.error(e, "Error in scheduled executor");
+                      }
+                    },
+                    ((TestTask) task).getRuntimeMillis(),
+                    TimeUnit.MILLISECONDS
+                );
+              }
+              catch (Throwable e) {
+                log.error(e, "Error in scheduled executor");
+              }
+            },
+            T_PENDING_TO_RUNNING.getMillis(),
+            TimeUnit.MILLISECONDS
+        );
+
+        return item.getResult();
+      }
+    }
+
+    @Override
+    public void shutdown(String taskid, String reason)
+    {
+      // Production task runners take a long time to execute "shutdown" if the task is currently running.
+      synchronized (knownTasks) {
+        if (!knownTasks.containsKey(taskid)) {
+          return;
+        }
+      }
+
+      threadSleep(T_SHUTDOWN_ACK);
+
+      final TestTaskRunnerWorkItem existingTask;
+      synchronized (knownTasks) {
+        existingTask = knownTasks.get(taskid);
+      }
+      // The lock is not held while sleeping, so a concurrent shutdown of the same task may have removed it.
+      if (existingTask != null && !existingTask.getResult().isDone()) {
+        exec.schedule(() -> {
+          // Report the failure under the real task id (previously the literal "taskId"). A status with a
+          // different id would presumably fail TaskQueue.notifyStatus's id-match precondition if routed there.
+          existingTask.setResult(TaskStatus.failure(taskid, "stopped"));
+          synchronized (knownTasks) {
+            knownTasks.remove(taskid);
+          }
+        }, T_SHUTDOWN_COMPLETE.getMillis(), TimeUnit.MILLISECONDS);
+      }
+    }
+
+    static void threadSleep(Duration duration)
+    {
+      try {
+        Thread.sleep(duration.getMillis());
+      }
+      catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public void registerListener(TaskRunnerListener listener, Executor 
executor)
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void unregisterListener(String listenerId)
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
+    {
+      // Do nothing, and return null. (TaskQueue doesn't use the return value.)
+      return null;
+    }
+
+    @Override
+    public void stop()
+    {
+      exec.shutdownNow();
+    }
+
+    @Override
+    public Collection<? extends TaskRunnerWorkItem> getRunningTasks()
+    {
+      synchronized (knownTasks) {
+        return knownTasks.values()
+                         .stream()
+                         .filter(item -> item.getState() == 
RunnerTaskState.RUNNING)
+                         .collect(Collectors.toList());
+      }
+    }
+
+    @Override
+    public Collection<? extends TaskRunnerWorkItem> getPendingTasks()
+    {
+      synchronized (knownTasks) {
+        return knownTasks.values()
+                         .stream()
+                         .filter(item -> item.getState() == 
RunnerTaskState.PENDING)
+                         .collect(Collectors.toList());
+      }
+    }
+
+    @Override
+    public Collection<? extends TaskRunnerWorkItem> getKnownTasks()
+    {
+      synchronized (knownTasks) {
+        return ImmutableList.copyOf(knownTasks.values());
+      }
+    }
+
+    @Override
+    public Optional<ScalingStats> getScalingStats()
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Map<String, Long> getTotalTaskSlotCount()
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Map<String, Long> getIdleTaskSlotCount()
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Map<String, Long> getUsedTaskSlotCount()
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Map<String, Long> getLazyTaskSlotCount()
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Map<String, Long> getBlacklistedTaskSlotCount()
+    {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  private static class TestTaskRunnerWorkItem extends TaskRunnerWorkItem
+  {
+    private final RunnerTaskState state;
+
+    public TestTaskRunnerWorkItem(final String taskId)
+    {
+      this(taskId, SettableFuture.create(), RunnerTaskState.PENDING);
+    }
+
+    private TestTaskRunnerWorkItem(
+        final String taskId,
+        final ListenableFuture<TaskStatus> result,
+        final RunnerTaskState state
+    )
+    {
+      super(taskId, result);
+      this.state = state;
+    }
+
+    public RunnerTaskState getState()
+    {
+      return state;
+    }
+
+    @Override
+    public TaskLocation getLocation()
+    {
+      return TaskLocation.unknown();
+    }
+
+    @Nullable
+    @Override
+    public String getTaskType()
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String getDataSource()
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    public void setResult(final TaskStatus result)
+    {
+      ((SettableFuture<TaskStatus>) getResult()).set(result);
+
+      // possibly a parallel shutdown request was issued during the
+      // shutdown time; ignore it
+    }
+
+    public TestTaskRunnerWorkItem withState(final RunnerTaskState newState)
+    {
+      return new TestTaskRunnerWorkItem(getTaskId(), getResult(), newState);
+    }
+  }
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to