This is an automated email from the ASF dual-hosted git repository.

burcham pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.12 by this push:
     new 6fec62f  GEODE-7884: server hangs due to IllegalStateException (#4822)
6fec62f is described below

commit 6fec62ff7b4b6ebc4f0f8079fcd67a2b0c3919b0
Author: Bruce Schuchardt <bschucha...@pivotal.io>
AuthorDate: Fri Mar 20 09:05:24 2020 -0700

    GEODE-7884: server hangs due to IllegalStateException (#4822)
    
    * GEODE-7884: server hangs due to IllegalStateException
    
    Added cancellation check before scheduling an idle-timeout or
    ack-wait-threshold timer task.  I had to add a new method to
    SystemTimerTask and then noticed there were no tests for SystemTimer, so
    I cleaned up that class and added tests.
    
    * adding missing copyright header to new test
    
    * fixing LGTM issues
    
    * reinstating 'continue' when encountering a null timer during a sweep
    
    * addressing Bill's comments
    
    renamed swarm everywhere
    made the collection of timers associated with a DistributedSystem into a Set
    made timer task variables in Connection volatile
    added checks in tasks to cancel themselves if their Connection is closed
    
    (cherry picked from commit 2d2a3f80bd5053749963889c1898df48e9aa0be7)
---
 .../internal/InternalDistributedSystem.java        |   2 +-
 .../org/apache/geode/internal/SystemTimer.java     | 370 +++++++--------------
 .../geode/internal/admin/StatAlertsManager.java    |   2 +-
 .../geode/internal/cache/ExpirationScheduler.java  |   2 +-
 .../geode/internal/cache/GemFireCacheImpl.java     |   2 +-
 .../cache/partitioned/PRSanityCheckMessage.java    |   2 +-
 .../internal/cache/tier/sockets/AcceptorImpl.java  |   2 +-
 .../org/apache/geode/internal/tcp/Connection.java  |  30 +-
 .../apache/geode/internal/tcp/ConnectionTable.java |  22 +-
 .../org/apache/geode/internal/SystemTimerTest.java | 162 +++++++++
 10 files changed, 329 insertions(+), 267 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
index 823844f..e97bd02 100644
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalDistributedSystem.java
@@ -1625,7 +1625,7 @@ public class InternalDistributedSystem extends 
DistributedSystem
           // bug 38501: this has to happen *after*
           // the DM is closed :-(
           if (!preparingForReconnect) {
-            SystemTimer.cancelSwarm(this);
+            SystemTimer.cancelTimers(this);
           }
         } // finally timer cancelled
       } // finally dm closed
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/SystemTimer.java 
b/geode-core/src/main/java/org/apache/geode/internal/SystemTimer.java
index 9ce3525..7eddf43 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/SystemTimer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/SystemTimer.java
@@ -15,32 +15,30 @@
 package org.apache.geode.internal;
 
 import java.lang.ref.WeakReference;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelException;
-import org.apache.geode.SystemFailure;
 import org.apache.geode.annotations.internal.MakeNotStatic;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.DistributedSystem;
 import org.apache.geode.logging.internal.log4j.api.LogService;
 
 /**
- * Instances of this class are like {@link Timer}, but are associated with a 
"swarm", which can be
- * cancelled as a group with {@link #cancelSwarm(Object)}.
+ * Instances of this class are like {@link Timer}, but are associated with a 
DistributedSystem,
+ * which can be
+ * cancelled as a group with {@link #cancelTimers(DistributedSystem)}.
  *
  * @see Timer
  * @see TimerTask
  *
- *      TODO -- with Java 1.5, this will be a template type so that the 
swarm's class can be
- *      specified.
  */
 public class SystemTimer {
   private static final Logger logger = LogService.getLogger();
@@ -49,12 +47,6 @@ public class SystemTimer {
       "IBM Corporation".equals(System.getProperty("java.vm.vendor"));
 
   /**
-   * Extra debugging for this class
-   */
-  // private static final boolean DEBUG = true;
-  static final boolean DEBUG = false;
-
-  /**
    * the underlying {@link Timer}
    */
   private final Timer timer;
@@ -62,119 +54,106 @@ public class SystemTimer {
   /**
    * True if this timer has been cancelled
    */
-  private boolean cancelled = false;
+  private volatile boolean cancelled = false;
 
   /**
-   * the swarm to which this timer belongs
+   * the DistributedSystem to which this timer belongs
    */
-  private final Object /* T */ swarm;
+  private final DistributedSystem distributedSystem;
 
   @Override
   public String toString() {
-    StringBuffer sb = new StringBuffer();
-    sb.append("SystemTimer[");
-    sb.append("swarm = " + swarm);
-    // sb.append("; timer = " + timer);
-    sb.append("]");
-    return sb.toString();
+    return "SystemTimer["
+        + "system = " + distributedSystem
+        + "]";
   }
 
   /**
-   * List of all of the swarms in the system
-   *
-   * @guarded.By self
+   * Map of all of the timers in the system
    */
-  // <T, HashMap<Object, ArrayList<WeakReference<SystemTimer>>>>
   @MakeNotStatic
-  private static final HashMap allSwarms = new HashMap();
+  private static final HashMap<DistributedSystem, 
Set<WeakReference<SystemTimer>>> distributedSystemTimers =
+      new HashMap<>();
 
   /**
-   * Add the given timer is in the given swarm. Used only by constructors.
+   * Add the given timer to the given DistributedSystem. Used only by 
constructors.
    *
-   * @param swarm swarm to add the timer to
-   * @param t timer to add
+   * @param system DistributedSystem to add the timer to
+   * @param systemTimer timer to add
    */
-  private static void addToSwarm(Object /* T */ swarm, SystemTimer t) {
-    final boolean isDebugEnabled = logger.isTraceEnabled();
-    // Get or add list of timers for this swarm...
-    ArrayList /* ArrayList<WeakReference<SystemTimer>> */ swarmSet;
-    synchronized (allSwarms) {
-      swarmSet = (ArrayList) allSwarms.get(swarm);
-      if (swarmSet == null) {
-        if (isDebugEnabled) {
-          logger.trace("SystemTimer#addToSwarm: created swarm {}", swarm);
-        }
-        swarmSet = new ArrayList();
-        allSwarms.put(swarm, swarmSet);
+  private static void addTimer(DistributedSystem system, SystemTimer 
systemTimer) {
+    Set<WeakReference<SystemTimer>> timers;
+    synchronized (distributedSystemTimers) {
+      timers = distributedSystemTimers.get(system);
+      if (timers == null) {
+        timers = new HashSet<>();
+        distributedSystemTimers.put(system, timers);
       }
-    } // synchronized
+    }
 
-    // Add the timer to the swarm's list
-    if (isDebugEnabled) {
-      logger.trace("SystemTimer#addToSwarm: adding timer <{}>", t);
+    WeakReference<SystemTimer> wr = new WeakReference<>(systemTimer);
+    synchronized (timers) {
+      timers.add(wr);
+    }
+  }
+
+  /**
+   * Return the current number of DistributedSystems with timers
+   */
+  public static int distributedSystemCount() {
+    synchronized (distributedSystemTimers) {
+      return distributedSystemTimers.size();
     }
-    WeakReference /* WeakReference<SystemTimer> */ wr = new WeakReference(t);
-    synchronized (swarmSet) {
-      swarmSet.add(wr);
-    } // synchronized
   }
 
   /**
    * time that the last sweep was done
    *
-   * @see #sweepAllSwarms
+   * @see #sweepAllTimers
    */
   @MakeNotStatic
   private static long lastSweepAllTime = 0;
 
   /**
-   * Interval, in milliseconds, to sweep all swarms, measured from when the 
last sweep finished
+   * Interval, in milliseconds, to sweep all timers, measured from when the 
last sweep finished
    *
-   * @see #sweepAllSwarms
+   * @see #sweepAllTimers
    */
   private static final long SWEEP_ALL_INTERVAL = 2 * 60 * 1000; // 2 minutes
 
   /**
-   * Manually garbage collect {@link #allSwarms}, if it hasn't happened in a 
while.
+   * Manually garbage collect {@link #distributedSystemTimers}, if it hasn't 
happened in a while.
    *
    * @see #lastSweepAllTime
    */
-  private static void sweepAllSwarms() {
+  private static void sweepAllTimers() {
     if (System.currentTimeMillis() < lastSweepAllTime + SWEEP_ALL_INTERVAL) {
       // Too soon.
       return;
     }
     final boolean isDebugEnabled = logger.isTraceEnabled();
-    synchronized (allSwarms) {
-      Iterator it = allSwarms.entrySet().iterator();
-      while (it.hasNext()) { // iterate over allSwarms
-        Map.Entry entry = (Map.Entry) it.next();
-        ArrayList swarm = (ArrayList) entry.getValue();
-        synchronized (swarm) {
-          Iterator it2 = swarm.iterator();
-          while (it2.hasNext()) { // iterate over current swarm
-            WeakReference wr = (WeakReference) it2.next();
-            SystemTimer st = (SystemTimer) wr.get();
-            if (st == null) {
-              // Remove stale reference
-              it2.remove();
-              continue;
+    synchronized (distributedSystemTimers) {
+      Iterator<Map.Entry<DistributedSystem, Set<WeakReference<SystemTimer>>>> 
allSystemsIterator =
+          distributedSystemTimers.entrySet().iterator();
+      while (allSystemsIterator.hasNext()) {
+        Map.Entry<DistributedSystem, Set<WeakReference<SystemTimer>>> entry =
+            allSystemsIterator.next();
+        Set<WeakReference<SystemTimer>> timers = entry.getValue();
+        synchronized (timers) {
+          Iterator<WeakReference<SystemTimer>> timersIterator = 
timers.iterator();
+          while (timersIterator.hasNext()) {
+            WeakReference<SystemTimer> wr = timersIterator.next();
+            SystemTimer st = wr.get();
+            if (st == null || st.isCancelled()) {
+              timersIterator.remove();
             }
-            // Get rid of a cancelled timer; it's not interesting.
-            if (st.cancelled) {
-              it2.remove();
-              continue;
-            }
-          } // iterate over current swarm
-          if (swarm.size() == 0) { // Remove unused swarm
-            it.remove();
-            if (isDebugEnabled) {
-              logger.trace("SystemTimer#sweepAllSwarms: removed unused swarm 
{}", entry.getKey());
-            }
-          } // Remove unused swarm
-        } // synchronized swarm
-      } // iterate over allSwarms
-    } // synchronized allSwarms
+          }
+          if (timers.size() == 0) {
+            allSystemsIterator.remove();
+          }
+        }
+      }
+    }
 
     // Collect time at END of sweep. It means an extra call to the system
     // timer, but makes this potentially less active.
@@ -182,104 +161,75 @@ public class SystemTimer {
   }
 
   /**
-   * Remove given timer from the swarm.
+   * Remove given timer.
    *
-   * @param t timer to remove
+   * @param timerToRemove timer to remove
    *
    * @see #cancel()
    */
-  private static void removeFromSwarm(SystemTimer t) {
-    final boolean isDebugEnabled = logger.isTraceEnabled();
-    synchronized (allSwarms) {
-      // Get timer's swarm
-      ArrayList swarmSet = (ArrayList) allSwarms.get(t.swarm);
-      if (swarmSet == null) {
-        if (isDebugEnabled) {
-          logger.trace("SystemTimer#removeFromSwarm: timer already removed: 
{}", t);
-        }
+  private static void removeTimer(SystemTimer timerToRemove) {
+    synchronized (distributedSystemTimers) {
+      // Get the timers for the distributed system
+      Set<WeakReference<SystemTimer>> timers =
+          distributedSystemTimers.get(timerToRemove.distributedSystem);
+      if (timers == null) {
         return; // already gone
       }
 
-      // Remove timer from swarm
-      if (isDebugEnabled) {
-        logger.trace("SystemTimer#removeFromSwarm: removing timer <{}>", t);
-      }
-      synchronized (swarmSet) {
-        Iterator it = swarmSet.iterator();
-        while (it.hasNext()) {
-          WeakReference ref = (WeakReference) it.next();
-          SystemTimer t2 = (SystemTimer) ref.get();
-          if (t2 == null) {
-            // Since we've discovered an empty reference, we should remove it.
-            it.remove();
-            continue;
-          }
-          if (t2 == t) {
-            it.remove();
-            // Don't keep sweeping once we've found it; just quit.
+      synchronized (timers) {
+        Iterator<WeakReference<SystemTimer>> timersIterator = 
timers.iterator();
+        while (timersIterator.hasNext()) {
+          WeakReference<SystemTimer> ref = timersIterator.next();
+          SystemTimer timer = ref.get();
+          if (timer == null) {
+            timersIterator.remove();
+          } else if (timer == timerToRemove) {
+            timersIterator.remove();
             break;
+          } else if (timer.isCancelled()) {
+            timersIterator.remove();
           }
-          if (t2.cancelled) {
-            // But if we happen to run across a cancelled timer,
-            // remove it.
-            it.remove();
-            continue;
-          }
-        } // while
-
-        // While we're here, if the swarm has gone to zero size,
-        // we should remove it.
-        if (swarmSet.size() == 0) {
-          allSwarms.remove(t.swarm); // last reference
-          if (isDebugEnabled) {
-            logger.trace("SystemTimer#removeFromSwarm: removed last reference 
to {}", t.swarm);
-          }
         }
-      } // synchronized swarmSet
-    } // synchronized allSwarms
+        if (timers.size() == 0) {
+          distributedSystemTimers.remove(timerToRemove.distributedSystem); // 
last reference
+        }
+      }
+    }
 
-    sweepAllSwarms(); // Occasionally check global list, use any available 
logger :-)
+    sweepAllTimers(); // Occasionally check global list
   }
 
   /**
    * Cancel all outstanding timers
    *
-   * @param swarm the swarm to cancel
+   * @param system the DistributedSystem whose timers should be cancelled
    */
-  public static void cancelSwarm(Object /* T */ swarm) {
-    Assert.assertTrue(swarm instanceof InternalDistributedSystem); // TODO
-    // Find the swarmSet and remove it
-    ArrayList swarmSet;
-    synchronized (allSwarms) {
-      swarmSet = (ArrayList) allSwarms.get(swarm);
-      if (swarmSet == null) {
+  public static void cancelTimers(DistributedSystem system) {
+    Set<WeakReference<SystemTimer>> timers;
+    synchronized (distributedSystemTimers) {
+      timers = distributedSystemTimers.get(system);
+      if (timers == null) {
         return; // already cancelled
       }
       // Remove before releasing synchronization, so any fresh timer ends up
       // in a new set with same key
-      allSwarms.remove(swarmSet);
+      distributedSystemTimers.remove(system);
     } // synchronized
 
-    // Empty the swarmSet
-    synchronized (swarmSet) {
-      Iterator it = swarmSet.iterator();
-      while (it.hasNext()) {
-        WeakReference wr = (WeakReference) it.next();
-        SystemTimer st = (SystemTimer) wr.get();
+    // cancel all of the timers
+    synchronized (timers) {
+      for (WeakReference<SystemTimer> wr : timers) {
+        SystemTimer st = wr.get();
         // it.remove(); Not necessary, we're emptying the list...
         if (st != null) {
           st.cancelled = true; // for safety :-)
           st.timer.cancel(); // st.cancel() would just search for it again
         }
-      } // while
-    } // synchronized
+      }
+    }
   }
 
   public int timerPurge() {
-    if (logger.isTraceEnabled()) {
-      logger.trace("SystemTimer#timerPurge of {}", this);
-    }
-
     // Fix 39585, IBM's java.util.timer's purge() has stack overflow issue
     if (isIBM) {
       return 0;
@@ -287,44 +237,14 @@ public class SystemTimer {
     return this.timer.purge();
   }
 
-  // This creates a non-daemon timer thread. We don't EVER do this...
-  // /**
-  // * @see Timer#Timer()
-  // *
-  // * @param swarm the swarm this timer belongs to
-  // */
-  // public SystemTimer(DistributedSystem swarm) {
-  // this.timer = new Timer();
-  // this.swarm = swarm;
-  // addToSwarm(swarm, this);
-  // }
-
   /**
    * @see Timer#Timer(boolean)
-   * @param swarm the swarm this timer belongs to, currently must be a 
DistributedSystem
-   * @param isDaemon whether the timer is a daemon. Must be true for GemFire 
use.
+   * @param distributedSystem the DistributedSystem to which this timer belongs
    */
-  public SystemTimer(Object /* T */ swarm, boolean isDaemon) {
-    Assert.assertTrue(isDaemon); // we don't currently allow non-daemon timers
-    Assert.assertTrue(swarm instanceof InternalDistributedSystem,
-        "Attempt to create swarm on " + swarm); // TODO allow template class?
-    this.timer = new Timer(isDaemon);
-    this.swarm = swarm;
-    addToSwarm(swarm, this);
-  }
-
-  /**
-   * @param name the name to give the timer thread
-   * @param swarm the swarm this timer belongs to, currently must be a 
DistributedMember
-   * @param isDaemon whether the timer is a daemon. Must be true for GemFire 
use.
-   */
-  public SystemTimer(String name, Object /* T */ swarm, boolean isDaemon) {
-    Assert.assertTrue(isDaemon); // we don't currently allow non-daemon timers
-    Assert.assertTrue(swarm instanceof InternalDistributedSystem,
-        "Attempt to create swarm on " + swarm); // TODO allow template class?
-    this.timer = new Timer(name, isDaemon);
-    this.swarm = swarm;
-    addToSwarm(swarm, this);
+  public SystemTimer(DistributedSystem distributedSystem) {
+    this.timer = new Timer(true);
+    this.distributedSystem = distributedSystem;
+    addTimer(distributedSystem, this);
   }
 
   private void checkCancelled() throws IllegalStateException {
@@ -338,12 +258,6 @@ public class SystemTimer {
    */
   public void schedule(SystemTimerTask task, long delay) {
     checkCancelled();
-    if (logger.isTraceEnabled()) {
-      Date tilt = new Date(System.currentTimeMillis() + delay);
-      SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
-      logger.trace("SystemTimer#schedule (long): {}: expect task {} to fire 
around {}", this, task,
-          sdf.format(tilt));
-    }
     timer.schedule(task, delay);
   }
 
@@ -352,39 +266,13 @@ public class SystemTimer {
    */
   public void schedule(SystemTimerTask task, Date time) {
     checkCancelled();
-    if (logger.isTraceEnabled()) {
-      SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
-      logger.trace("SystemTimer#schedule (Date): {}: expect task {} to fire 
around {}", this, task,
-          sdf.format(time));
-    }
     timer.schedule(task, time);
   }
 
-  // Not currently used, so don't complicate things
-  // /**
-  // * @see Timer#schedule(TimerTask, long, long)
-  // */
-  // public void schedule(SystemTimerTask task, long delay, long period) {
-  // // TODO add debug statement
-  // checkCancelled();
-  // timer.schedule(task, delay, period);
-  // }
-
-  // Not currently used, so don't complicate things
-  // /**
-  // * @see Timer#schedule(TimerTask, Date, long)
-  // */
-  // public void schedule(SystemTimerTask task, Date firstTime, long period) {
-  // // TODO add debug statement
-  // checkCancelled();
-  // timer.schedule(task, firstTime, period);
-  // }
-
   /**
    * @see Timer#scheduleAtFixedRate(TimerTask, long, long)
    */
   public void scheduleAtFixedRate(SystemTimerTask task, long delay, long 
period) {
-    // TODO add debug statement
     checkCancelled();
     timer.scheduleAtFixedRate(task, delay, period);
   }
@@ -393,30 +281,24 @@ public class SystemTimer {
    * @see Timer#schedule(TimerTask, long, long)
    */
   public void schedule(SystemTimerTask task, long delay, long period) {
-    // TODO add debug statement
     checkCancelled();
     timer.schedule(task, delay, period);
   }
 
-  // Not currently used, so don't complicate things
-  // /**
-  // * @see Timer#scheduleAtFixedRate(TimerTask, Date, long)
-  // */
-  // public void scheduleAtFixedRate(SystemTimerTask task, Date firstTime,
-  // long period) {
-  // // TODO add debug statement
-  // checkCancelled();
-  // timer.scheduleAtFixedRate(task, firstTime, period);
-  // }
-
-
   /**
    * @see Timer#cancel()
    */
   public void cancel() {
     this.cancelled = true;
     timer.cancel();
-    removeFromSwarm(this);
+    removeTimer(this);
+  }
+
+  /**
+   * has this timer been cancelled?
+   */
+  public boolean isCancelled() {
+    return cancelled;
   }
 
   /**
@@ -426,6 +308,17 @@ public class SystemTimer {
    */
   public abstract static class SystemTimerTask extends TimerTask {
     protected static final Logger logger = LogService.getLogger();
+    private volatile boolean cancelled;
+
+    public boolean isCancelled() {
+      return cancelled;
+    }
+
+    @Override
+    public boolean cancel() {
+      cancelled = true;
+      return super.cancel();
+    }
 
     /**
      * This is your executed action
@@ -437,25 +330,14 @@ public class SystemTimer {
      */
     @Override
     public void run() {
-      final boolean isDebugEnabled = logger.isTraceEnabled();
-      if (isDebugEnabled) {
-        logger.trace("SystemTimer.MyTask: starting {}", this);
-      }
       try {
         this.run2();
       } catch (CancelException ignore) {
         // ignore: TimerThreads can fire during or near cache closure
-      } catch (VirtualMachineError e) {
-        SystemFailure.initiateFailure(e);
-        throw e;
       } catch (Throwable t) {
-        SystemFailure.checkFailure();
         logger.warn(String.format("Timer task <%s> encountered exception", 
this), t);
         // Don't rethrow, it will just get eaten and kill the timer
       }
-      if (isDebugEnabled) {
-        logger.trace("SystemTimer.MyTask: finished {}", this);
-      }
     }
   }
 
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/admin/StatAlertsManager.java
 
b/geode-core/src/main/java/org/apache/geode/internal/admin/StatAlertsManager.java
index 7fbbcb3..0205339 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/admin/StatAlertsManager.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/admin/StatAlertsManager.java
@@ -175,7 +175,7 @@ public class StatAlertsManager {
           "This manager has been cancelled");
     }
     // start and schedule new timer
-    timer = new SystemTimer(system /* swarm */, true);
+    timer = new SystemTimer(system /* swarm */);
 
     EvaluateAlertDefnsTask task = new EvaluateAlertDefnsTask();
     if (refreshAtFixedRate) {
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/ExpirationScheduler.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/ExpirationScheduler.java
index e4bf8c9..0698e26 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/ExpirationScheduler.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/ExpirationScheduler.java
@@ -38,7 +38,7 @@ public class ExpirationScheduler {
       .getInteger(GeodeGlossary.GEMFIRE_PREFIX + "MAX_PENDING_CANCELS", 
10000).intValue();
 
   public ExpirationScheduler(InternalDistributedSystem ds) {
-    this.timer = new SystemTimer(ds, true);
+    this.timer = new SystemTimer(ds);
   }
 
   public void forcePurge() {
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index a710b95..78c0217 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -890,7 +890,7 @@ public class GemFireCacheImpl implements InternalCache, 
InternalClientCache, Has
         TypeRegistry::new,
         HARegionQueue::setMessageSyncInterval,
         FunctionService::registerFunction,
-        object -> new SystemTimer(object, true),
+        object -> new SystemTimer((DistributedSystem) object),
         TombstoneService::initialize,
         ExpirationScheduler::new,
         DiskStoreMonitor::new,
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PRSanityCheckMessage.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PRSanityCheckMessage.java
index 596429b..1d87d1d 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PRSanityCheckMessage.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PRSanityCheckMessage.java
@@ -124,7 +124,7 @@ public class PRSanityCheckMessage extends PartitionMessage {
       int sanityCheckInterval = Integer
           .getInteger(GeodeGlossary.GEMFIRE_PREFIX + "PRSanityCheckInterval", 
5000).intValue();
       if (sanityCheckInterval != 0) {
-        final SystemTimer tm = new SystemTimer(dm.getSystem(), true);
+        final SystemTimer tm = new SystemTimer(dm.getSystem());
         SystemTimer.SystemTimerTask st = new SystemTimer.SystemTimerTask() {
           @Override
           public void run2() {
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index b4a475e..de1de70 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -489,7 +489,7 @@ public class AcceptorImpl implements Acceptor, Runnable {
         tmp_q = new LinkedBlockingQueue<>();
         tmp_commQ = new LinkedBlockingQueue<>();
         tmp_hs = new HashSet<>(512);
-        tmp_timer = new SystemTimer(internalCache.getDistributedSystem(), 
true);
+        tmp_timer = new SystemTimer(internalCache.getDistributedSystem());
       }
       selector = tmp_s;
       selectorQueue = tmp_q;
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 1ec7a06..45d56db 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -155,7 +155,7 @@ public class Connection implements Runnable {
   /**
    * The idle timeout timer task for this connection
    */
-  private SystemTimerTask idleTask;
+  private volatile SystemTimerTask idleTask;
 
   /**
    * If true then readers for thread owned sockets will send all messages on 
thread owned senders.
@@ -286,7 +286,7 @@ public class Connection implements Runnable {
   /**
    * task for detecting ack timeouts and issuing alerts
    */
-  private SystemTimer.SystemTimerTask ackTimeoutTask;
+  private volatile SystemTimer.SystemTimerTask ackTimeoutTask;
 
   /**
    * millisecond clock at the time message transmission started, if doing 
forced-disconnect
@@ -1433,11 +1433,15 @@ public class Connection implements Runnable {
     // This cancels the idle timer task, but it also removes the tasks 
reference to this connection,
     // freeing up the connection (and it's buffers for GC sooner.
     if (idleTask != null) {
-      idleTask.cancel();
+      synchronized (idleTask) {
+        idleTask.cancel();
+      }
     }
 
     if (ackTimeoutTask != null) {
-      ackTimeoutTask.cancel();
+      synchronized (ackTimeoutTask) {
+        ackTimeoutTask.cancel();
+      }
     }
   }
 
@@ -1935,7 +1939,13 @@ public class Connection implements Runnable {
       ackTimeoutTask = new SystemTimer.SystemTimerTask() {
         @Override
         public void run2() {
+          if (isSocketClosed()) {
+            // Connection is closing - nothing to do anymore
+            cancel();
+            return;
+          }
           if (owner.isClosed()) {
+            cancel();
             return;
           }
           byte connState;
@@ -1980,10 +1990,14 @@ public class Connection implements Runnable {
       synchronized (owner) {
         SystemTimer timer = owner.getIdleConnTimer();
         if (timer != null) {
-          if (msSA > 0) {
-            timer.scheduleAtFixedRate(ackTimeoutTask, msAW, Math.min(msAW, 
msSA));
-          } else {
-            timer.schedule(ackTimeoutTask, msAW);
+          synchronized (ackTimeoutTask) {
+            if (!ackTimeoutTask.isCancelled()) {
+              if (msSA > 0) {
+                timer.scheduleAtFixedRate(ackTimeoutTask, msAW, Math.min(msAW, 
msSA));
+              } else {
+                timer.schedule(ackTimeoutTask, msAW);
+              }
+            }
           }
         }
       }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
index 113d91a..0c8a9f9 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
@@ -206,7 +206,7 @@ public class ConnectionTable {
   private ConnectionTable(TCPConduit conduit) {
     owner = conduit;
     idleConnTimer = owner.idleConnectionTimeout != 0
-        ? new SystemTimer(conduit.getDM().getSystem(), true) : null;
+        ? new SystemTimer(conduit.getDM().getSystem()) : null;
     threadConnMaps = new ArrayList();
     threadConnectionMap = new ConcurrentHashMap();
     p2pReaderThreadPool = 
createThreadPoolForIO(conduit.getDM().getSystem().isShareSockets());
@@ -526,8 +526,12 @@ public class ConnectionTable {
           if (!closed) {
             IdleConnTT task = new IdleConnTT(conn);
             conn.setIdleTimeoutTask(task);
-            getIdleConnTimer().scheduleAtFixedRate(task, 
owner.idleConnectionTimeout,
-                owner.idleConnectionTimeout);
+            synchronized (task) {
+              if (!task.isCancelled()) {
+                getIdleConnTimer().scheduleAtFixedRate(task, 
owner.idleConnectionTimeout,
+                    owner.idleConnectionTimeout);
+              }
+            }
           }
         }
       } catch (IllegalStateException e) {
@@ -627,7 +631,7 @@ public class ConnectionTable {
       return null;
     }
     if (idleConnTimer == null) {
-      idleConnTimer = new SystemTimer(getDM().getSystem(), true);
+      idleConnTimer = new SystemTimer(getDM().getSystem());
     }
     return idleConnTimer;
   }
@@ -1212,25 +1216,25 @@ public class ConnectionTable {
 
   private static class IdleConnTT extends SystemTimer.SystemTimerTask {
 
-    private Connection c;
+    private Connection connection;
 
     private IdleConnTT(Connection c) {
-      this.c = c;
+      this.connection = c;
     }
 
     @Override
     public boolean cancel() {
-      Connection con = c;
+      Connection con = connection;
       if (con != null) {
         con.cleanUpOnIdleTaskCancel();
       }
-      c = null;
+      connection = null;
       return super.cancel();
     }
 
     @Override
     public void run2() {
-      Connection con = c;
+      Connection con = connection;
       if (con != null) {
         if (con.checkForIdleTimeout()) {
           cancel();
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/SystemTimerTest.java 
b/geode-core/src/test/java/org/apache/geode/internal/SystemTimerTest.java
new file mode 100644
index 0000000..00f7335
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/SystemTimerTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal;
+
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.mock;
+
+import java.util.Date;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.distributed.DistributedSystem;
+
+public class SystemTimerTest {
+
+  private DistributedSystem distributedSystem;
+  private SystemTimer systemTimer;
+
+  @Before
+  public void setup() {
+    this.distributedSystem = mock(DistributedSystem.class);
+    this.systemTimer = new SystemTimer(distributedSystem);
+  }
+
+  @After
+  public void teardown() {
+    if (!systemTimer.isCancelled()) {
+      systemTimer.cancel();
+    }
+  }
+
+  @Test
+  public void cancelTimer() {
+    assertThat(systemTimer.isCancelled()).isFalse();
+    int initialSystemCount = SystemTimer.distributedSystemCount();
+    SystemTimer.cancelTimers(distributedSystem);
+    assertThat(systemTimer.isCancelled()).isTrue();
+    assertThat(SystemTimer.distributedSystemCount()).isEqualTo(initialSystemCount - 1);
+  }
+
+  @Test
+  public void cancel() {
+    assertThat(systemTimer.isCancelled()).isFalse();
+    systemTimer.cancel();
+    assertThat(systemTimer.isCancelled()).isTrue();
+  }
+
+  @Test
+  public void scheduleNow() {
+    AtomicBoolean hasRun = new AtomicBoolean(false);
+    SystemTimer.SystemTimerTask task = new SystemTimer.SystemTimerTask() {
+      @Override
+      public void run2() {
+        hasRun.set(true);
+      }
+    };
+    systemTimer.schedule(task, 0);
+    await().until(() -> hasRun.get());
+  }
+
+  @Test
+  public void scheduleWithDelay() {
+    AtomicBoolean hasRun = new AtomicBoolean(false);
+    SystemTimer.SystemTimerTask task = new SystemTimer.SystemTimerTask() {
+      @Override
+      public void run2() {
+        hasRun.set(true);
+      }
+    };
+    final long millis = System.currentTimeMillis();
+    final int delay = 1000;
+    systemTimer.schedule(task, delay);
+    await().until(() -> hasRun.get());
+    assertThat(System.currentTimeMillis()).isGreaterThanOrEqualTo(millis + delay);
+  }
+
+  @Test
+  public void scheduleWithDate() {
+    AtomicBoolean hasRun = new AtomicBoolean(false);
+    SystemTimer.SystemTimerTask task = new SystemTimer.SystemTimerTask() {
+      @Override
+      public void run2() {
+        hasRun.set(true);
+      }
+    };
+    final long millis = System.currentTimeMillis();
+    final long delay = 1000;
+    final Date scheduleTime = new Date(System.currentTimeMillis() + delay);
+    systemTimer.schedule(task, scheduleTime);
+    await().until(() -> hasRun.get());
+    assertThat(System.currentTimeMillis()).isGreaterThanOrEqualTo(millis + delay);
+  }
+
+  @Test
+  public void scheduleRepeatedWithDelay() {
+    AtomicInteger invocations = new AtomicInteger(0);
+    SystemTimer.SystemTimerTask task = new SystemTimer.SystemTimerTask() {
+      @Override
+      public void run2() {
+        invocations.incrementAndGet();
+      }
+    };
+    final long millis = System.currentTimeMillis();
+    final int delay = 1000;
+    final int period = 500;
+    systemTimer.schedule(task, delay, period);
+    await().untilAsserted(() -> assertThat(invocations.get()).isGreaterThanOrEqualTo(2));
+    assertThat(System.currentTimeMillis()).isGreaterThanOrEqualTo(millis + delay + period);
+  }
+
+  @Test
+  public void scheduleAtFixedRate() {
+    AtomicInteger invocations = new AtomicInteger(0);
+    SystemTimer.SystemTimerTask task = new SystemTimer.SystemTimerTask() {
+      @Override
+      public void run2() {
+        invocations.incrementAndGet();
+      }
+    };
+    final long millis = System.currentTimeMillis();
+    final int delay = 1000;
+    final int period = 500;
+    systemTimer.scheduleAtFixedRate(task, delay, period);
+    await().untilAsserted(() -> assertThat(invocations.get()).isGreaterThanOrEqualTo(2));
+    assertThat(System.currentTimeMillis()).isGreaterThanOrEqualTo(millis + delay + period);
+  }
+
+  @Test
+  public void cancelTask() {
+    AtomicInteger invocations = new AtomicInteger(0);
+    SystemTimer.SystemTimerTask task = new SystemTimer.SystemTimerTask() {
+      @Override
+      public void run2() {
+        invocations.incrementAndGet();
+      }
+    };
+    assertThat(task.isCancelled()).isFalse();
+    task.cancel();
+    assertThat(task.isCancelled()).isTrue();
+    assertThatThrownBy(() -> systemTimer.schedule(task, 0))
+        .isInstanceOf(IllegalStateException.class);
+  }
+
+}

Reply via email to