This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new dff6d03a59 Use SingleThreadExecutor for OrderedExecutor and drainTo()
tasks into local array (#3546)
dff6d03a59 is described below
commit dff6d03a59b293019a3c1144515e6f931986f82f
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Oct 20 13:24:21 2022 -0700
Use SingleThreadExecutor for OrderedExecutor and drainTo() tasks into local
array (#3546)
* Use SingleThreadExecutor for OrderedExecutor and drainTo() tasks into
local array
* Added metrics to executor
* Fixed checkstyle
* Made the test more resilient
* Made the tests not relying on thread.sleep()
* Fixed testRejectWhenQueueIsFull
* Ignore spotbugs warning
* Fixed annotation formatting
* Removed test assertion that had already been changed
---
bookkeeper-common/pom.xml | 5 +
.../bookkeeper/common/util/OrderedExecutor.java | 107 ++++----
.../bookkeeper/common/util/OrderedScheduler.java | 10 +-
.../common/util/SingleThreadExecutor.java | 296 +++++++++++++++++++++
.../common/util/TestSingleThreadExecutor.java | 290 ++++++++++++++++++++
5 files changed, 652 insertions(+), 56 deletions(-)
diff --git a/bookkeeper-common/pom.xml b/bookkeeper-common/pom.xml
index 62aad8735e..29d77bccde 100644
--- a/bookkeeper-common/pom.xml
+++ b/bookkeeper-common/pom.xml
@@ -85,6 +85,11 @@
<artifactId>commons-lang3</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedExecutor.java
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedExecutor.java
index 9ee84ba9ed..40c3fb0283 100644
---
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedExecutor.java
+++
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedExecutor.java
@@ -29,19 +29,16 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Random;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.common.collections.BlockingMpscQueue;
import org.apache.bookkeeper.common.util.affinity.CpuAffinity;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -294,20 +291,17 @@ public class OrderedExecutor implements ExecutorService {
}
}
- protected ThreadPoolExecutor createSingleThreadExecutor(ThreadFactory
factory) {
- BlockingQueue<Runnable> queue;
- if (enableBusyWait) {
- // Use queue with busy-wait polling strategy
- queue = new BlockingMpscQueue<>(maxTasksInQueue > 0 ?
maxTasksInQueue : DEFAULT_MAX_ARRAY_QUEUE_SIZE);
+ protected ExecutorService createSingleThreadExecutor(ThreadFactory
factory) {
+ if (maxTasksInQueue > 0) {
+ return new SingleThreadExecutor(factory, maxTasksInQueue, true);
} else {
- // By default, use regular JDK LinkedBlockingQueue
- queue = new LinkedBlockingQueue<>();
+ return new SingleThreadExecutor(factory);
}
- return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, queue,
factory);
}
- protected ExecutorService getBoundedExecutor(ThreadPoolExecutor executor) {
- return new BoundedExecutorService(executor, this.maxTasksInQueue);
+ protected ExecutorService getBoundedExecutor(ExecutorService executor) {
+ checkArgument(executor instanceof ThreadPoolExecutor);
+ return new BoundedExecutorService((ThreadPoolExecutor) executor,
this.maxTasksInQueue);
}
protected ExecutorService addExecutorDecorators(ExecutorService executor) {
@@ -400,11 +394,14 @@ public class OrderedExecutor implements ExecutorService {
threads = new ExecutorService[numThreads];
threadIds = new long[numThreads];
for (int i = 0; i < numThreads; i++) {
- ThreadPoolExecutor thread = createSingleThreadExecutor(
+ ExecutorService thread = createSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat(name + "-" +
getClass().getSimpleName() + "-" + i + "-%d")
.setThreadFactory(threadFactory).build());
- threads[i] = addExecutorDecorators(getBoundedExecutor(thread));
+ if (traceTaskExecution || preserveMdcForTaskExecution) {
+ thread = addExecutorDecorators(thread);
+ }
+ threads[i] = thread;
final int idx = i;
try {
@@ -434,43 +431,49 @@ public class OrderedExecutor implements ExecutorService {
throw new RuntimeException("Couldn't start thread " + i, e);
}
- // Register gauges
- statsLogger.scopeLabel("thread", String.valueOf(idx))
- .registerGauge(String.format("%s-queue", name), new
Gauge<Number>() {
- @Override
- public Number getDefaultValue() {
- return 0;
- }
-
- @Override
- public Number getSample() {
- return thread.getQueue().size();
- }
- });
- statsLogger.scopeLabel("thread", String.valueOf(idx))
- .registerGauge(String.format("%s-completed-tasks", name),
new Gauge<Number>() {
- @Override
- public Number getDefaultValue() {
- return 0;
- }
-
- @Override
- public Number getSample() {
- return thread.getCompletedTaskCount();
- }
- });
- statsLogger.scopeLabel("thread", String.valueOf(idx))
- .registerGauge(String.format("%s-total-tasks", name), new
Gauge<Number>() {
- @Override
- public Number getDefaultValue() {
- return 0;
- }
-
- @Override
- public Number getSample() {
- return thread.getTaskCount();
- }
- });
+ if (thread instanceof SingleThreadExecutor) {
+ SingleThreadExecutor ste = (SingleThreadExecutor) thread;
+ ste.registerMetrics(statsLogger);
+ } else if (thread instanceof ThreadPoolExecutor) {
+ ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor)
thread;
+ // Register gauges
+ statsLogger.scopeLabel("thread", String.valueOf(idx))
+ .registerGauge(String.format("%s-queue", name), new
Gauge<Number>() {
+ @Override
+ public Number getDefaultValue() {
+ return 0;
+ }
+
+ @Override
+ public Number getSample() {
+ return threadPoolExecutor.getQueue().size();
+ }
+ });
+ statsLogger.scopeLabel("thread", String.valueOf(idx))
+ .registerGauge(String.format("%s-completed-tasks",
name), new Gauge<Number>() {
+ @Override
+ public Number getDefaultValue() {
+ return 0;
+ }
+
+ @Override
+ public Number getSample() {
+ return
threadPoolExecutor.getCompletedTaskCount();
+ }
+ });
+ statsLogger.scopeLabel("thread", String.valueOf(idx))
+ .registerGauge(String.format("%s-total-tasks", name),
new Gauge<Number>() {
+ @Override
+ public Number getDefaultValue() {
+ return 0;
+ }
+
+ @Override
+ public Number getSample() {
+ return threadPoolExecutor.getTaskCount();
+ }
+ });
+ }
}
statsLogger.registerGauge(String.format("%s-threads", name), new
Gauge<Number>() {
diff --git
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
index cc2e9c8405..377ab202a6 100644
---
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
+++
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
@@ -33,7 +33,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.bookkeeper.stats.StatsLogger;
@@ -118,17 +117,20 @@ public class OrderedScheduler extends OrderedExecutor
implements ScheduledExecut
}
@Override
- protected ScheduledThreadPoolExecutor
createSingleThreadExecutor(ThreadFactory factory) {
- return new ScheduledThreadPoolExecutor(1, factory);
+ protected ExecutorService createSingleThreadExecutor(ThreadFactory
factory) {
+ return new BoundedScheduledExecutorService(new
ScheduledThreadPoolExecutor(1, factory), this.maxTasksInQueue);
}
@Override
- protected ListeningScheduledExecutorService
getBoundedExecutor(ThreadPoolExecutor executor) {
+ protected ListeningScheduledExecutorService
getBoundedExecutor(ExecutorService executor) {
return new
BoundedScheduledExecutorService((ScheduledThreadPoolExecutor) executor,
this.maxTasksInQueue);
}
@Override
protected ListeningScheduledExecutorService
addExecutorDecorators(ExecutorService executor) {
+ if (!(executor instanceof ListeningScheduledExecutorService)) {
+ executor = new
BoundedScheduledExecutorService((ScheduledThreadPoolExecutor) executor, 0);
+ }
return new
OrderedSchedulerDecoratedThread((ListeningScheduledExecutorService) executor);
}
diff --git
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SingleThreadExecutor.java
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SingleThreadExecutor.java
new file mode 100644
index 0000000000..318aacb8cf
--- /dev/null
+++
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SingleThreadExecutor.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.util;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * Implements a single thread executor that drains the queue in batches to
minimize contention between threads.
+ *
+ * <p>Tasks are executed in a safe manner: if there are exceptions they are
logged and the executor will
+ * proceed with the next tasks.
+ */
+@Slf4j
+public class SingleThreadExecutor extends AbstractExecutorService implements
ExecutorService, Runnable {
+ private final BlockingQueue<Runnable> queue;
+ private final Thread runner;
+
+ private final boolean rejectExecution;
+
+ private final LongAdder tasksCount = new LongAdder();
+ private final LongAdder tasksCompleted = new LongAdder();
+ private final LongAdder tasksRejected = new LongAdder();
+ private final LongAdder tasksFailed = new LongAdder();
+
+ enum State {
+ Running,
+ Shutdown,
+ Terminated
+ }
+
+ private volatile State state;
+
+ private final CountDownLatch startLatch;
+
+ public SingleThreadExecutor(ThreadFactory tf) {
+ this(tf, 64 * 1024, false);
+ }
+
+ @SneakyThrows
+ @SuppressFBWarnings(value = {"SC_START_IN_CTOR"})
+ public SingleThreadExecutor(ThreadFactory tf, int maxQueueCapacity,
boolean rejectExecution) {
+ this.queue = new ArrayBlockingQueue<>(maxQueueCapacity);
+ this.runner = tf.newThread(this);
+ this.state = State.Running;
+ this.rejectExecution = rejectExecution;
+ this.startLatch = new CountDownLatch(1);
+ this.runner.start();
+
+ // Ensure the runner is already fully working by the time the
constructor is done
+ this.startLatch.await();
+ }
+
+ public void run() {
+ try {
+ boolean isInitialized = false;
+ List<Runnable> localTasks = new ArrayList<>();
+
+ while (state == State.Running) {
+ if (!isInitialized) {
+ startLatch.countDown();
+ isInitialized = true;
+ }
+
+ int n = queue.drainTo(localTasks);
+ if (n > 0) {
+ for (int i = 0; i < n; i++) {
+ if (!safeRunTask(localTasks.get(i))) {
+ return;
+ }
+ }
+ localTasks.clear();
+ } else {
+ if (!safeRunTask(queue.take())) {
+ return;
+ }
+ }
+ }
+
+ // Clear the queue in orderly shutdown
+ int n = queue.drainTo(localTasks);
+ for (int i = 0; i < n; i++) {
+ safeRunTask(localTasks.get(i));
+ }
+ } catch (InterruptedException ie) {
+ // Exit loop when interrupted
+ Thread.currentThread().interrupt();
+ } catch (Throwable t) {
+ log.error("Exception in executor: {}", t.getMessage(), t);
+ throw t;
+ } finally {
+ state = State.Terminated;
+ }
+ }
+
+ private boolean safeRunTask(Runnable r) {
+ try {
+ r.run();
+ tasksCompleted.increment();
+ } catch (Throwable t) {
+ if (t instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ return false;
+ } else {
+ tasksFailed.increment();
+ log.error("Error while running task: {}", t.getMessage(), t);
+ }
+ }
+
+ return true;
+ }
+
+ @Override
+ public void shutdown() {
+ state = State.Shutdown;
+ if (queue.isEmpty()) {
+ runner.interrupt();
+ }
+ }
+
+ @Override
+ public List<Runnable> shutdownNow() {
+ this.state = State.Shutdown;
+ this.runner.interrupt();
+ List<Runnable> remainingTasks = new ArrayList<>();
+ queue.drainTo(remainingTasks);
+ return remainingTasks;
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return state != State.Running;
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return state == State.Terminated;
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws
InterruptedException {
+ runner.join(unit.toMillis(timeout));
+ return runner.isAlive();
+ }
+
+ public long getQueuedTasksCount() {
+ return Math.max(0, getSubmittedTasksCount() -
getCompletedTasksCount());
+ }
+
+ public long getSubmittedTasksCount() {
+ return tasksCount.sum();
+ }
+
+ public long getCompletedTasksCount() {
+ return tasksCompleted.sum();
+ }
+
+ public long getRejectedTasksCount() {
+ return tasksRejected.sum();
+ }
+
+ public long getFailedTasksCount() {
+ return tasksFailed.sum();
+ }
+
+ @Override
+ public void execute(Runnable r) {
+ if (state != State.Running) {
+ throw new RejectedExecutionException("Executor is shutting down");
+ }
+
+ try {
+ if (!rejectExecution) {
+ queue.put(r);
+ tasksCount.increment();
+ } else {
+ if (queue.offer(r)) {
+ tasksCount.increment();
+ } else {
+ tasksRejected.increment();
+ throw new ExecutorRejectedException("Executor queue is
full");
+ }
+ }
+ } catch (InterruptedException e) {
+ throw new RejectedExecutionException("Executor thread was
interrupted", e);
+ }
+ }
+
+ public void registerMetrics(StatsLogger statsLogger) {
+ // Register gauges
+ statsLogger.scopeLabel("thread", runner.getName())
+ .registerGauge("thread_executor_queue", new Gauge<Number>() {
+ @Override
+ public Number getDefaultValue() {
+ return 0;
+ }
+
+ @Override
+ public Number getSample() {
+ return getQueuedTasksCount();
+ }
+ });
+ statsLogger.scopeLabel("thread", runner.getName())
+ .registerGauge("thread_executor_completed", new
Gauge<Number>() {
+ @Override
+ public Number getDefaultValue() {
+ return 0;
+ }
+
+ @Override
+ public Number getSample() {
+ return getCompletedTasksCount();
+ }
+ });
+ statsLogger.scopeLabel("thread", runner.getName())
+ .registerGauge("thread_executor_tasks_completed", new
Gauge<Number>() {
+ @Override
+ public Number getDefaultValue() {
+ return 0;
+ }
+
+ @Override
+ public Number getSample() {
+ return getCompletedTasksCount();
+ }
+ });
+ statsLogger.scopeLabel("thread", runner.getName())
+ .registerGauge("thread_executor_tasks_rejected", new
Gauge<Number>() {
+ @Override
+ public Number getDefaultValue() {
+ return 0;
+ }
+
+ @Override
+ public Number getSample() {
+ return getRejectedTasksCount();
+ }
+ });
+ statsLogger.scopeLabel("thread", runner.getName())
+ .registerGauge("thread_executor_tasks_failed", new
Gauge<Number>() {
+ @Override
+ public Number getDefaultValue() {
+ return 0;
+ }
+
+ @Override
+ public Number getSample() {
+ return getFailedTasksCount();
+ }
+ });
+ }
+
+ private static class ExecutorRejectedException extends
RejectedExecutionException {
+
+ private ExecutorRejectedException(String msg) {
+ super(msg);
+ }
+ @Override
+ public Throwable fillInStackTrace() {
+ // Avoid the stack traces to be generated for this exception. This
is done
+ // because when rejectExecution=true, there could be many such
exceptions
+ // getting thrown, and filling the stack traces is very expensive
+ return this;
+ }
+ }
+}
diff --git
a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestSingleThreadExecutor.java
b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestSingleThreadExecutor.java
new file mode 100644
index 0000000000..1f03bfb85f
--- /dev/null
+++
b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestSingleThreadExecutor.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.List;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.Cleanup;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+/**
+ * Unit test for {@link SingleThreadExecutor}.
+ */
+public class TestSingleThreadExecutor {
+
+ private static final ThreadFactory THREAD_FACTORY = new
DefaultThreadFactory("test");
+
+ @Test
+ public void testSimple() throws Exception {
+ @Cleanup("shutdown")
+ SingleThreadExecutor ste = new SingleThreadExecutor(THREAD_FACTORY);
+
+ AtomicInteger count = new AtomicInteger();
+
+ assertEquals(0, ste.getSubmittedTasksCount());
+ assertEquals(0, ste.getCompletedTasksCount());
+ assertEquals(0, ste.getQueuedTasksCount());
+
+ for (int i = 0; i < 10; i++) {
+ ste.execute(() -> count.incrementAndGet());
+ }
+
+ assertEquals(10, ste.getSubmittedTasksCount());
+
+ ste.submit(() -> {
+ }).get();
+
+ assertEquals(10, count.get());
+ assertEquals(11, ste.getSubmittedTasksCount());
+
+ Awaitility.await().untilAsserted(() -> assertEquals(11,
ste.getCompletedTasksCount()));
+ assertEquals(0, ste.getRejectedTasksCount());
+ assertEquals(0, ste.getFailedTasksCount());
+ assertEquals(0, ste.getQueuedTasksCount());
+ }
+
+ @Test
+ public void testRejectWhenQueueIsFull() throws Exception {
+ @Cleanup("shutdownNow")
+ SingleThreadExecutor ste = new SingleThreadExecutor(THREAD_FACTORY,
10, true);
+
+ CyclicBarrier barrier = new CyclicBarrier(10 + 1);
+ CountDownLatch startedLatch = new CountDownLatch(1);
+
+ for (int i = 0; i < 10; i++) {
+ ste.execute(() -> {
+ startedLatch.countDown();
+
+ try {
+ barrier.await();
+ } catch (InterruptedException | BrokenBarrierException e) {
+ // ignore
+ }
+ });
+
+ // Wait until the first task is already running in the thread
+ startedLatch.await();
+ }
+
+ // Next task should go through, because the runner thread has already
pulled out 1 item from the
+ // queue: the first tasks which is currently stuck
+ ste.execute(() -> {
+ });
+
+ // Now the queue is really full and should reject tasks
+ try {
+ ste.execute(() -> {
+ });
+ fail("should have rejected the task");
+ } catch (RejectedExecutionException e) {
+ // Expected
+ }
+
+ assertTrue(ste.getSubmittedTasksCount() >= 11);
+ assertTrue(ste.getRejectedTasksCount() >= 1);
+ assertEquals(0, ste.getFailedTasksCount());
+ }
+
+ @Test
+ public void testBlockWhenQueueIsFull() throws Exception {
+ @Cleanup("shutdown")
+ SingleThreadExecutor ste = new SingleThreadExecutor(THREAD_FACTORY,
10, false);
+
+ CyclicBarrier barrier = new CyclicBarrier(10 + 1);
+
+ for (int i = 0; i < 10; i++) {
+ ste.execute(() -> {
+ try {
+ barrier.await(1, TimeUnit.SECONDS);
+ } catch (TimeoutException te) {
+ // ignore
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ assertEquals(10, ste.getQueuedTasksCount());
+
+ ste.submit(() -> {
+ }).get();
+
+ assertEquals(11, ste.getSubmittedTasksCount());
+ assertEquals(0, ste.getRejectedTasksCount());
+ }
+
+ @Test
+ public void testShutdown() throws Exception {
+ @Cleanup("shutdown")
+ SingleThreadExecutor ste = new SingleThreadExecutor(THREAD_FACTORY);
+
+ assertFalse(ste.isShutdown());
+ assertFalse(ste.isTerminated());
+
+ AtomicInteger count = new AtomicInteger();
+
+ for (int i = 0; i < 3; i++) {
+ ste.execute(() -> {
+ try {
+ Thread.sleep(1000);
+ count.incrementAndGet();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ ste.shutdown();
+ assertTrue(ste.isShutdown());
+ assertFalse(ste.isTerminated());
+
+ try {
+ ste.execute(() -> {
+ });
+ fail("should have rejected the task");
+ } catch (RejectedExecutionException e) {
+ // Expected
+ }
+
+ ste.awaitTermination(10, TimeUnit.SECONDS);
+ assertTrue(ste.isShutdown());
+ assertTrue(ste.isTerminated());
+
+ assertEquals(3, count.get());
+ }
+
+ @Test
+ public void testShutdownNow() throws Exception {
+ @Cleanup("shutdown")
+ SingleThreadExecutor ste = new SingleThreadExecutor(THREAD_FACTORY);
+
+ assertFalse(ste.isShutdown());
+ assertFalse(ste.isTerminated());
+
+ AtomicInteger count = new AtomicInteger();
+
+ for (int i = 0; i < 3; i++) {
+ ste.execute(() -> {
+ try {
+ Thread.sleep(2000);
+ count.incrementAndGet();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ // Ensure the 3 tasks are not picked up in one shot by the runner
thread
+ Thread.sleep(500);
+ }
+
+ List<Runnable> remainingTasks = ste.shutdownNow();
+ assertEquals(2, remainingTasks.size());
+ assertTrue(ste.isShutdown());
+
+ try {
+ ste.execute(() -> {
+ });
+ fail("should have rejected the task");
+ } catch (RejectedExecutionException e) {
+ // Expected
+ }
+
+ ste.awaitTermination(10, TimeUnit.SECONDS);
+ assertTrue(ste.isShutdown());
+ assertTrue(ste.isTerminated());
+
+ assertEquals(0, count.get());
+ }
+
+ @Test
+ public void testTasksWithException() throws Exception {
+ @Cleanup("shutdown")
+ SingleThreadExecutor ste = new SingleThreadExecutor(THREAD_FACTORY);
+
+ AtomicInteger count = new AtomicInteger();
+
+ for (int i = 0; i < 10; i++) {
+ ste.execute(() -> {
+ count.incrementAndGet();
+ throw new RuntimeException("xyz");
+ });
+ }
+
+ ste.submit(() -> {
+ }).get();
+ assertEquals(10, count.get());
+
+ assertEquals(11, ste.getSubmittedTasksCount());
+ Awaitility.await().untilAsserted(() -> assertEquals(1,
ste.getCompletedTasksCount()));
+ assertEquals(0, ste.getRejectedTasksCount());
+ assertEquals(10, ste.getFailedTasksCount());
+ }
+
+ @Test
+ public void testTasksWithNPE() throws Exception {
+ @Cleanup("shutdown")
+ SingleThreadExecutor ste = new SingleThreadExecutor(THREAD_FACTORY);
+
+ AtomicInteger count = new AtomicInteger();
+ String npeTest = null;
+
+ for (int i = 0; i < 10; i++) {
+ ste.execute(() -> {
+ count.incrementAndGet();
+
+ // Trigger the NPE exception
+ System.out.println(npeTest.length());
+ });
+ }
+
+ ste.submit(() -> {
+ }).get();
+ assertEquals(10, count.get());
+
+ assertEquals(11, ste.getSubmittedTasksCount());
+ Awaitility.await().untilAsserted(() -> assertEquals(1,
ste.getCompletedTasksCount()));
+ assertEquals(0, ste.getRejectedTasksCount());
+ assertEquals(10, ste.getFailedTasksCount());
+ }
+
+ @Test
+ public void testShutdownEmpty() throws Exception {
+ SingleThreadExecutor ste = new SingleThreadExecutor(THREAD_FACTORY);
+ ste.shutdown();
+ assertTrue(ste.isShutdown());
+
+ ste.awaitTermination(10, TimeUnit.SECONDS);
+ assertTrue(ste.isShutdown());
+ assertTrue(ste.isTerminated());
+ }
+}