Backport pool start/stop
Project: http://git-wip-us.apache.org/repos/asf/tomee/repo Commit: http://git-wip-us.apache.org/repos/asf/tomee/commit/39b42eb6 Tree: http://git-wip-us.apache.org/repos/asf/tomee/tree/39b42eb6 Diff: http://git-wip-us.apache.org/repos/asf/tomee/diff/39b42eb6 Branch: refs/heads/tomee-1.7.x Commit: 39b42eb633d2d59a484c7ea78200698c3c6d56e5 Parents: ffea29d Author: AndyGee <[email protected]> Authored: Fri Mar 3 21:11:07 2017 +0100 Committer: AndyGee <[email protected]> Committed: Fri Mar 3 21:11:07 2017 +0100 ---------------------------------------------------------------------- .../main/java/org/apache/openejb/util/Pool.java | 51 ++++++++++++++------ .../java/org/apache/openejb/util/PoolTest.java | 1 + 2 files changed, 38 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tomee/blob/39b42eb6/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java ---------------------------------------------------------------------- diff --git a/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java b/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java index 1e6895e..f87fd52 100644 --- a/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java +++ b/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java @@ -31,6 +31,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -43,6 +44,7 @@ import java.util.logging.Level; import java.util.logging.Logger; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; /** * Any successful pop() call requires a corresponding push() or discard() call. @@ -74,6 +76,7 @@ public class Pool<T> { private final Supplier<T> supplier; private final AtomicReference<ScheduledExecutorService> scheduler = new AtomicReference<ScheduledExecutorService>(); + private final AtomicReference<ScheduledFuture<?>> future = new AtomicReference<ScheduledFuture<?>>(); private final Sweeper sweeper; private final CountingLatch out = new CountingLatch(); @@ -127,21 +130,41 @@ public class Pool<T> { } public Pool start() { - final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1, new SchedulerThreadFactory()); - - if (this.scheduler.compareAndSet(null, scheduledExecutorService)) { - this.scheduler.get().scheduleAtFixedRate(sweeper, 0, this.sweepInterval, MILLISECONDS); + ScheduledExecutorService scheduledExecutorService = this.scheduler.get(); + boolean createdSES = scheduledExecutorService == null; + if (scheduledExecutorService == null) { + scheduledExecutorService = Executors.newScheduledThreadPool(1, new SchedulerThreadFactory()); + if (!this.scheduler.compareAndSet(null, scheduledExecutorService)) { + scheduledExecutorService.shutdownNow(); + scheduledExecutorService = this.scheduler.get(); + createdSES = false; + } + } + final ScheduledFuture<?> scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(sweeper, 0, this.sweepInterval, MILLISECONDS); + if (!this.future.compareAndSet(null, scheduledFuture)) { + scheduledFuture.cancel(true); + } + if (!createdSES) { + // we don't want to shutdown it, we'll just stop the task + this.scheduler.set(null); } return this; } public void stop() { - final ScheduledExecutorService scheduler = this.scheduler.get(); - if (scheduler != null && this.scheduler.compareAndSet(scheduler, null)) { + final ScheduledFuture<?> future = this.future.getAndSet(null); + if (future != null + && !future.isDone() && !future.isCancelled() + && !future.cancel(false)) { + Logger.getLogger(Pool.class.getName()).log(Level.WARNING, "Pool scheduler task termination timeout expired"); + } + + final ScheduledExecutorService scheduler = this.scheduler.getAndSet(null); + if (scheduler != null) { scheduler.shutdown(); try { - if (!scheduler.awaitTermination(10000, MILLISECONDS)) { - Logger.getLogger(this.getClass().getName()).log(Level.WARNING, "Pool scheduler termination timeout expired"); + if (!scheduler.awaitTermination(10, SECONDS)) { // should last something like 0s max since we killed the task + Logger.getLogger(Pool.class.getName()).log(Level.WARNING, "Pool scheduler termination timeout expired"); } } catch (final InterruptedException e) { //Ignore @@ -150,7 +173,7 @@ public class Pool<T> { } public boolean running() { - return this.scheduler.get() != null; + return this.future.get() != null; } private Executor createExecutor() { @@ -444,6 +467,9 @@ public class Pool<T> { public boolean close(final long timeout, final TimeUnit unit) throws InterruptedException { + // Stop the sweeper thread + stop(); + // drain all keys so no new instances will be accepted into the pool while (instances.tryAcquire()) { Thread.yield(); @@ -461,9 +487,6 @@ public class Pool<T> { //Ignore } - // Stop the sweeper thread - stop(); - // Drain all leases if (!(available instanceof Overdraft)) { while (available.tryAcquire()) { @@ -659,7 +682,7 @@ public class Pool<T> { while (true) { final Entry entry = pop(0, MILLISECONDS, false); if (entry == null) { - push(entry, true); + push(null, true); break; } entries.add(entry); @@ -771,7 +794,7 @@ public class Pool<T> { } for (int i = 0; i < replace.size(); i++) { - final long offset = maxAge > 0 ? (long) (maxAge / replace.size() * i * maxAgeOffset) % maxAge : 0l; + final long offset = maxAge > 0 ? (long) (maxAge / replace.size() * i * maxAgeOffset) % maxAge : 0L; executor.execute(new Replace(replace.get(i).entry, offset)); } } http://git-wip-us.apache.org/repos/asf/tomee/blob/39b42eb6/container/openejb-core/src/test/java/org/apache/openejb/util/PoolTest.java ---------------------------------------------------------------------- diff --git a/container/openejb-core/src/test/java/org/apache/openejb/util/PoolTest.java b/container/openejb-core/src/test/java/org/apache/openejb/util/PoolTest.java index 6dc08cd..fbde94f 100644 --- a/container/openejb-core/src/test/java/org/apache/openejb/util/PoolTest.java +++ b/container/openejb-core/src/test/java/org/apache/openejb/util/PoolTest.java @@ -321,6 +321,7 @@ public class PoolTest extends TestCase { final long start = System.currentTimeMillis(); assertTrue(pool.close(10, TimeUnit.SECONDS)); + assertFalse(pool.running()); final long time = System.currentTimeMillis() - start; // All instances should have been removed
