github-code-scanning[bot] commented on code in PR #14407:
URL: https://github.com/apache/druid/pull/14407#discussion_r1318745572


##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/CommandQueueTask.java:
##########
@@ -0,0 +1,197 @@
+package org.apache.druid.indexing.common.task;
+
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Test task that can be given a series of commands to execute in its {@link 
#runTask} method.
+ */
+public class CommandQueueTask extends AbstractTask
+{
+  private static final Logger log = new Logger(CommandQueueTask.class);
+
+  private final Object queueNotification = new Object();
+  private final BlockingQueue<Command<?>> commandQueue = new 
LinkedBlockingQueue<>();
+
+  private final AtomicBoolean finishRequested = new AtomicBoolean(false);
+  private final AtomicInteger numCommandsExecuted = new AtomicInteger(0);
+
+  private final CompletableFuture<TaskStatus> finalTaskStatus = new 
CompletableFuture<>();
+
+  public CommandQueueTask(String datasource, String groupId)
+  {
+    super(
+        StringUtils.format("test_%s_%s", datasource, 
UUID.randomUUID().toString()),
+        groupId,
+        null,
+        datasource,
+        null
+    );
+  }
+
+  /**
+   * Marks the run of this task as finished so that no new commands are 
accepted.
+   * This methods waits for all the commands submitted so far to finish 
execution
+   * and returns the final TaskStatus.
+   */
+  public TaskStatus finishRunAndGetStatus()
+  {
+    synchronized (finishRequested) {
+      finishRequested.set(true);
+    }
+    synchronized (queueNotification) {
+      queueNotification.notify();
+    }
+
+    try {
+      return finalTaskStatus.get(10, TimeUnit.SECONDS);
+    }
+    catch (Exception e) {
+      throw new ISE(e, "Error waiting for task[%s] to finish", getId());
+    }
+  }
+
+  /**
+   * Submits the given runnable for execution on the task thread. This method
+   * returns immediately and does not wait for the execution to finish.
+   */
+  public void submit(Runnable runnable)
+  {
+    // Add a command with a dummy return value
+    Command<?> command = new Command<>(
+        () -> {
+          runnable.run();
+          return 1;
+        }
+    );
+    addToQueue(command);
+  }
+
+  /**
+   * Executes the given callable on the task thread. This method waits until 
the
+   * execution has finished and returns the computed value.
+   */
+  public <V> V execute(Callable<V> callable)
+  {
+    Command<V> command = new Command<>(callable);
+    addToQueue(command);
+    return waitForCommandToFinish(command);
+  }
+
+  private <V> void addToQueue(Command<V> command)
+  {
+    synchronized (finishRequested) {
+      if (finishRequested.get()) {
+        throw new ISE("Task[%s] cannot accept any more commands as it is 
already shutting down.", getId());
+      } else {
+        boolean added = commandQueue.offer(command);
+        if (!added) {
+          throw new ISE("Could not add command to task[%s].", getId());
+        }
+      }
+    }
+
+    synchronized (queueNotification) {
+      queueNotification.notify();

Review Comment:
   ## notify instead of notifyAll
   
   Using notify rather than notifyAll.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/5772)



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/CommandQueueTask.java:
##########
@@ -0,0 +1,197 @@
+package org.apache.druid.indexing.common.task;
+
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Test task that can be given a series of commands to execute in its {@link 
#runTask} method.
+ */
+public class CommandQueueTask extends AbstractTask
+{
+  private static final Logger log = new Logger(CommandQueueTask.class);
+
+  private final Object queueNotification = new Object();
+  private final BlockingQueue<Command<?>> commandQueue = new 
LinkedBlockingQueue<>();
+
+  private final AtomicBoolean finishRequested = new AtomicBoolean(false);
+  private final AtomicInteger numCommandsExecuted = new AtomicInteger(0);
+
+  private final CompletableFuture<TaskStatus> finalTaskStatus = new 
CompletableFuture<>();
+
+  public CommandQueueTask(String datasource, String groupId)
+  {
+    super(
+        StringUtils.format("test_%s_%s", datasource, 
UUID.randomUUID().toString()),
+        groupId,
+        null,
+        datasource,
+        null
+    );
+  }
+
+  /**
+   * Marks the run of this task as finished so that no new commands are 
accepted.
+   * This methods waits for all the commands submitted so far to finish 
execution
+   * and returns the final TaskStatus.
+   */
+  public TaskStatus finishRunAndGetStatus()
+  {
+    synchronized (finishRequested) {
+      finishRequested.set(true);
+    }
+    synchronized (queueNotification) {
+      queueNotification.notify();

Review Comment:
   ## notify instead of notifyAll
   
   Using notify rather than notifyAll.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/5771)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to