This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch feature/GEODE-5393
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-5393 by this 
push:
     new 04ca2c9  GEODE-5393: StateFlushOperation hangs waiting for 
non-existent operation to complete
04ca2c9 is described below

commit 04ca2c998ec1022e86f94b072f6b1e9ad22f604d
Author: Bruce Schuchardt <[email protected]>
AuthorDate: Mon Jul 9 08:50:56 2018 -0700

    GEODE-5393: StateFlushOperation hangs waiting for non-existent operation to 
complete
    
    Refactored OperationMonitor so that we decide once whether to track
    threads and, if so, create a subclass of OperationMonitor that always
    tracks threads.  Otherwise we create a non-thread-tracking OperationMonitor.
    
    The problem with the previous implementation was that the log level can
    be changed after the monitor is created.  If someone changed the level
    from config (or higher) to fine, the code would start throwing
    UnsupportedOperationExceptions when trying to modify the immutable maps
    returned by Collections.emptyMap.
---
 .../distributed/internal/DistributionAdvisor.java  | 145 +++++++++++++--------
 1 file changed, 93 insertions(+), 52 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
index c7e2830..6113499 100644
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
@@ -127,7 +127,9 @@ public class DistributionAdvisor {
    * The operationMonitor tracks in-progress cache operations and holds the 
profile set
    * version number
    */
-  private final OperationMonitor operationMonitor = new OperationMonitor(this);
+  private final OperationMonitor operationMonitor =
+      logger.isDebugEnabled() ? new ThreadTrackingOperationMonitor(this)
+          : new OperationMonitor(this);
 
 
   /**
@@ -1566,20 +1568,10 @@ public class DistributionAdvisor {
      * for debugging stalled state-flush operations we track threads 
performing operations
      * and capture the state when startOperatiopn is invoked
      */
-    private final Map<Thread, ExceptionWrapper> currentVersionOperationThreads;
-    private final Map<Thread, ExceptionWrapper> 
previousVersionOperationThreads;
     private boolean closed;
 
     private OperationMonitor(DistributionAdvisor distributionAdvisor) {
       this.distributionAdvisor = distributionAdvisor;
-      this.currentVersionOperationThreads =
-          logger.isDebugEnabled()
-              ? new HashMap<>()
-              : Collections.emptyMap();
-      this.previousVersionOperationThreads =
-          logger.isDebugEnabled()
-              ? new HashMap<>()
-              : Collections.emptyMap();
     }
 
     private synchronized void incrementMembershipVersion() {
@@ -1597,10 +1589,7 @@ public class DistributionAdvisor {
         incrementMembershipVersion();
         previousVersionOpCount += currentVersionOpCount;
         currentVersionOpCount = 0;
-        if (logger.isDebugEnabled()) {
-          
previousVersionOperationThreads.putAll(currentVersionOperationThreads);
-          currentVersionOperationThreads.clear();
-        }
+        membershipVersionChanged();
       }
     }
 
@@ -1614,10 +1603,7 @@ public class DistributionAdvisor {
      * @since GemFire 5.1
      */
     private synchronized long startOperation() {
-      if (logger.isDebugEnabled()) {
-        currentVersionOperationThreads.put(Thread.currentThread(),
-            new ExceptionWrapper(new Exception("stack trace")));
-      }
+      logNewOperation();
       currentVersionOpCount++;
       return membershipVersion;
     }
@@ -1632,14 +1618,10 @@ public class DistributionAdvisor {
     private synchronized void endOperation(long version) {
       if (version == membershipVersion) {
         currentVersionOpCount--;
-        if (logger.isDebugEnabled()) {
-          currentVersionOperationThreads.remove(Thread.currentThread());
-        }
+        logEndOperation(true);
       } else {
         previousVersionOpCount--;
-        if (logger.isDebugEnabled()) {
-          previousVersionOperationThreads.remove(Thread.currentThread());
-        }
+        logEndOperation(false);
       }
     }
 
@@ -1685,29 +1667,9 @@ public class DistributionAdvisor {
         long now = System.currentTimeMillis();
         if ((!warned) && System.currentTimeMillis() >= warnTime) {
           warned = true;
-          alertLogger.warn("This thread has been stalled for {} milliseconds 
waiting for "
-              + "current operations to complete.", warnMS);
-          if (logger.isDebugEnabled()) {
-            synchronized (this) {
-              logger
-                  .debug("Waiting for these threads: {}", 
previousVersionOperationThreads);
-              logger
-                  .debug("New version threads are {}", 
currentVersionOperationThreads);
-            }
-          }
+          logWaitOnOperationsWarning(alertLogger, warnMS);
         } else if (warned && !severeAlertIssued && (now >= severeAlertTime)) {
-          // OSProcess.printStacks(0);
-          alertLogger.fatal("This thread has been stalled for {} milliseconds "
-              + "waiting for current operations to complete.  Something may be 
blocking operations.",
-              severeAlertMS);
-          if (logger.isDebugEnabled()) {
-            synchronized (this) {
-              logger
-                  .debug("Waiting for these threads: {}", 
previousVersionOperationThreads);
-              logger
-                  .debug("New version threads are {}", 
currentVersionOperationThreads);
-            }
-          }
+          logWaitOnOperationsSevere(alertLogger, severeAlertMS);
           severeAlertIssued = true;
         }
       }
@@ -1720,11 +1682,7 @@ public class DistributionAdvisor {
         previousVersionOpCount =
             previousVersionOpCount + currentVersionOpCount;
         currentVersionOpCount = 0;
-        if (logger.isDebugEnabled()) {
-          previousVersionOperationThreads
-              .putAll(currentVersionOperationThreads);
-          currentVersionOperationThreads.clear();
-        }
+        membershipVersionChanged();
       }
     }
 
@@ -1734,6 +1692,88 @@ public class DistributionAdvisor {
       closed = true;
     }
 
+    void logNewOperation() {}
+
+    void logEndOperation(boolean newOperation) {}
+
+    void logWaitOnOperationsSevere(Logger alertLogger, long severeAlertMS) {
+      // OSProcess.printStacks(0);
+      alertLogger.fatal("This thread has been stalled for {} milliseconds "
+          + "waiting for current operations to complete.  Something may be 
blocking operations.",
+          severeAlertMS);
+    }
+
+    void logWaitOnOperationsWarning(Logger alertLogger, long warnMS) {
+      alertLogger.warn("This thread has been stalled for {} milliseconds 
waiting for "
+          + "current operations to complete.", warnMS);
+    }
+
+    void membershipVersionChanged() {}
+
+  }
+
+  private static class ThreadTrackingOperationMonitor extends OperationMonitor 
{
+
+    /**
+     * for debugging stalled state-flush operations we track threads 
performing operations
+     * and capture the state when startOperatiopn is invoked
+     */
+    private final Map<Thread, ExceptionWrapper> currentVersionOperationThreads;
+    private final Map<Thread, ExceptionWrapper> 
previousVersionOperationThreads;
+
+    private ThreadTrackingOperationMonitor(
+        DistributionAdvisor distributionAdvisor) {
+      super(distributionAdvisor);
+      this.currentVersionOperationThreads = new HashMap<>();
+      this.previousVersionOperationThreads = new HashMap<>();
+    }
+
+    @Override
+    void logNewOperation() {
+      currentVersionOperationThreads.put(Thread.currentThread(),
+          new ExceptionWrapper(new Exception("stack trace")));
+    }
+
+    @Override
+    void logEndOperation(boolean newOp) {
+      if (newOp) {
+        currentVersionOperationThreads.remove(Thread.currentThread());
+      } else {
+        previousVersionOperationThreads.remove(Thread.currentThread());
+      }
+    }
+
+    @Override
+    void logWaitOnOperationsWarning(Logger alertLogger, long warnMS) {
+      super.logWaitOnOperationsWarning(alertLogger, warnMS);
+      synchronized (this) {
+        logger
+            .debug("Waiting for these threads: {}", 
previousVersionOperationThreads);
+        logger
+            .debug("New version threads are {}", 
currentVersionOperationThreads);
+      }
+    }
+
+    @Override
+    void logWaitOnOperationsSevere(Logger alertLogger, long severeAlertMS) {
+      super.logWaitOnOperationsSevere(alertLogger, severeAlertMS);
+      synchronized (this) {
+        logger
+            .debug("Waiting for these threads: {}", 
previousVersionOperationThreads);
+        logger
+            .debug("New version threads are {}", 
currentVersionOperationThreads);
+      }
+    }
+
+    @Override
+    void membershipVersionChanged() {
+      super.membershipVersionChanged();
+      previousVersionOperationThreads
+          .putAll(currentVersionOperationThreads);
+      currentVersionOperationThreads.clear();
+    }
+
+
     /**
      * ExceptionWrapper is used in debugging hangs in 
waitForCurrentOperations(). It
      * captures the call stack of a thread invoking startOperation().
@@ -1761,4 +1801,5 @@ public class DistributionAdvisor {
     }
 
   }
+
 }

Reply via email to