Repository: tomee Updated Branches: refs/heads/master 22e5a3715 -> 5ed406623
Stop thread before draining, atomic getAndSet once & pmd Project: http://git-wip-us.apache.org/repos/asf/tomee/repo Commit: http://git-wip-us.apache.org/repos/asf/tomee/commit/5ed40662 Tree: http://git-wip-us.apache.org/repos/asf/tomee/tree/5ed40662 Diff: http://git-wip-us.apache.org/repos/asf/tomee/diff/5ed40662 Branch: refs/heads/master Commit: 5ed406623d369eb343a3507e18ae9dac0ad51754 Parents: 22e5a37 Author: AndyGee <[email protected]> Authored: Wed Mar 8 17:23:41 2017 +0100 Committer: AndyGee <[email protected]> Committed: Wed Mar 8 17:23:41 2017 +0100 ---------------------------------------------------------------------- .../main/java/org/apache/openejb/util/Pool.java | 98 ++++++++++---------- 1 file changed, 49 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tomee/blob/5ed40662/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java ---------------------------------------------------------------------- diff --git a/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java b/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java index 299f841..a743978 100644 --- a/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java +++ b/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java @@ -28,7 +28,6 @@ import java.util.NoSuchElementException; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -152,15 +151,15 @@ public class Pool<T> { } public void stop() { - final ScheduledFuture<?> future = this.future.get(); - if (future != null && this.future.compareAndSet(future, null) + final ScheduledFuture<?> future = this.future.getAndSet(null); + if (future != null && !future.isDone() && !future.isCancelled() && !future.cancel(false)) { Logger.getLogger(Pool.class.getName()).log(Level.WARNING, "Pool scheduler task termination timeout expired"); } - final ScheduledExecutorService scheduler = this.scheduler.get(); - if (scheduler != null && this.scheduler.compareAndSet(scheduler, null)) { + final ScheduledExecutorService scheduler = this.scheduler.getAndSet(null); + if (scheduler != null) { scheduler.shutdown(); try { if (!scheduler.awaitTermination(10, SECONDS)) { // should last something like 0s max since we killed the task @@ -178,8 +177,8 @@ public class Pool<T> { private Executor createExecutor() { final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 10, - 60L, TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(2), new DaemonThreadFactory("org.apache.openejb.util.Pool", hashCode())); + 60L, SECONDS, + new LinkedBlockingQueue<Runnable>(2), new DaemonThreadFactory("org.apache.openejb.util.Pool", hashCode())); threadPoolExecutor.setRejectedExecutionHandler(new RejectedExecutionHandler() { @Override @@ -187,12 +186,12 @@ public class Pool<T> { if (null == r || null == tpe || tpe.isShutdown() || tpe.isTerminated() || tpe.isTerminating()) { return; - } + } try { - if (!tpe.getQueue().offer(r, 20, TimeUnit.SECONDS)) { + if (!tpe.getQueue().offer(r, 20, SECONDS)) { org.apache.openejb.util.Logger.getInstance(LogCategory.OPENEJB, "org.apache.openejb.util.resources") - .warning("Default pool executor failed to run asynchronous process: " + r); + .warning("Default pool executor failed to run asynchronous process: " + r); } } catch (final InterruptedException e) { //Ignore @@ -370,7 +369,7 @@ public class Pool<T> { try { if (entry == null) { - return added; + return false; } if (!sweeper) { @@ -467,29 +466,25 @@ public class Pool<T> { public boolean close(final long timeout, final TimeUnit unit) throws InterruptedException { - final ScheduledExecutorService ses = this.scheduler.getAndSet(null); + // Stop the sweeper thread + stop(); - try { - // drain all keys so no new instances will be accepted into the pool - while (instances.tryAcquire()) { - Thread.yield(); - } - while (minimum.tryAcquire()) { - Thread.yield(); - } - instances.drainPermits(); - minimum.drainPermits(); + // drain all keys so no new instances will be accepted into the pool + while (instances.tryAcquire()) { + Thread.yield(); + } - // flush and sweep - flush(); - try { - sweeper.run(); - } catch (final RejectedExecutionException e) { - //Ignore - } - } finally { - // Stop the sweeper thread - stop(); + while (minimum.tryAcquire()) { + Thread.yield(); + } + + // flush and sweep + flush(); + + try { + sweeper.run(); + } catch (final Exception ignore) { + //no-op } // Drain all leases @@ -497,9 +492,13 @@ public class Pool<T> { while (available.tryAcquire()) { Thread.yield(); } + available.drainPermits(); } + instances.drainPermits(); + minimum.drainPermits(); + // Wait for any pending discards return out.await(timeout, unit); } @@ -527,7 +526,7 @@ public class Pool<T> { } private static long now() { - return TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS); + return MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS); } public final class Entry { @@ -535,12 +534,12 @@ public class Pool<T> { private long used; private final int version; private final SoftReference<Instance> soft; - private final AtomicReference<Instance> hard = new AtomicReference<Instance>(); + private final AtomicReference<Instance> hard = new AtomicReference<>(); // Added this so the soft reference isn't collected // after the Entry instance is returned from a "pop" method // Also acts as an "inUse" boolean - private final AtomicReference<Instance> active = new AtomicReference<Instance>(); + private final AtomicReference<Instance> active = new AtomicReference<>(); /** * Constructor is private so that it is impossible for an Entry object @@ -560,8 +559,8 @@ public class Pool<T> { } final Instance instance = new Instance(obj); this.soft = garbageCollection ? - new SoftReference<Instance>(instance) : - new HardReference<Instance>(instance); + new SoftReference<>(instance) : + new HardReference<>(instance); this.version = poolVersion.get(); this.active.set(instance); this.created = now() + offset; @@ -601,11 +600,11 @@ public class Pool<T> { public String toString() { final long now = now(); return "Entry{" + - "min=" + (hard.get() != null) + - ", age=" + (now - created) + - ", idle=" + (now - used) + - ", bean=" + soft.get() + - '}'; + "min=" + (hard.get() != null) + + ", age=" + (now - created) + + ", idle=" + (now - used) + + ", bean=" + soft.get() + + '}'; } private class Discarded implements Runnable { @@ -676,14 +675,14 @@ public class Pool<T> { final long now = now(); - final List<Entry> entries = new ArrayList<Entry>(max); + final List<Entry> entries = new ArrayList<>(max); // Pull all the entries from the pool try { while (true) { final Entry entry = pop(0, MILLISECONDS, false); if (entry == null) { - push(entry, true); + push(null, true); break; } entries.add(entry); @@ -694,7 +693,7 @@ public class Pool<T> { // pool has been drained } - final List<Expired> expiredList = new ArrayList<Expired>(max); + final List<Expired> expiredList = new ArrayList<>(max); { // Expire aged instances, enforce pool "versioning" @@ -784,7 +783,7 @@ public class Pool<T> { // If there are any "min" pool instances left over // we need to queue up creation of a replacement - final List<Expired> replace = new ArrayList<Expired>(); + final List<Expired> replace = new ArrayList<>(); for (final Expired expired : expiredList) { executor.execute(expired.entry.active().discard(expired.event)); @@ -795,14 +794,14 @@ public class Pool<T> { } for (int i = 0; i < replace.size(); i++) { - final long offset = maxAge > 0 ? (long) (maxAge / replace.size() * i * maxAgeOffset) % maxAge : 0l; + final long offset = maxAge > 0 ? (long) (maxAge / replace.size() * i * maxAgeOffset) % maxAge : 0L; executor.execute(new Replace(replace.get(i).entry, offset)); } } } - public static enum Event { + public enum Event { FULL, IDLE, AGED, FLUSHED, GC } @@ -1118,7 +1117,7 @@ public class Pool<T> { private Duration maxAge = new Duration(0, MILLISECONDS); private double maxAgeOffset = -1; private Duration idleTimeout = new Duration(0, MILLISECONDS); - private Duration interval = new Duration(5 * 60, TimeUnit.SECONDS); + private Duration interval = new Duration(5 * 60, SECONDS); private Supplier<T> supplier; private Executor executor; private ScheduledExecutorService scheduledExecutorService; @@ -1237,6 +1236,7 @@ public class Pool<T> { this.scheduledExecutorService = scheduledExecutorService; } + @SuppressWarnings("unchecked") public Pool<T> build() { //noinspection unchecked final Pool pool = new Pool(max, min, strict, maxAge.getTime(MILLISECONDS), idleTimeout.getTime(MILLISECONDS), interval.getTime(MILLISECONDS), executor, supplier, replaceAged, maxAgeOffset, this.garbageCollection, replaceFlushed);
