Repository: jclouds
Updated Branches:
  refs/heads/2.0.x 6100b7c36 -> 05f64b20a
Fix default executor rejection policies

Project: http://git-wip-us.apache.org/repos/asf/jclouds/repo
Commit: http://git-wip-us.apache.org/repos/asf/jclouds/commit/05f64b20
Tree: http://git-wip-us.apache.org/repos/asf/jclouds/tree/05f64b20
Diff: http://git-wip-us.apache.org/repos/asf/jclouds/diff/05f64b20

Branch: refs/heads/2.0.x
Commit: 05f64b20a8d757a06f03027231bc53e000717835
Parents: 6100b7c
Author: Ignasi Barrera <[email protected]>
Authored: Fri Apr 21 20:17:02 2017 +0200
Committer: Ignasi Barrera <[email protected]>
Committed: Mon Apr 24 08:16:45 2017 +0200

----------------------------------------------------------------------
 .../concurrent/DynamicThreadPoolExecutor.java   |   8 ++
 .../DynamicThreadPoolExecutorTest.java          | 142 +++++++++++++++++++
 2 files changed, 150 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/jclouds/blob/05f64b20/core/src/main/java/org/jclouds/concurrent/DynamicThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/jclouds/concurrent/DynamicThreadPoolExecutor.java b/core/src/main/java/org/jclouds/concurrent/DynamicThreadPoolExecutor.java
index a032f85..a24eeb6 100644
--- a/core/src/main/java/org/jclouds/concurrent/DynamicThreadPoolExecutor.java
+++ b/core/src/main/java/org/jclouds/concurrent/DynamicThreadPoolExecutor.java
@@ -122,6 +122,10 @@ class DynamicThreadPoolExecutor extends ThreadPoolExecutor {
     */
    static class ForceQueuePolicy implements RejectedExecutionHandler {
       public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+         if (executor.isShutdown()) {
+            throw new RejectedExecutionException("Rejected execution of task [" + r.getClass()
+                  + "] since the executor is shutdown.");
+         }
          try {
             executor.getQueue().put(r);
          } catch (InterruptedException e) {
@@ -147,6 +151,10 @@ class DynamicThreadPoolExecutor extends ThreadPoolExecutor {
       }

       public void rejectedExecution(Runnable
r, ThreadPoolExecutor executor) {
+         if (executor.isShutdown()) {
+            throw new RejectedExecutionException("Rejected execution of task [" + r.getClass()
+                  + "] since the executor is shutdown.");
+         }
          try {
             boolean successful = executor.getQueue().offer(r, waitTime, TimeUnit.MILLISECONDS);
             if (!successful)

http://git-wip-us.apache.org/repos/asf/jclouds/blob/05f64b20/core/src/test/java/org/jclouds/concurrent/DynamicThreadPoolExecutorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/jclouds/concurrent/DynamicThreadPoolExecutorTest.java b/core/src/test/java/org/jclouds/concurrent/DynamicThreadPoolExecutorTest.java
new file mode 100644
index 0000000..f34e48c
--- /dev/null
+++ b/core/src/test/java/org/jclouds/concurrent/DynamicThreadPoolExecutorTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.jclouds.concurrent;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.common.util.concurrent.Uninterruptibles;
+
+public class DynamicThreadPoolExecutorTest {
+
+   @Test
+   public void testTasksAreEnqueuedIfQueueFull() throws InterruptedException, ExecutionException, TimeoutException {
+      DynamicThreadPoolExecutor executor = newExecutor(new DynamicThreadPoolExecutor.ForceQueuePolicy());
+      try {
+         List<Task> tasks = ImmutableList.of(new Task(2), new Task(2), new Task(2), new Task(2));
+         List<Future<?>> futures = new ArrayList<Future<?>>();
+         for (Task task : tasks) {
+            futures.add(executor.submit(task));
+         }
+
+         for (Future<?> future : futures) {
+            future.get(5, TimeUnit.SECONDS);
+         }
+      } finally {
+         executor.shutdownNow();
+      }
+   }
+
+   @Test(expectedExceptions = RejectedExecutionException.class)
+   public void testTasksAreRejectedIfQueueFull() throws InterruptedException, ExecutionException, TimeoutException {
+      DynamicThreadPoolExecutor executor = newExecutor(new ThreadPoolExecutor.AbortPolicy());
+      try {
+         for (int i = 0; i < executor.getMaximumPoolSize() + 4; i++) {
+            executor.submit(new Task(2));
+         }
+      } finally {
+         executor.shutdownNow();
+      }
+   }
+
+   @Test
+   public void testTasksWaitForSpaceIfQueueFull() throws InterruptedException, ExecutionException, TimeoutException {
+      DynamicThreadPoolExecutor executor =
newExecutor(new DynamicThreadPoolExecutor.TimedBlockingPolicy(5000));
+      try {
+         List<Task> tasks = ImmutableList.of(new Task(2), new Task(2), new Task(2), new Task(2));
+         List<Future<?>> futures = new ArrayList<Future<?>>();
+         for (Task task : tasks) {
+            futures.add(executor.submit(task));
+         }
+
+         for (Future<?> future : futures) {
+            future.get(5, TimeUnit.SECONDS);
+         }
+      } finally {
+         executor.shutdownNow();
+      }
+   }
+
+   @Test(expectedExceptions = RejectedExecutionException.class)
+   public void testTasksAreRejectedIfExecutorIsShutdownAndPolicyIsForce() throws InterruptedException,
+         ExecutionException, TimeoutException {
+      DynamicThreadPoolExecutor executor = newExecutor(new DynamicThreadPoolExecutor.ForceQueuePolicy());
+      try {
+         executor.submit(new Task(2));
+         executor.shutdown();
+         executor.submit(new Task(2));
+      } finally {
+         executor.shutdownNow();
+      }
+   }
+
+   @Test(expectedExceptions = RejectedExecutionException.class)
+   public void testTasksAreRejectedIfExecutorIsShutdownAndPolicyIsWait() throws InterruptedException,
+         ExecutionException, TimeoutException {
+      DynamicThreadPoolExecutor executor = newExecutor(new DynamicThreadPoolExecutor.TimedBlockingPolicy(5000));
+      try {
+         executor.submit(new Task(2));
+         executor.shutdown();
+         executor.submit(new Task(2));
+      } finally {
+         executor.shutdownNow();
+      }
+   }
+
+   private static class Task implements Runnable {
+      private final AtomicInteger executions;
+
+      public Task(int executions) {
+         this.executions = new AtomicInteger(executions);
+      }
+
+      @Override
+      public void run() {
+         while (executions.decrementAndGet() >= 0) {
+            Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+         }
+      }
+   }
+
+   public DynamicThreadPoolExecutor newExecutor(RejectedExecutionHandler rejectionPolicy) {
+      DynamicThreadPoolExecutor.DynamicQueue<Runnable> queue = new DynamicThreadPoolExecutor.DynamicQueue<Runnable>();
+      DynamicThreadPoolExecutor executor = new DynamicThreadPoolExecutor(1, 1, 60000, TimeUnit.MILLISECONDS, queue,
+            namedThreadFactory("dyn-pool-test"));
+      executor.setRejectedExecutionHandler(rejectionPolicy);
+      queue.setThreadPoolExecutor(executor);
+      return executor;
+   }
+
+   private ThreadFactory namedThreadFactory(String name) {
+      return new ThreadFactoryBuilder().setNameFormat(name).setThreadFactory(Executors.defaultThreadFactory()).build();
+   }
+}
