github-code-scanning[bot] commented on code in PR #14407:
URL: https://github.com/apache/druid/pull/14407#discussion_r1316600704
##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/CommandQueueTask.java:
##########
@@ -0,0 +1,171 @@
+package org.apache.druid.indexing.common.task;
+
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Test task that can be given a series of commands to execute in its {@link #runTask} method.
+ */
+public class CommandQueueTask extends AbstractTask
+{
+ private static final Logger log = new Logger(CommandQueueTask.class);
+
+ private final BlockingQueue<Command<?>> commandQueue = new
LinkedBlockingQueue<>();
+ private final AtomicBoolean finishRequested = new AtomicBoolean(false);
+ private final AtomicInteger numCommandsExecuted = new AtomicInteger(0);
+
+ private final AtomicReference<TaskStatus> finalTaskStatus = new
AtomicReference<>();
+
+ public CommandQueueTask(String datasource, String groupId)
+ {
+ super(
+ StringUtils.format("test_%s_%s", DateTimes.nowUtc(),
UUID.randomUUID().toString()),
+ groupId,
+ null,
+ datasource,
+ null
+ );
+ }
+
+ /**
+ * Marks the run of this task as finished so that no new commands are accepted.
+ * This method waits for all the commands submitted so far to finish execution
+ * and returns the final TaskStatus.
+ */
+ public TaskStatus finishRunAndGetStatus()
+ {
+ // Mark finished to prevent submission of any more commands
+ finishRequested.set(true);
+
+ // Submit a dummy command to ensure that all previous commands have finished
+ executeInternal(() -> 1);
+
+ return finalTaskStatus.get();
+ }
+
+ /**
+ * Submits the given runnable for execution on the task thread. This method
+ * returns immediately and does not wait for the execution to finish.
+ */
+ public void submit(Runnable runnable)
+ {
+ if (finishRequested.get()) {
+ throw new ISE("Task[%s] cannot accept any more commands as it is already
shutting down.", getId());
+ }
+
+ // Add a command with a dummy return value
+ Command<?> command = new Command<>(
+ () -> {
+ runnable.run();
+ return 1;
+ }
+ );
+ commandQueue.offer(command);
Review Comment:
## Ignored error status of call
Method submit ignores exceptional return value of
BlockingQueue<Command<?>>.offer.
[Show more details](https://github.com/apache/druid/security/code-scanning/5763)
##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/CommandQueueTask.java:
##########
@@ -0,0 +1,171 @@
+package org.apache.druid.indexing.common.task;
+
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Test task that can be given a series of commands to execute in its {@link #runTask} method.
+ */
+public class CommandQueueTask extends AbstractTask
+{
+ private static final Logger log = new Logger(CommandQueueTask.class);
+
+ private final BlockingQueue<Command<?>> commandQueue = new
LinkedBlockingQueue<>();
+ private final AtomicBoolean finishRequested = new AtomicBoolean(false);
+ private final AtomicInteger numCommandsExecuted = new AtomicInteger(0);
+
+ private final AtomicReference<TaskStatus> finalTaskStatus = new
AtomicReference<>();
+
+ public CommandQueueTask(String datasource, String groupId)
+ {
+ super(
+ StringUtils.format("test_%s_%s", DateTimes.nowUtc(),
UUID.randomUUID().toString()),
+ groupId,
+ null,
+ datasource,
+ null
+ );
+ }
+
+ /**
+ * Marks the run of this task as finished so that no new commands are accepted.
+ * This method waits for all the commands submitted so far to finish execution
+ * and returns the final TaskStatus.
+ */
+ public TaskStatus finishRunAndGetStatus()
+ {
+ // Mark finished to prevent submission of any more commands
+ finishRequested.set(true);
+
+ // Submit a dummy command to ensure that all previous commands have finished
+ executeInternal(() -> 1);
+
+ return finalTaskStatus.get();
+ }
+
+ /**
+ * Submits the given runnable for execution on the task thread. This method
+ * returns immediately and does not wait for the execution to finish.
+ */
+ public void submit(Runnable runnable)
+ {
+ if (finishRequested.get()) {
+ throw new ISE("Task[%s] cannot accept any more commands as it is already
shutting down.", getId());
+ }
+
+ // Add a command with a dummy return value
+ Command<?> command = new Command<>(
+ () -> {
+ runnable.run();
+ return 1;
+ }
+ );
+ commandQueue.offer(command);
+ }
+
+ /**
+ * Executes the given callable on the task thread. This method waits until the
+ * execution has finished and returns the computed value.
+ */
+ public <V> V execute(Callable<V> callable)
+ {
+ if (finishRequested.get()) {
+ throw new ISE("Task[%s] cannot accept any more commands as it is already
shutting down.", getId());
+ }
+
+ return executeInternal(callable);
+ }
+
+ private <V> V executeInternal(Callable<V> callable)
+ {
+ Command<V> command = new Command<>(callable);
+ commandQueue.offer(command);
Review Comment:
## Ignored error status of call
Method executeInternal ignores exceptional return value of
BlockingQueue<Command<?>>.offer.
[Show more details](https://github.com/apache/druid/security/code-scanning/5764)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]