This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push: new dff6d03a59 Use SingleThreadExecutor for OrderedExecutor and drainTo() tasks into local array (#3546) dff6d03a59 is described below commit dff6d03a59b293019a3c1144515e6f931986f82f Author: Matteo Merli <mme...@apache.org> AuthorDate: Thu Oct 20 13:24:21 2022 -0700 Use SingleThreadExecutor for OrderedExecutor and drainTo() tasks into local array (#3546) * Use SingleThreadExecutor for OrderedExecutor and drainTo() tasks into local array * Added metrics to executor * Fixed checkstyle * Made the test more resilient * Made the tests not relying on thread.sleep() * Fixed testRejectWhenQueueIsFull * Ignore spotbugs warning * Fixed annotation formatting * Removed test assertion that had already been changed --- bookkeeper-common/pom.xml | 5 + .../bookkeeper/common/util/OrderedExecutor.java | 107 ++++---- .../bookkeeper/common/util/OrderedScheduler.java | 10 +- .../common/util/SingleThreadExecutor.java | 296 +++++++++++++++++++++ .../common/util/TestSingleThreadExecutor.java | 290 ++++++++++++++++++++ 5 files changed, 652 insertions(+), 56 deletions(-) diff --git a/bookkeeper-common/pom.xml b/bookkeeper-common/pom.xml index 62aad8735e..29d77bccde 100644 --- a/bookkeeper-common/pom.xml +++ b/bookkeeper-common/pom.xml @@ -85,6 +85,11 @@ <artifactId>commons-lang3</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> <plugins> diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedExecutor.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedExecutor.java index 9ee84ba9ed..40c3fb0283 100644 --- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedExecutor.java +++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedExecutor.java @@ -29,19 +29,16 @@ import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Random; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.common.collections.BlockingMpscQueue; import org.apache.bookkeeper.common.util.affinity.CpuAffinity; import org.apache.bookkeeper.stats.Gauge; import org.apache.bookkeeper.stats.NullStatsLogger; @@ -294,20 +291,17 @@ public class OrderedExecutor implements ExecutorService { } } - protected ThreadPoolExecutor createSingleThreadExecutor(ThreadFactory factory) { - BlockingQueue<Runnable> queue; - if (enableBusyWait) { - // Use queue with busy-wait polling strategy - queue = new BlockingMpscQueue<>(maxTasksInQueue > 0 ? maxTasksInQueue : DEFAULT_MAX_ARRAY_QUEUE_SIZE); + protected ExecutorService createSingleThreadExecutor(ThreadFactory factory) { + if (maxTasksInQueue > 0) { + return new SingleThreadExecutor(factory, maxTasksInQueue, true); } else { - // By default, use regular JDK LinkedBlockingQueue - queue = new LinkedBlockingQueue<>(); + return new SingleThreadExecutor(factory); } - return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, queue, factory); } - protected ExecutorService getBoundedExecutor(ThreadPoolExecutor executor) { - return new BoundedExecutorService(executor, this.maxTasksInQueue); + protected ExecutorService getBoundedExecutor(ExecutorService executor) { + checkArgument(executor instanceof ThreadPoolExecutor); + return new BoundedExecutorService((ThreadPoolExecutor) executor, this.maxTasksInQueue); } protected ExecutorService addExecutorDecorators(ExecutorService executor) { @@ -400,11 +394,14 @@ public class OrderedExecutor implements ExecutorService { threads = new ExecutorService[numThreads]; threadIds = new long[numThreads]; for (int i = 0; i < numThreads; i++) { - ThreadPoolExecutor thread = createSingleThreadExecutor( + ExecutorService thread = createSingleThreadExecutor( new ThreadFactoryBuilder().setNameFormat(name + "-" + getClass().getSimpleName() + "-" + i + "-%d") .setThreadFactory(threadFactory).build()); - threads[i] = addExecutorDecorators(getBoundedExecutor(thread)); + if (traceTaskExecution || preserveMdcForTaskExecution) { + thread = addExecutorDecorators(thread); + } + threads[i] = thread; final int idx = i; try { @@ -434,43 +431,49 @@ public class OrderedExecutor implements ExecutorService { throw new RuntimeException("Couldn't start thread " + i, e); } - // Register gauges - statsLogger.scopeLabel("thread", String.valueOf(idx)) - .registerGauge(String.format("%s-queue", name), new Gauge<Number>() { - @Override - public Number getDefaultValue() { - return 0; - } - - @Override - public Number getSample() { - return thread.getQueue().size(); - } - }); - statsLogger.scopeLabel("thread", String.valueOf(idx)) - .registerGauge(String.format("%s-completed-tasks", name), new Gauge<Number>() { - @Override - public Number getDefaultValue() { - return 0; - } - - @Override - public Number getSample() { - return thread.getCompletedTaskCount(); - } - }); - statsLogger.scopeLabel("thread", String.valueOf(idx)) - .registerGauge(String.format("%s-total-tasks", name), new Gauge<Number>() { - @Override - public Number getDefaultValue() { - return 0; - } - - @Override - public Number getSample() { - return thread.getTaskCount(); - } - }); + if (thread instanceof SingleThreadExecutor) { + SingleThreadExecutor ste = (SingleThreadExecutor) thread; + ste.registerMetrics(statsLogger); + } else if (thread instanceof ThreadPoolExecutor) { + ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) thread; + // Register gauges + statsLogger.scopeLabel("thread", String.valueOf(idx)) + .registerGauge(String.format("%s-queue", name), new Gauge<Number>() { + @Override + public Number getDefaultValue() { + return 0; + } + + @Override + public Number getSample() { + return threadPoolExecutor.getQueue().size(); + } + }); + statsLogger.scopeLabel("thread", String.valueOf(idx)) + .registerGauge(String.format("%s-completed-tasks", name), new Gauge<Number>() { + @Override + public Number getDefaultValue() { + return 0; + } + + @Override + public Number getSample() { + return threadPoolExecutor.getCompletedTaskCount(); + } + }); + statsLogger.scopeLabel("thread", String.valueOf(idx)) + .registerGauge(String.format("%s-total-tasks", name), new Gauge<Number>() { + @Override + public Number getDefaultValue() { + return 0; + } + + @Override + public Number getSample() { + return threadPoolExecutor.getTaskCount(); + } + }); + } } statsLogger.registerGauge(String.format("%s-threads", name), new Gauge<Number>() { diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java index cc2e9c8405..377ab202a6 100644 --- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java +++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java @@ -33,7 +33,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.bookkeeper.stats.StatsLogger; @@ -118,17 +117,20 @@ public class OrderedScheduler extends OrderedExecutor implements ScheduledExecut } @Override - protected ScheduledThreadPoolExecutor createSingleThreadExecutor(ThreadFactory factory) { - return new ScheduledThreadPoolExecutor(1, factory); + protected ExecutorService createSingleThreadExecutor(ThreadFactory factory) { + return new BoundedScheduledExecutorService(new ScheduledThreadPoolExecutor(1, factory), this.maxTasksInQueue); } @Override - protected ListeningScheduledExecutorService getBoundedExecutor(ThreadPoolExecutor executor) { + protected ListeningScheduledExecutorService getBoundedExecutor(ExecutorService executor) { return new BoundedScheduledExecutorService((ScheduledThreadPoolExecutor) executor, this.maxTasksInQueue); } @Override protected ListeningScheduledExecutorService addExecutorDecorators(ExecutorService executor) { + if (!(executor instanceof ListeningScheduledExecutorService)) { + executor = new BoundedScheduledExecutorService((ScheduledThreadPoolExecutor) executor, 0); + } return new OrderedSchedulerDecoratedThread((ListeningScheduledExecutorService) executor); } diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SingleThreadExecutor.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SingleThreadExecutor.java new file mode 100644 index 0000000000..318aacb8cf --- /dev/null +++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SingleThreadExecutor.java @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.bookkeeper.common.util; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.stats.Gauge; +import org.apache.bookkeeper.stats.StatsLogger; + +/** + * Implements a single thread executor that drains the queue in batches to minimize contention between threads. + * + * <p>Tasks are executed in a safe manner: if there are exceptions they are logged and the executor will + * proceed with the next tasks. + */ +@Slf4j +public class SingleThreadExecutor extends AbstractExecutorService implements ExecutorService, Runnable { + private final BlockingQueue<Runnable> queue; + private final Thread runner; + + private final boolean rejectExecution; + + private final LongAdder tasksCount = new LongAdder(); + private final LongAdder tasksCompleted = new LongAdder(); + private final LongAdder tasksRejected = new LongAdder(); + private final LongAdder tasksFailed = new LongAdder(); + + enum State { + Running, + Shutdown, + Terminated + } + + private volatile State state; + + private final CountDownLatch startLatch; + + public SingleThreadExecutor(ThreadFactory tf) { + this(tf, 64 * 1024, false); + } + + @SneakyThrows + @SuppressFBWarnings(value = {"SC_START_IN_CTOR"}) + public SingleThreadExecutor(ThreadFactory tf, int maxQueueCapacity, boolean rejectExecution) { + this.queue = new ArrayBlockingQueue<>(maxQueueCapacity); + this.runner = tf.newThread(this); + this.state = State.Running; + this.rejectExecution = rejectExecution; + this.startLatch = new CountDownLatch(1); + this.runner.start(); + + // Ensure the runner is already fully working by the time the constructor is done + this.startLatch.await(); + } + + public void run() { + try { + boolean isInitialized = false; + List<Runnable> localTasks = new ArrayList<>(); + + while (state == State.Running) { + if (!isInitialized) { + startLatch.countDown(); + isInitialized = true; + } + + int n = queue.drainTo(localTasks); + if (n > 0) { + for (int i = 0; i < n; i++) { + if (!safeRunTask(localTasks.get(i))) { + return; + } + } + localTasks.clear(); + } else { + if (!safeRunTask(queue.take())) { + return; + } + } + } + + // Clear the queue in orderly shutdown + int n = queue.drainTo(localTasks); + for (int i = 0; i < n; i++) { + safeRunTask(localTasks.get(i)); + } + } catch (InterruptedException ie) { + // Exit loop when interrupted + Thread.currentThread().interrupt(); + } catch (Throwable t) { + log.error("Exception in executor: {}", t.getMessage(), t); + throw t; + } finally { + state = State.Terminated; + } + } + + private boolean safeRunTask(Runnable r) { + try { + r.run(); + tasksCompleted.increment(); + } catch (Throwable t) { + if (t instanceof InterruptedException) { + Thread.currentThread().interrupt(); + return false; + } else { + tasksFailed.increment(); + log.error("Error while running task: {}", t.getMessage(), t); + } + } + + return true; + } + + @Override + public void shutdown() { + state = State.Shutdown; + if (queue.isEmpty()) { + runner.interrupt(); + } + } + + @Override + public List<Runnable> shutdownNow() { + this.state = State.Shutdown; + this.runner.interrupt(); + List<Runnable> remainingTasks = new ArrayList<>(); + queue.drainTo(remainingTasks); + return remainingTasks; + } + + @Override + public boolean isShutdown() { + return state != State.Running; + } + + @Override + public boolean isTerminated() { + return state == State.Terminated; + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + runner.join(unit.toMillis(timeout)); + return runner.isAlive(); + } + + public long getQueuedTasksCount() { + return Math.max(0, getSubmittedTasksCount() - getCompletedTasksCount()); + } + + public long getSubmittedTasksCount() { + return tasksCount.sum(); + } + + public long getCompletedTasksCount() { + return tasksCompleted.sum(); + } + + public long getRejectedTasksCount() { + return tasksRejected.sum(); + } + + public long getFailedTasksCount() { + return tasksFailed.sum(); + } + + @Override + public void execute(Runnable r) { + if (state != State.Running) { + throw new RejectedExecutionException("Executor is shutting down"); + } + + try { + if (!rejectExecution) { + queue.put(r); + tasksCount.increment(); + } else { + if (queue.offer(r)) { + tasksCount.increment(); + } else { + tasksRejected.increment(); + throw new ExecutorRejectedException("Executor queue is full"); + } + } + } catch (InterruptedException e) { + throw new RejectedExecutionException("Executor thread was interrupted", e); + } + } + + public void registerMetrics(StatsLogger statsLogger) { + // Register gauges + statsLogger.scopeLabel("thread", runner.getName()) + .registerGauge("thread_executor_queue", new Gauge<Number>() { + @Override + public Number getDefaultValue() { + return 0; + } + + @Override + public Number getSample() { + return getQueuedTasksCount(); + } + }); + statsLogger.scopeLabel("thread", runner.getName()) + .registerGauge("thread_executor_completed", new Gauge<Number>() { + @Override + public Number getDefaultValue() { + return 0; + } + + @Override + public Number getSample() { + return getCompletedTasksCount(); + } + }); + statsLogger.scopeLabel("thread", runner.getName()) + .registerGauge("thread_executor_tasks_completed", new Gauge<Number>() { + @Override + public Number getDefaultValue() { + return 0; + } + + @Override + public Number getSample() { + return getCompletedTasksCount(); + } + }); + statsLogger.scopeLabel("thread", runner.getName()) + .registerGauge("thread_executor_tasks_rejected", new Gauge<Number>() { + @Override + public Number getDefaultValue() { + return 0; + } + + @Override + public Number getSample() { + return getRejectedTasksCount(); + } + }); + statsLogger.scopeLabel("thread", runner.getName()) + .registerGauge("thread_executor_tasks_failed", new Gauge<Number>() { + @Override + public Number getDefaultValue() { + return 0; + } + + @Override + public Number getSample() { + return getFailedTasksCount(); + } + }); + } + + private static class ExecutorRejectedException extends RejectedExecutionException { + + private ExecutorRejectedException(String msg) { + super(msg); + } + @Override + public Throwable fillInStackTrace() { + // Avoid the stack traces to be generated for this exception. This is done + // because when rejectExecution=true, there could be many such exceptions + // getting thrown, and filling the stack traces is very expensive + return this; + } + } +} diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestSingleThreadExecutor.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestSingleThreadExecutor.java new file mode 100644 index 0000000000..1f03bfb85f --- /dev/null +++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestSingleThreadExecutor.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.bookkeeper.common.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import io.netty.util.concurrent.DefaultThreadFactory; +import java.util.List; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.Cleanup; +import org.awaitility.Awaitility; +import org.junit.Test; + +/** + * Unit test for {@link SingleThreadExecutor}. + */ +public class TestSingleThreadExecutor { + + private static final ThreadFactory THREAD_FACTORY = new DefaultThreadFactory("test"); + + @Test + public void testSimple() throws Exception { + @Cleanup("shutdown") + SingleThreadExecutor ste = new SingleThreadExecutor(THREAD_FACTORY); + + AtomicInteger count = new AtomicInteger(); + + assertEquals(0, ste.getSubmittedTasksCount()); + assertEquals(0, ste.getCompletedTasksCount()); + assertEquals(0, ste.getQueuedTasksCount()); + + for (int i = 0; i < 10; i++) { + ste.execute(() -> count.incrementAndGet()); + } + + assertEquals(10, ste.getSubmittedTasksCount()); + + ste.submit(() -> { + }).get(); + + assertEquals(10, count.get()); + assertEquals(11, ste.getSubmittedTasksCount()); + + Awaitility.await().untilAsserted(() -> assertEquals(11, ste.getCompletedTasksCount())); + assertEquals(0, ste.getRejectedTasksCount()); + assertEquals(0, ste.getFailedTasksCount()); + assertEquals(0, ste.getQueuedTasksCount()); + } + + @Test + public void testRejectWhenQueueIsFull() throws Exception { + @Cleanup("shutdownNow") + SingleThreadExecutor ste = new SingleThreadExecutor(THREAD_FACTORY, 10, true); + + CyclicBarrier barrier = new CyclicBarrier(10 + 1); + CountDownLatch startedLatch = new CountDownLatch(1); + + for (int i = 0; i < 10; i++) { + ste.execute(() -> { + startedLatch.countDown(); + + try { + barrier.await(); + } catch (InterruptedException | BrokenBarrierException e) { + // ignore + } + }); + + // Wait until the first task is already running in the thread + startedLatch.await(); + } + + // Next task should go through, because the runner thread has already pulled out 1 item from the + // queue: the first tasks which is currently stuck + ste.execute(() -> { + }); + + // Now the queue is really full and should reject tasks + try { + ste.execute(() -> { + }); + fail("should have rejected the task"); + } catch (RejectedExecutionException e) { + // Expected + } + + assertTrue(ste.getSubmittedTasksCount() >= 11); + assertTrue(ste.getRejectedTasksCount() >= 1); + assertEquals(0, ste.getFailedTasksCount()); + } + + @Test + public void testBlockWhenQueueIsFull() throws Exception { + @Cleanup("shutdown") + SingleThreadExecutor ste = new SingleThreadExecutor(THREAD_FACTORY, 10, false); + + CyclicBarrier barrier = new CyclicBarrier(10 + 1); + + for (int i = 0; i < 10; i++) { + ste.execute(() -> { + try { + barrier.await(1, TimeUnit.SECONDS); + } catch (TimeoutException te) { + // ignore + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + assertEquals(10, ste.getQueuedTasksCount()); + + ste.submit(() -> { + }).get(); + + assertEquals(11, ste.getSubmittedTasksCount()); + assertEquals(0, ste.getRejectedTasksCount()); + } + + @Test + public void testShutdown() throws Exception { + @Cleanup("shutdown") + SingleThreadExecutor ste = new SingleThreadExecutor(THREAD_FACTORY); + + assertFalse(ste.isShutdown()); + assertFalse(ste.isTerminated()); + + AtomicInteger count = new AtomicInteger(); + + for (int i = 0; i < 3; i++) { + ste.execute(() -> { + try { + Thread.sleep(1000); + count.incrementAndGet(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + } + + ste.shutdown(); + assertTrue(ste.isShutdown()); + assertFalse(ste.isTerminated()); + + try { + ste.execute(() -> { + }); + fail("should have rejected the task"); + } catch (RejectedExecutionException e) { + // Expected + } + + ste.awaitTermination(10, TimeUnit.SECONDS); + assertTrue(ste.isShutdown()); + assertTrue(ste.isTerminated()); + + assertEquals(3, count.get()); + } + + @Test + public void testShutdownNow() throws Exception { + @Cleanup("shutdown") + SingleThreadExecutor ste = new SingleThreadExecutor(THREAD_FACTORY); + + assertFalse(ste.isShutdown()); + assertFalse(ste.isTerminated()); + + AtomicInteger count = new AtomicInteger(); + + for (int i = 0; i < 3; i++) { + ste.execute(() -> { + try { + Thread.sleep(2000); + count.incrementAndGet(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + // Ensure the 3 tasks are not picked up in one shot by the runner thread + Thread.sleep(500); + } + + List<Runnable> remainingTasks = ste.shutdownNow(); + assertEquals(2, remainingTasks.size()); + assertTrue(ste.isShutdown()); + + try { + ste.execute(() -> { + }); + fail("should have rejected the task"); + } catch (RejectedExecutionException e) { + // Expected + } + + ste.awaitTermination(10, TimeUnit.SECONDS); + assertTrue(ste.isShutdown()); + assertTrue(ste.isTerminated()); + + assertEquals(0, count.get()); + } + + @Test + public void testTasksWithException() throws Exception { + @Cleanup("shutdown") + SingleThreadExecutor ste = new SingleThreadExecutor(THREAD_FACTORY); + + AtomicInteger count = new AtomicInteger(); + + for (int i = 0; i < 10; i++) { + ste.execute(() -> { + count.incrementAndGet(); + throw new RuntimeException("xyz"); + }); + } + + ste.submit(() -> { + }).get(); + assertEquals(10, count.get()); + + assertEquals(11, ste.getSubmittedTasksCount()); + Awaitility.await().untilAsserted(() -> assertEquals(1, ste.getCompletedTasksCount())); + assertEquals(0, ste.getRejectedTasksCount()); + assertEquals(10, ste.getFailedTasksCount()); + } + + @Test + public void testTasksWithNPE() throws Exception { + @Cleanup("shutdown") + SingleThreadExecutor ste = new SingleThreadExecutor(THREAD_FACTORY); + + AtomicInteger count = new AtomicInteger(); + String npeTest = null; + + for (int i = 0; i < 10; i++) { + ste.execute(() -> { + count.incrementAndGet(); + + // Trigger the NPE exception + System.out.println(npeTest.length()); + }); + } + + ste.submit(() -> { + }).get(); + assertEquals(10, count.get()); + + assertEquals(11, ste.getSubmittedTasksCount()); + Awaitility.await().untilAsserted(() -> assertEquals(1, ste.getCompletedTasksCount())); + assertEquals(0, ste.getRejectedTasksCount()); + assertEquals(10, ste.getFailedTasksCount()); + } + + @Test + public void testShutdownEmpty() throws Exception { + SingleThreadExecutor ste = new SingleThreadExecutor(THREAD_FACTORY); + ste.shutdown(); + assertTrue(ste.isShutdown()); + + ste.awaitTermination(10, TimeUnit.SECONDS); + assertTrue(ste.isShutdown()); + assertTrue(ste.isTerminated()); + } +}