github-code-scanning[bot] commented on code in PR #14407:
URL: https://github.com/apache/druid/pull/14407#discussion_r1316600704


##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/CommandQueueTask.java:
##########
@@ -0,0 +1,171 @@
+package org.apache.druid.indexing.common.task;
+
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Test task that can be given a series of commands to execute in its {@link 
#runTask} method.
+ */
+public class CommandQueueTask extends AbstractTask
+{
+  private static final Logger log = new Logger(CommandQueueTask.class);
+
+  private final BlockingQueue<Command<?>> commandQueue = new 
LinkedBlockingQueue<>();
+  private final AtomicBoolean finishRequested = new AtomicBoolean(false);
+  private final AtomicInteger numCommandsExecuted = new AtomicInteger(0);
+
+  private final AtomicReference<TaskStatus> finalTaskStatus = new 
AtomicReference<>();
+
+  public CommandQueueTask(String datasource, String groupId)
+  {
+    super(
+        StringUtils.format("test_%s_%s", DateTimes.nowUtc(), 
UUID.randomUUID().toString()),
+        groupId,
+        null,
+        datasource,
+        null
+    );
+  }
+
+  /**
+   * Marks the run of this task as finished so that no new commands are 
accepted.
+   * This method waits for all the commands submitted so far to finish execution
+   * and returns the final TaskStatus.
+   */
+  public TaskStatus finishRunAndGetStatus()
+  {
+    // Mark finished to prevent submission of any more commands
+    finishRequested.set(true);
+
+    // Submit a dummy command to ensure that all previous commands have 
finished
+    executeInternal(() -> 1);
+
+    return finalTaskStatus.get();
+  }
+
+  /**
+   * Submits the given runnable for execution on the task thread. This method
+   * returns immediately and does not wait for the execution to finish.
+   */
+  public void submit(Runnable runnable)
+  {
+    if (finishRequested.get()) {
+      throw new ISE("Task[%s] cannot accept any more commands as it is already 
shutting down.", getId());
+    }
+
+    // Add a command with a dummy return value
+    Command<?> command = new Command<>(
+        () -> {
+          runnable.run();
+          return 1;
+        }
+    );
+    commandQueue.offer(command);

Review Comment:
   ## Ignored error status of call
   
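   Method `submit` ignores the return value of 
`BlockingQueue<Command<?>>.offer`, which signals failure by returning `false` 
rather than throwing, so a rejected command would be dropped silently.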
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/5763)



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/CommandQueueTask.java:
##########
@@ -0,0 +1,171 @@
+package org.apache.druid.indexing.common.task;
+
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Test task that can be given a series of commands to execute in its {@link 
#runTask} method.
+ */
+public class CommandQueueTask extends AbstractTask
+{
+  private static final Logger log = new Logger(CommandQueueTask.class);
+
+  private final BlockingQueue<Command<?>> commandQueue = new 
LinkedBlockingQueue<>();
+  private final AtomicBoolean finishRequested = new AtomicBoolean(false);
+  private final AtomicInteger numCommandsExecuted = new AtomicInteger(0);
+
+  private final AtomicReference<TaskStatus> finalTaskStatus = new 
AtomicReference<>();
+
+  public CommandQueueTask(String datasource, String groupId)
+  {
+    super(
+        StringUtils.format("test_%s_%s", DateTimes.nowUtc(), 
UUID.randomUUID().toString()),
+        groupId,
+        null,
+        datasource,
+        null
+    );
+  }
+
+  /**
+   * Marks the run of this task as finished so that no new commands are 
accepted.
+   * This method waits for all the commands submitted so far to finish execution
+   * and returns the final TaskStatus.
+   */
+  public TaskStatus finishRunAndGetStatus()
+  {
+    // Mark finished to prevent submission of any more commands
+    finishRequested.set(true);
+
+    // Submit a dummy command to ensure that all previous commands have 
finished
+    executeInternal(() -> 1);
+
+    return finalTaskStatus.get();
+  }
+
+  /**
+   * Submits the given runnable for execution on the task thread. This method
+   * returns immediately and does not wait for the execution to finish.
+   */
+  public void submit(Runnable runnable)
+  {
+    if (finishRequested.get()) {
+      throw new ISE("Task[%s] cannot accept any more commands as it is already 
shutting down.", getId());
+    }
+
+    // Add a command with a dummy return value
+    Command<?> command = new Command<>(
+        () -> {
+          runnable.run();
+          return 1;
+        }
+    );
+    commandQueue.offer(command);
+  }
+
+  /**
+   * Executes the given callable on the task thread. This method waits until 
the
+   * execution has finished and returns the computed value.
+   */
+  public <V> V execute(Callable<V> callable)
+  {
+    if (finishRequested.get()) {
+      throw new ISE("Task[%s] cannot accept any more commands as it is already 
shutting down.", getId());
+    }
+
+    return executeInternal(callable);
+  }
+
+  private <V> V executeInternal(Callable<V> callable)
+  {
+    Command<V> command = new Command<>(callable);
+    commandQueue.offer(command);

Review Comment:
   ## Ignored error status of call
   
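   Method `executeInternal` ignores the return value of 
`BlockingQueue<Command<?>>.offer`, which signals failure by returning `false` 
rather than throwing, so a rejected command would be dropped silently.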
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/5764)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to