This is an automated email from the ASF dual-hosted git repository. burcham pushed a commit to branch support/1.12 in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.12 by this push: new 6fec62f GEODE-7884: server hangs due to IllegalStateException (#4822) 6fec62f is described below commit 6fec62ff7b4b6ebc4f0f8079fcd67a2b0c3919b0 Author: Bruce Schuchardt <bschucha...@pivotal.io> AuthorDate: Fri Mar 20 09:05:24 2020 -0700 GEODE-7884: server hangs due to IllegalStateException (#4822) * GEODE-7884: server hangs due to IllegalStateException Added cancellation check before scheduling an idle-timeout or ack-wait-threshold timer task. I had to add a new method to SystemTimerTask and then noticed there were no tests for SystemTimer, so I cleaned up that class and added tests. * adding missing copyright header to new test * fixing LGTM issues * reinstating 'continue' when encountering a null timer during a sweep * addressing Bill's comments renamed swarm everywhere made the collection of timers associated with a DistributedSystem into a Set made timer task variables in Connection volatile added checks in tasks to cancel themselves if their Connection is closed (cherry picked from commit 2d2a3f80bd5053749963889c1898df48e9aa0be7) --- .../internal/InternalDistributedSystem.java | 2 +- .../org/apache/geode/internal/SystemTimer.java | 370 +++++++-------------- .../geode/internal/admin/StatAlertsManager.java | 2 +- .../geode/internal/cache/ExpirationScheduler.java | 2 +- .../geode/internal/cache/GemFireCacheImpl.java | 2 +- .../cache/partitioned/PRSanityCheckMessage.java | 2 +- .../internal/cache/tier/sockets/AcceptorImpl.java | 2 +- .../org/apache/geode/internal/tcp/Connection.java | 30 +- .../apache/geode/internal/tcp/ConnectionTable.java | 22 +- .../org/apache/geode/internal/SystemTimerTest.java | 162 +++++++++ 10 files changed, 329 insertions(+), 267 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java index 
823844f..e97bd02 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java @@ -1625,7 +1625,7 @@ public class InternalDistributedSystem extends DistributedSystem // bug 38501: this has to happen *after* // the DM is closed :-( if (!preparingForReconnect) { - SystemTimer.cancelSwarm(this); + SystemTimer.cancelTimers(this); } } // finally timer cancelled } // finally dm closed diff --git a/geode-core/src/main/java/org/apache/geode/internal/SystemTimer.java b/geode-core/src/main/java/org/apache/geode/internal/SystemTimer.java index 9ce3525..7eddf43 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/SystemTimer.java +++ b/geode-core/src/main/java/org/apache/geode/internal/SystemTimer.java @@ -15,32 +15,30 @@ package org.apache.geode.internal; import java.lang.ref.WeakReference; -import java.text.SimpleDateFormat; -import java.util.ArrayList; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.Map; +import java.util.Set; import java.util.Timer; import java.util.TimerTask; import org.apache.logging.log4j.Logger; import org.apache.geode.CancelException; -import org.apache.geode.SystemFailure; import org.apache.geode.annotations.internal.MakeNotStatic; -import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.distributed.DistributedSystem; import org.apache.geode.logging.internal.log4j.api.LogService; /** - * Instances of this class are like {@link Timer}, but are associated with a "swarm", which can be - * cancelled as a group with {@link #cancelSwarm(Object)}. + * Instances of this class are like {@link Timer}, but are associated with a DistributedSystem, + * which can be + * cancelled as a group with {@link #cancelTimers(DistributedSystem)}. 
* * @see Timer * @see TimerTask * - * TODO -- with Java 1.5, this will be a template type so that the swarm's class can be - * specified. */ public class SystemTimer { private static final Logger logger = LogService.getLogger(); @@ -49,12 +47,6 @@ public class SystemTimer { "IBM Corporation".equals(System.getProperty("java.vm.vendor")); /** - * Extra debugging for this class - */ - // private static final boolean DEBUG = true; - static final boolean DEBUG = false; - - /** * the underlying {@link Timer} */ private final Timer timer; @@ -62,119 +54,106 @@ public class SystemTimer { /** * True if this timer has been cancelled */ - private boolean cancelled = false; + private volatile boolean cancelled = false; /** - * the swarm to which this timer belongs + * the DistributedSystem to which this timer belongs */ - private final Object /* T */ swarm; + private final DistributedSystem distributedSystem; @Override public String toString() { - StringBuffer sb = new StringBuffer(); - sb.append("SystemTimer["); - sb.append("swarm = " + swarm); - // sb.append("; timer = " + timer); - sb.append("]"); - return sb.toString(); + return "SystemTimer[" + + "system = " + distributedSystem + + "]"; } /** - * List of all of the swarms in the system - * - * @guarded.By self + * Map of all of the timers in the system */ - // <T, HashMap<Object, ArrayList<WeakReference<SystemTimer>>>> @MakeNotStatic - private static final HashMap allSwarms = new HashMap(); + private static final HashMap<DistributedSystem, Set<WeakReference<SystemTimer>>> distributedSystemTimers = + new HashMap<>(); /** - * Add the given timer is in the given swarm. Used only by constructors. + * Add the given timer is in the given DistributedSystem. Used only by constructors. 
* - * @param swarm swarm to add the timer to - * @param t timer to add + * @param system DistributedSystem to add the timer to + * @param systemTimer timer to add */ - private static void addToSwarm(Object /* T */ swarm, SystemTimer t) { - final boolean isDebugEnabled = logger.isTraceEnabled(); - // Get or add list of timers for this swarm... - ArrayList /* ArrayList<WeakReference<SystemTimer>> */ swarmSet; - synchronized (allSwarms) { - swarmSet = (ArrayList) allSwarms.get(swarm); - if (swarmSet == null) { - if (isDebugEnabled) { - logger.trace("SystemTimer#addToSwarm: created swarm {}", swarm); - } - swarmSet = new ArrayList(); - allSwarms.put(swarm, swarmSet); + private static void addTimer(DistributedSystem system, SystemTimer systemTimer) { + Set<WeakReference<SystemTimer>> timers; + synchronized (distributedSystemTimers) { + timers = distributedSystemTimers.get(system); + if (timers == null) { + timers = new HashSet<>(); + distributedSystemTimers.put(system, timers); } - } // synchronized + } - // Add the timer to the swarm's list - if (isDebugEnabled) { - logger.trace("SystemTimer#addToSwarm: adding timer <{}>", t); + WeakReference<SystemTimer> wr = new WeakReference<>(systemTimer); + synchronized (timers) { + timers.add(wr); + } + } + + /** + * Return the current number of DistributedSystems with timers + */ + public static int distributedSystemCount() { + synchronized (distributedSystemTimers) { + return distributedSystemTimers.size(); } - WeakReference /* WeakReference<SystemTimer> */ wr = new WeakReference(t); - synchronized (swarmSet) { - swarmSet.add(wr); - } // synchronized } /** * time that the last sweep was done * - * @see #sweepAllSwarms + * @see #sweepAllTimers */ @MakeNotStatic private static long lastSweepAllTime = 0; /** - * Interval, in milliseconds, to sweep all swarms, measured from when the last sweep finished + * Interval, in milliseconds, to sweep all timers, measured from when the last sweep finished * - * @see #sweepAllSwarms + * @see 
#sweepAllTimers */ private static final long SWEEP_ALL_INTERVAL = 2 * 60 * 1000; // 2 minutes /** - * Manually garbage collect {@link #allSwarms}, if it hasn't happened in a while. + * Manually garbage collect {@link #distributedSystemTimers}, if it hasn't happened in a while. * * @see #lastSweepAllTime */ - private static void sweepAllSwarms() { + private static void sweepAllTimers() { if (System.currentTimeMillis() < lastSweepAllTime + SWEEP_ALL_INTERVAL) { // Too soon. return; } final boolean isDebugEnabled = logger.isTraceEnabled(); - synchronized (allSwarms) { - Iterator it = allSwarms.entrySet().iterator(); - while (it.hasNext()) { // iterate over allSwarms - Map.Entry entry = (Map.Entry) it.next(); - ArrayList swarm = (ArrayList) entry.getValue(); - synchronized (swarm) { - Iterator it2 = swarm.iterator(); - while (it2.hasNext()) { // iterate over current swarm - WeakReference wr = (WeakReference) it2.next(); - SystemTimer st = (SystemTimer) wr.get(); - if (st == null) { - // Remove stale reference - it2.remove(); - continue; + synchronized (distributedSystemTimers) { + Iterator<Map.Entry<DistributedSystem, Set<WeakReference<SystemTimer>>>> allSystemsIterator = + distributedSystemTimers.entrySet().iterator(); + while (allSystemsIterator.hasNext()) { + Map.Entry<DistributedSystem, Set<WeakReference<SystemTimer>>> entry = + allSystemsIterator.next(); + Set<WeakReference<SystemTimer>> timers = entry.getValue(); + synchronized (timers) { + Iterator<WeakReference<SystemTimer>> timersIterator = timers.iterator(); + while (timersIterator.hasNext()) { + WeakReference<SystemTimer> wr = timersIterator.next(); + SystemTimer st = wr.get(); + if (st == null || st.isCancelled()) { + timersIterator.remove(); } - // Get rid of a cancelled timer; it's not interesting. 
- if (st.cancelled) { - it2.remove(); - continue; - } - } // iterate over current swarm - if (swarm.size() == 0) { // Remove unused swarm - it.remove(); - if (isDebugEnabled) { - logger.trace("SystemTimer#sweepAllSwarms: removed unused swarm {}", entry.getKey()); - } - } // Remove unused swarm - } // synchronized swarm - } // iterate over allSwarms - } // synchronized allSwarms + } + if (timers.size() == 0) { + allSystemsIterator.remove(); + } + } + } + } // Collect time at END of sweep. It means an extra call to the system // timer, but makes this potentially less active. @@ -182,104 +161,75 @@ public class SystemTimer { } /** - * Remove given timer from the swarm. + * Remove given timer. * - * @param t timer to remove + * @param timerToRemove timer to remove * * @see #cancel() */ - private static void removeFromSwarm(SystemTimer t) { - final boolean isDebugEnabled = logger.isTraceEnabled(); - synchronized (allSwarms) { - // Get timer's swarm - ArrayList swarmSet = (ArrayList) allSwarms.get(t.swarm); - if (swarmSet == null) { - if (isDebugEnabled) { - logger.trace("SystemTimer#removeFromSwarm: timer already removed: {}", t); - } + private static void removeTimer(SystemTimer timerToRemove) { + synchronized (distributedSystemTimers) { + // Get the timers for the distributed system + Set<WeakReference<SystemTimer>> timers = + distributedSystemTimers.get(timerToRemove.distributedSystem); + if (timers == null) { return; // already gone } - // Remove timer from swarm - if (isDebugEnabled) { - logger.trace("SystemTimer#removeFromSwarm: removing timer <{}>", t); - } - synchronized (swarmSet) { - Iterator it = swarmSet.iterator(); - while (it.hasNext()) { - WeakReference ref = (WeakReference) it.next(); - SystemTimer t2 = (SystemTimer) ref.get(); - if (t2 == null) { - // Since we've discovered an empty reference, we should remove it. - it.remove(); - continue; - } - if (t2 == t) { - it.remove(); - // Don't keep sweeping once we've found it; just quit. 
+ synchronized (timers) { + Iterator<WeakReference<SystemTimer>> timersIterator = timers.iterator(); + while (timersIterator.hasNext()) { + WeakReference<SystemTimer> ref = timersIterator.next(); + SystemTimer timer = ref.get(); + if (timer == null) { + timersIterator.remove(); + } else if (timer == timerToRemove) { + timersIterator.remove(); break; + } else if (timer.isCancelled()) { + timersIterator.remove(); } - if (t2.cancelled) { - // But if we happen to run across a cancelled timer, - // remove it. - it.remove(); - continue; - } - } // while - - // While we're here, if the swarm has gone to zero size, - // we should remove it. - if (swarmSet.size() == 0) { - allSwarms.remove(t.swarm); // last reference - if (isDebugEnabled) { - logger.trace("SystemTimer#removeFromSwarm: removed last reference to {}", t.swarm); - } } - } // synchronized swarmSet - } // synchronized allSwarms + if (timers.size() == 0) { + distributedSystemTimers.remove(timerToRemove.distributedSystem); // last reference + } + } + } - sweepAllSwarms(); // Occasionally check global list, use any available logger :-) + sweepAllTimers(); // Occasionally check global list } /** * Cancel all outstanding timers * - * @param swarm the swarm to cancel + * @param system the DistributedSystem whose timers should be cancelled */ - public static void cancelSwarm(Object /* T */ swarm) { - Assert.assertTrue(swarm instanceof InternalDistributedSystem); // TODO - // Find the swarmSet and remove it - ArrayList swarmSet; - synchronized (allSwarms) { - swarmSet = (ArrayList) allSwarms.get(swarm); - if (swarmSet == null) { + public static void cancelTimers(DistributedSystem system) { + Set<WeakReference<SystemTimer>> timers; + synchronized (distributedSystemTimers) { + timers = distributedSystemTimers.get(system); + if (timers == null) { return; // already cancelled } // Remove before releasing synchronization, so any fresh timer ends up // in a new set with same key - allSwarms.remove(swarmSet); + 
distributedSystemTimers.remove(system); } // synchronized - // Empty the swarmSet - synchronized (swarmSet) { - Iterator it = swarmSet.iterator(); - while (it.hasNext()) { - WeakReference wr = (WeakReference) it.next(); - SystemTimer st = (SystemTimer) wr.get(); + // cancel all of the timers + synchronized (timers) { + for (WeakReference<SystemTimer> wr : timers) { + SystemTimer st = wr.get(); // it.remove(); Not necessary, we're emptying the list... if (st != null) { st.cancelled = true; // for safety :-) st.timer.cancel(); // st.cancel() would just search for it again } - } // while - } // synchronized + } + } } public int timerPurge() { - if (logger.isTraceEnabled()) { - logger.trace("SystemTimer#timerPurge of {}", this); - } - // Fix 39585, IBM's java.util.timer's purge() has stack overflow issue if (isIBM) { return 0; @@ -287,44 +237,14 @@ public class SystemTimer { return this.timer.purge(); } - // This creates a non-daemon timer thread. We don't EVER do this... - // /** - // * @see Timer#Timer() - // * - // * @param swarm the swarm this timer belongs to - // */ - // public SystemTimer(DistributedSystem swarm) { - // this.timer = new Timer(); - // this.swarm = swarm; - // addToSwarm(swarm, this); - // } - /** * @see Timer#Timer(boolean) - * @param swarm the swarm this timer belongs to, currently must be a DistributedSystem - * @param isDaemon whether the timer is a daemon. Must be true for GemFire use. + * @param distributedSystem the DistributedSystem to which this timer belongs */ - public SystemTimer(Object /* T */ swarm, boolean isDaemon) { - Assert.assertTrue(isDaemon); // we don't currently allow non-daemon timers - Assert.assertTrue(swarm instanceof InternalDistributedSystem, - "Attempt to create swarm on " + swarm); // TODO allow template class? 
- this.timer = new Timer(isDaemon); - this.swarm = swarm; - addToSwarm(swarm, this); - } - - /** - * @param name the name to give the timer thread - * @param swarm the swarm this timer belongs to, currently must be a DistributedMember - * @param isDaemon whether the timer is a daemon. Must be true for GemFire use. - */ - public SystemTimer(String name, Object /* T */ swarm, boolean isDaemon) { - Assert.assertTrue(isDaemon); // we don't currently allow non-daemon timers - Assert.assertTrue(swarm instanceof InternalDistributedSystem, - "Attempt to create swarm on " + swarm); // TODO allow template class? - this.timer = new Timer(name, isDaemon); - this.swarm = swarm; - addToSwarm(swarm, this); + public SystemTimer(DistributedSystem distributedSystem) { + this.timer = new Timer(true); + this.distributedSystem = distributedSystem; + addTimer(distributedSystem, this); } private void checkCancelled() throws IllegalStateException { @@ -338,12 +258,6 @@ public class SystemTimer { */ public void schedule(SystemTimerTask task, long delay) { checkCancelled(); - if (logger.isTraceEnabled()) { - Date tilt = new Date(System.currentTimeMillis() + delay); - SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); - logger.trace("SystemTimer#schedule (long): {}: expect task {} to fire around {}", this, task, - sdf.format(tilt)); - } timer.schedule(task, delay); } @@ -352,39 +266,13 @@ public class SystemTimer { */ public void schedule(SystemTimerTask task, Date time) { checkCancelled(); - if (logger.isTraceEnabled()) { - SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); - logger.trace("SystemTimer#schedule (Date): {}: expect task {} to fire around {}", this, task, - sdf.format(time)); - } timer.schedule(task, time); } - // Not currently used, so don't complicate things - // /** - // * @see Timer#schedule(TimerTask, long, long) - // */ - // public void schedule(SystemTimerTask task, long delay, long period) { - // // TODO add debug statement - // 
checkCancelled(); - // timer.schedule(task, delay, period); - // } - - // Not currently used, so don't complicate things - // /** - // * @see Timer#schedule(TimerTask, Date, long) - // */ - // public void schedule(SystemTimerTask task, Date firstTime, long period) { - // // TODO add debug statement - // checkCancelled(); - // timer.schedule(task, firstTime, period); - // } - /** * @see Timer#scheduleAtFixedRate(TimerTask, long, long) */ public void scheduleAtFixedRate(SystemTimerTask task, long delay, long period) { - // TODO add debug statement checkCancelled(); timer.scheduleAtFixedRate(task, delay, period); } @@ -393,30 +281,24 @@ public class SystemTimer { * @see Timer#schedule(TimerTask, long, long) */ public void schedule(SystemTimerTask task, long delay, long period) { - // TODO add debug statement checkCancelled(); timer.schedule(task, delay, period); } - // Not currently used, so don't complicate things - // /** - // * @see Timer#scheduleAtFixedRate(TimerTask, Date, long) - // */ - // public void scheduleAtFixedRate(SystemTimerTask task, Date firstTime, - // long period) { - // // TODO add debug statement - // checkCancelled(); - // timer.scheduleAtFixedRate(task, firstTime, period); - // } - - /** * @see Timer#cancel() */ public void cancel() { this.cancelled = true; timer.cancel(); - removeFromSwarm(this); + removeTimer(this); + } + + /** + * has this timer been cancelled? 
+ */ + public boolean isCancelled() { + return cancelled; } /** @@ -426,6 +308,17 @@ public class SystemTimer { */ public abstract static class SystemTimerTask extends TimerTask { protected static final Logger logger = LogService.getLogger(); + private volatile boolean cancelled; + + public boolean isCancelled() { + return cancelled; + } + + @Override + public boolean cancel() { + cancelled = true; + return super.cancel(); + } /** * This is your executed action @@ -437,25 +330,14 @@ public class SystemTimer { */ @Override public void run() { - final boolean isDebugEnabled = logger.isTraceEnabled(); - if (isDebugEnabled) { - logger.trace("SystemTimer.MyTask: starting {}", this); - } try { this.run2(); } catch (CancelException ignore) { // ignore: TimerThreads can fire during or near cache closure - } catch (VirtualMachineError e) { - SystemFailure.initiateFailure(e); - throw e; } catch (Throwable t) { - SystemFailure.checkFailure(); logger.warn(String.format("Timer task <%s> encountered exception", this), t); // Don't rethrow, it will just get eaten and kill the timer } - if (isDebugEnabled) { - logger.trace("SystemTimer.MyTask: finished {}", this); - } } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/admin/StatAlertsManager.java b/geode-core/src/main/java/org/apache/geode/internal/admin/StatAlertsManager.java index 7fbbcb3..0205339 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/admin/StatAlertsManager.java +++ b/geode-core/src/main/java/org/apache/geode/internal/admin/StatAlertsManager.java @@ -175,7 +175,7 @@ public class StatAlertsManager { "This manager has been cancelled"); } // start and schedule new timer - timer = new SystemTimer(system /* swarm */, true); + timer = new SystemTimer(system /* swarm */); EvaluateAlertDefnsTask task = new EvaluateAlertDefnsTask(); if (refreshAtFixedRate) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ExpirationScheduler.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/ExpirationScheduler.java index e4bf8c9..0698e26 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/ExpirationScheduler.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ExpirationScheduler.java @@ -38,7 +38,7 @@ public class ExpirationScheduler { .getInteger(GeodeGlossary.GEMFIRE_PREFIX + "MAX_PENDING_CANCELS", 10000).intValue(); public ExpirationScheduler(InternalDistributedSystem ds) { - this.timer = new SystemTimer(ds, true); + this.timer = new SystemTimer(ds); } public void forcePurge() { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java index a710b95..78c0217 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java @@ -890,7 +890,7 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has TypeRegistry::new, HARegionQueue::setMessageSyncInterval, FunctionService::registerFunction, - object -> new SystemTimer(object, true), + object -> new SystemTimer((DistributedSystem) object), TombstoneService::initialize, ExpirationScheduler::new, DiskStoreMonitor::new, diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PRSanityCheckMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PRSanityCheckMessage.java index 596429b..1d87d1d 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PRSanityCheckMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PRSanityCheckMessage.java @@ -124,7 +124,7 @@ public class PRSanityCheckMessage extends PartitionMessage { int sanityCheckInterval = Integer .getInteger(GeodeGlossary.GEMFIRE_PREFIX + "PRSanityCheckInterval", 5000).intValue(); if 
(sanityCheckInterval != 0) { - final SystemTimer tm = new SystemTimer(dm.getSystem(), true); + final SystemTimer tm = new SystemTimer(dm.getSystem()); SystemTimer.SystemTimerTask st = new SystemTimer.SystemTimerTask() { @Override public void run2() { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java index b4a475e..de1de70 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java @@ -489,7 +489,7 @@ public class AcceptorImpl implements Acceptor, Runnable { tmp_q = new LinkedBlockingQueue<>(); tmp_commQ = new LinkedBlockingQueue<>(); tmp_hs = new HashSet<>(512); - tmp_timer = new SystemTimer(internalCache.getDistributedSystem(), true); + tmp_timer = new SystemTimer(internalCache.getDistributedSystem()); } selector = tmp_s; selectorQueue = tmp_q; diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java index 1ec7a06..45d56db 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java @@ -155,7 +155,7 @@ public class Connection implements Runnable { /** * The idle timeout timer task for this connection */ - private SystemTimerTask idleTask; + private volatile SystemTimerTask idleTask; /** * If true then readers for thread owned sockets will send all messages on thread owned senders. 
@@ -286,7 +286,7 @@ public class Connection implements Runnable { /** * task for detecting ack timeouts and issuing alerts */ - private SystemTimer.SystemTimerTask ackTimeoutTask; + private volatile SystemTimer.SystemTimerTask ackTimeoutTask; /** * millisecond clock at the time message transmission started, if doing forced-disconnect @@ -1433,11 +1433,15 @@ public class Connection implements Runnable { // This cancels the idle timer task, but it also removes the tasks reference to this connection, // freeing up the connection (and it's buffers for GC sooner. if (idleTask != null) { - idleTask.cancel(); + synchronized (idleTask) { + idleTask.cancel(); + } } if (ackTimeoutTask != null) { - ackTimeoutTask.cancel(); + synchronized (ackTimeoutTask) { + ackTimeoutTask.cancel(); + } } } @@ -1935,7 +1939,13 @@ public class Connection implements Runnable { ackTimeoutTask = new SystemTimer.SystemTimerTask() { @Override public void run2() { + if (isSocketClosed()) { + // Connection is closing - nothing to do anymore + cancel(); + return; + } if (owner.isClosed()) { + cancel(); return; } byte connState; @@ -1980,10 +1990,14 @@ public class Connection implements Runnable { synchronized (owner) { SystemTimer timer = owner.getIdleConnTimer(); if (timer != null) { - if (msSA > 0) { - timer.scheduleAtFixedRate(ackTimeoutTask, msAW, Math.min(msAW, msSA)); - } else { - timer.schedule(ackTimeoutTask, msAW); + synchronized (ackTimeoutTask) { + if (!ackTimeoutTask.isCancelled()) { + if (msSA > 0) { + timer.scheduleAtFixedRate(ackTimeoutTask, msAW, Math.min(msAW, msSA)); + } else { + timer.schedule(ackTimeoutTask, msAW); + } + } } } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java index 113d91a..0c8a9f9 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java +++ 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java @@ -206,7 +206,7 @@ public class ConnectionTable { private ConnectionTable(TCPConduit conduit) { owner = conduit; idleConnTimer = owner.idleConnectionTimeout != 0 - ? new SystemTimer(conduit.getDM().getSystem(), true) : null; + ? new SystemTimer(conduit.getDM().getSystem()) : null; threadConnMaps = new ArrayList(); threadConnectionMap = new ConcurrentHashMap(); p2pReaderThreadPool = createThreadPoolForIO(conduit.getDM().getSystem().isShareSockets()); @@ -526,8 +526,12 @@ public class ConnectionTable { if (!closed) { IdleConnTT task = new IdleConnTT(conn); conn.setIdleTimeoutTask(task); - getIdleConnTimer().scheduleAtFixedRate(task, owner.idleConnectionTimeout, - owner.idleConnectionTimeout); + synchronized (task) { + if (!task.isCancelled()) { + getIdleConnTimer().scheduleAtFixedRate(task, owner.idleConnectionTimeout, + owner.idleConnectionTimeout); + } + } } } } catch (IllegalStateException e) { @@ -627,7 +631,7 @@ public class ConnectionTable { return null; } if (idleConnTimer == null) { - idleConnTimer = new SystemTimer(getDM().getSystem(), true); + idleConnTimer = new SystemTimer(getDM().getSystem()); } return idleConnTimer; } @@ -1212,25 +1216,25 @@ public class ConnectionTable { private static class IdleConnTT extends SystemTimer.SystemTimerTask { - private Connection c; + private Connection connection; private IdleConnTT(Connection c) { - this.c = c; + this.connection = c; } @Override public boolean cancel() { - Connection con = c; + Connection con = connection; if (con != null) { con.cleanUpOnIdleTaskCancel(); } - c = null; + connection = null; return super.cancel(); } @Override public void run2() { - Connection con = c; + Connection con = connection; if (con != null) { if (con.checkForIdleTimeout()) { cancel(); diff --git a/geode-core/src/test/java/org/apache/geode/internal/SystemTimerTest.java b/geode-core/src/test/java/org/apache/geode/internal/SystemTimerTest.java new file 
mode 100644 index 0000000..00f7335 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/SystemTimerTest.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal; + +import static org.apache.geode.test.awaitility.GeodeAwaitility.await; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.mock; + +import java.util.Date; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.apache.geode.distributed.DistributedSystem; + +public class SystemTimerTest { + + private DistributedSystem distributedSystem; + private SystemTimer systemTimer; + + @Before + public void setup() { + this.distributedSystem = mock(DistributedSystem.class); + this.systemTimer = new SystemTimer(distributedSystem); + } + + @After + public void teardown() { + if (!systemTimer.isCancelled()) { + systemTimer.cancel(); + } + } + + @Test + public void cancelTimer() { + assertThat(systemTimer.isCancelled()).isFalse(); + int initialSystemCount = 
SystemTimer.distributedSystemCount(); + SystemTimer.cancelTimers(distributedSystem); + assertThat(systemTimer.isCancelled()).isTrue(); + assertThat(SystemTimer.distributedSystemCount()).isEqualTo(initialSystemCount - 1); + } + + @Test + public void cancel() { + assertThat(systemTimer.isCancelled()).isFalse(); + systemTimer.cancel(); + assertThat(systemTimer.isCancelled()).isTrue(); + } + + @Test + public void scheduleNow() { + AtomicBoolean hasRun = new AtomicBoolean(false); + SystemTimer.SystemTimerTask task = new SystemTimer.SystemTimerTask() { + @Override + public void run2() { + hasRun.set(true); + } + }; + systemTimer.schedule(task, 0); + await().until(() -> hasRun.get()); + } + + @Test + public void scheduleWithDelay() { + AtomicBoolean hasRun = new AtomicBoolean(false); + SystemTimer.SystemTimerTask task = new SystemTimer.SystemTimerTask() { + @Override + public void run2() { + hasRun.set(true); + } + }; + final long millis = System.currentTimeMillis(); + final int delay = 1000; + systemTimer.schedule(task, delay); + await().until(() -> hasRun.get()); + assertThat(System.currentTimeMillis()).isGreaterThanOrEqualTo(millis + delay); + } + + @Test + public void scheduleWithDate() { + AtomicBoolean hasRun = new AtomicBoolean(false); + SystemTimer.SystemTimerTask task = new SystemTimer.SystemTimerTask() { + @Override + public void run2() { + hasRun.set(true); + } + }; + final long millis = System.currentTimeMillis(); + final long delay = 1000; + final Date scheduleTime = new Date(System.currentTimeMillis() + delay); + systemTimer.schedule(task, scheduleTime); + await().until(() -> hasRun.get()); + assertThat(System.currentTimeMillis()).isGreaterThanOrEqualTo(millis + delay); + } + + @Test + public void scheduleRepeatedWithDelay() { + AtomicInteger invocations = new AtomicInteger(0); + SystemTimer.SystemTimerTask task = new SystemTimer.SystemTimerTask() { + @Override + public void run2() { + invocations.incrementAndGet(); + } + }; + final long millis = 
System.currentTimeMillis(); + final int delay = 1000; + final int period = 500; + systemTimer.schedule(task, delay, period); + await().untilAsserted(() -> assertThat(invocations.get()).isGreaterThanOrEqualTo(2)); + assertThat(System.currentTimeMillis()).isGreaterThanOrEqualTo(millis + delay + period); + } + + @Test + public void scheduleAtFixedRate() { + AtomicInteger invocations = new AtomicInteger(0); + SystemTimer.SystemTimerTask task = new SystemTimer.SystemTimerTask() { + @Override + public void run2() { + invocations.incrementAndGet(); + } + }; + final long millis = System.currentTimeMillis(); + final int delay = 1000; + final int period = 500; + systemTimer.scheduleAtFixedRate(task, delay, period); + await().untilAsserted(() -> assertThat(invocations.get()).isGreaterThanOrEqualTo(2)); + assertThat(System.currentTimeMillis()).isGreaterThanOrEqualTo(millis + delay + period); + } + + @Test + public void cancelTask() { + AtomicInteger invocations = new AtomicInteger(0); + SystemTimer.SystemTimerTask task = new SystemTimer.SystemTimerTask() { + @Override + public void run2() { + invocations.incrementAndGet(); + } + }; + assertThat(task.isCancelled()).isFalse(); + task.cancel(); + assertThat(task.isCancelled()).isTrue(); + assertThatThrownBy(() -> systemTimer.schedule(task, 0)) + .isInstanceOf(IllegalStateException.class); + } + +}