[ 
https://issues.apache.org/jira/browse/GEODE-3964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16286732#comment-16286732
 ] 

ASF GitHub Bot commented on GEODE-3964:
---------------------------------------

galen-pivotal closed pull request #1088: GEODE-3964: More logging for suspect processing.
URL: https://github.com/apache/geode/pull/1088
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ReplyProcessor21.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ReplyProcessor21.java
index 3ab199b7bc..84a0b08132 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ReplyProcessor21.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ReplyProcessor21.java
@@ -14,6 +14,7 @@
  */
 package org.apache.geode.distributed.internal;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -119,7 +120,7 @@
   /** Have we heard back from everyone? */
   private volatile boolean done;
 
-  protected boolean keeperCleanedUp;
+  private boolean keeperCleanedUp;
 
   /** Have we been aborted due to shutdown? */
   protected volatile boolean shutdown;
@@ -139,43 +140,35 @@
   protected final DM dmgr;
 
   /** Start time for replyWait stat, in nanos */
-  protected long statStart;
+  long statStart;
 
   /** Start time for ack-wait-threshold, in millis */
-  protected long initTime;
+  private long initTime;
 
   /**
    * whether this reply processor should perform severe-alert processing for 
the message being ack'd
    */
-  protected boolean severeAlertEnabled;
+  private boolean severeAlertEnabled;
 
   /**
    * whether the severe-alert timeout has been reset. This can happen if a 
member we're waiting for
    * is waiting on a suspect member, for instance.
    */
-  protected volatile boolean severeAlertTimerReset;
+  private volatile boolean severeAlertTimerReset;
 
   /**
    * whether this reply processor should shorten severe-alert processing due 
to another vm waiting
    * on this one. This is a thread-local so that lower level comm layers can 
tell that the interval
    * should be shortened
    */
-  public static final ThreadLocal SevereAlertShorten = new ThreadLocal() {
-    @Override
-    protected Object initialValue() {
-      return Boolean.FALSE;
-    }
-  };
+  private static final ThreadLocal<Boolean> severeAlertShorten =
+      ThreadLocal.withInitial(() -> Boolean.FALSE);
 
   /**
    * whether the next replyProcessor for the current thread should perform 
severe-alert processing
    */
-  private static ThreadLocal ForceSevereAlertProcessing = new ThreadLocal() {
-    @Override
-    protected Object initialValue() {
-      return Boolean.FALSE;
-    }
-  };
+  private static ThreadLocal<Boolean> forceSevereAlertProcessing =
+      ThreadLocal.withInitial(() -> Boolean.FALSE);
 
   ////////////////////// Static Methods /////////////////////
 
@@ -580,7 +573,7 @@ protected void processActiveMembers(Set activeMembers) {
     }
   }
 
-  protected void postWait() {
+  private void postWait() {
     waiting = false;
     removeListener();
     final DM mgr = getDistributionManager();
@@ -588,25 +581,6 @@ protected void postWait() {
     mgr.getCancelCriterion().checkCancelInProgress(null);
   }
 
-  // start waiting for replies without explicitly waiting for all of them using
-  // waitForReplies* methods; useful for streaming of results in function 
execution
-  public void startWait() {
-    if (!this.waiting && stillWaiting()) {
-      preWait();
-    }
-  }
-
-  // end waiting for replies without explicitly invoking waitForReplies*
-  // methods; useful for streaming of results in function execution
-  public void endWait(boolean doCleanup) {
-    try {
-      postWait();
-    } finally {
-      if (doCleanup) {
-        cleanup();
-      }
-    }
-  }
 
   /**
    * Wait a given number of milliseconds for the expected acks to be received. 
If <code>msecs</code>
@@ -674,7 +648,7 @@ public boolean waitForReplies(long msecs, 
StoppableCountDownLatch latch, boolean
    * @param msecs the number of milliseconds to wait for replies
    * @return whether or not we received all of the replies in the given amount 
of time
    */
-  protected boolean basicWait(long msecs, StoppableCountDownLatch latch)
+  private boolean basicWait(long msecs, StoppableCountDownLatch latch)
       throws InterruptedException, ReplyException {
     if (Thread.interrupted()) {
       throw new InterruptedException();
@@ -683,7 +657,10 @@ protected boolean basicWait(long msecs, 
StoppableCountDownLatch latch)
     if (stillWaiting()) {
       long timeout = getAckWaitThreshold() * 1000L;
       long timeSoFar = System.currentTimeMillis() - this.initTime;
-      long severeAlertTimeout = getAckSevereAlertThresholdMS();
+      final long severeAlertTimeout = getAckSevereAlertThresholdMS();
+      // only start SUSPECT processing if severe alerts are enabled
+      final boolean doSuspectProcessing =
+          isSevereAlertProcessingEnabled() && (severeAlertTimeout > 0);
       if (timeout <= 0) {
         timeout = Long.MAX_VALUE;
       }
@@ -695,25 +672,40 @@ protected boolean basicWait(long msecs, 
StoppableCountDownLatch latch)
         if (timedOut || !latch.await(timeout - timeSoFar - 1)) {
           this.dmgr.getCancelCriterion().checkCancelInProgress(null);
 
-          // only start SUSPECT processing if severe alerts are enabled
-          timeout(isSevereAlertProcessingEnabled() && (severeAlertTimeout > 
0), false);
+          timeout(doSuspectProcessing, false);
 
           // If ack-severe-alert-threshold has been set, we now
           // wait for that period of time and then force the non-responding
           // members from the system. Then we wait indefinitely
-          if (isSevereAlertProcessingEnabled() && severeAlertTimeout > 0) {
-            boolean timedout;
+          if (doSuspectProcessing) {
+            boolean wasNotUnlatched;
             do {
               this.severeAlertTimerReset = false; // retry if this gets set by 
suspect processing
                                                   // (splitbrain requirement)
-              timedout = !latch.await(severeAlertTimeout);
-            } while (timedout && this.severeAlertTimerReset);
-            if (timedout) {
+              wasNotUnlatched = !latch.await(severeAlertTimeout);
+            } while (wasNotUnlatched && this.severeAlertTimerReset);
+            if (wasNotUnlatched) {
               this.dmgr.getCancelCriterion().checkCancelInProgress(null);
               timeout(false, true);
-              // for consistency, we must now wait for a membership view
-              // that ejects the removed members
-              latch.await();
+
+              long suspectProcessingErrorAlertTimeout = severeAlertTimeout * 3;
+              if (!latch.await(suspectProcessingErrorAlertTimeout)) {
+                long now = System.currentTimeMillis();
+                long totalTimeElapsed = now - this.initTime;
+
+                String waitingOnMembers;
+                synchronized (members) {
+                  waitingOnMembers = Arrays.toString(members);
+                }
+                logger.fatal("An additional " + 
suspectProcessingErrorAlertTimeout
+                    + " milliseconds have elapsed while waiting for replies. 
Total of "
+                    + totalTimeElapsed + " milliseconds elapsed (init time:" + 
this.initTime
+                    + ", now: " + now + ") Waiting for members: " + 
waitingOnMembers);
+
+                // for consistency, we must now wait indefinitely for a 
membership view
+                // that ejects the removed members
+                latch.await();
+              }
             }
           } else {
             latch.await();
@@ -722,25 +714,23 @@ protected boolean basicWait(long msecs, 
StoppableCountDownLatch latch)
           logger.info(LocalizedMessage
               
.create(LocalizedStrings.ReplyProcessor21_WAIT_FOR_REPLIES_COMPLETED_1, 
shortName()));
         }
-      } else {
-        if (msecs > timeout) {
-          if (!latch.await(timeout)) {
-            timeout(isSevereAlertProcessingEnabled() && (severeAlertTimeout > 
0), false);
-            // after timeout alert, wait remaining time
-            if (!latch.await(msecs - timeout)) {
-              logger.info(LocalizedMessage.create(
-                  
LocalizedStrings.ReplyProcessor21_WAIT_FOR_REPLIES_TIMING_OUT_AFTER_0_SEC,
-                  Long.valueOf(msecs / 1000)));
-              return false;
-            }
-            // Give an info message since timeout gave a warning.
+      } else if (msecs > timeout) {
+        if (!latch.await(timeout)) {
+          timeout(doSuspectProcessing, false);
+          // after timeout alert, wait remaining time
+          if (!latch.await(msecs - timeout)) {
             logger.info(LocalizedMessage.create(
-                
LocalizedStrings.ReplyProcessor21_WAIT_FOR_REPLIES_COMPLETED_1, shortName()));
-          }
-        } else {
-          if (!latch.await(msecs)) {
+                
LocalizedStrings.ReplyProcessor21_WAIT_FOR_REPLIES_TIMING_OUT_AFTER_0_SEC,
+                Long.valueOf(msecs / 1000)));
             return false;
           }
+          // Give an info message since timeout gave a warning.
+          logger.info(LocalizedMessage
+              
.create(LocalizedStrings.ReplyProcessor21_WAIT_FOR_REPLIES_COMPLETED_1, 
shortName()));
+        }
+      } else {
+        if (!latch.await(msecs)) {
+          return false;
         }
       }
     }
@@ -1166,25 +1156,25 @@ public void enableSevereAlertProcessing() {
    * @param flag whether to shorten the time or not
    */
   public static void setShortSevereAlertProcessing(boolean flag) {
-    SevereAlertShorten.set(Boolean.valueOf(flag));
+    severeAlertShorten.set(flag);
   }
 
   public static boolean getShortSevereAlertProcessing() {
-    return ((Boolean) SevereAlertShorten.get()).booleanValue();
+    return severeAlertShorten.get();
   }
 
   /**
    * Force reply-waits in the current thread to perform severe-alert processing
    */
   public static void forceSevereAlertProcessing() {
-    ForceSevereAlertProcessing.set(Boolean.TRUE);
+    forceSevereAlertProcessing.set(Boolean.TRUE);
   }
 
   /**
    * Reset the forcing of severe-alert processing for the current thread
    */
   public static void unforceSevereAlertProcessing() {
-    ForceSevereAlertProcessing.set(Boolean.FALSE);
+    forceSevereAlertProcessing.set(Boolean.FALSE);
   }
 
   /**
@@ -1192,7 +1182,7 @@ public static void unforceSevereAlertProcessing() {
    * current thread to perform severe-alert processing.
    */
   public static boolean isSevereAlertProcessingForced() {
-    return ((Boolean) ForceSevereAlertProcessing.get()).booleanValue();
+    return forceSevereAlertProcessing.get();
   }
 
 
@@ -1201,7 +1191,7 @@ public static boolean isSevereAlertProcessingForced() {
    */
   public long getAckSevereAlertThresholdMS() {
     long disconnectTimeout = getSevereAlertThreshold() * 1000L;
-    if (disconnectTimeout > 0 && ((Boolean) 
SevereAlertShorten.get()).booleanValue()) {
+    if (disconnectTimeout > 0 && severeAlertShorten.get()) {
       disconnectTimeout = (long) (disconnectTimeout * PR_SEVERE_ALERT_RATIO);
     }
     return disconnectTimeout;
@@ -1212,16 +1202,16 @@ public boolean isSevereAlertProcessingEnabled() {
   }
 
 
-  private static final ThreadLocal messageId = new ThreadLocal();
+  private static final ThreadLocal<Integer> messageId = new ThreadLocal<>();
 
-  private static final Integer VOID_RPID = Integer.valueOf(0);
+  private static final Integer VOID_RPID = 0;
 
   /**
    * Used by messages to store the id for the current message into a thread 
local. This allows the
    * comms layer to still send replies even when it can't deserialize a 
message.
    */
   public static void setMessageRPId(int id) {
-    messageId.set(Integer.valueOf(id));
+    messageId.set(id);
   }
 
   public static void initMessageRPId() {
@@ -1240,7 +1230,7 @@ public static int getMessageRPId() {
     int result = 0;
     Object v = messageId.get();
     if (v != null) {
-      result = ((Integer) v).intValue();
+      result = (Integer) v;
     }
     return result;
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Add another severe-alert option
> -------------------------------
>
>                 Key: GEODE-3964
>                 URL: https://issues.apache.org/jira/browse/GEODE-3964
>             Project: Geode
>          Issue Type: Bug
>          Components: messaging
>            Reporter: Bruce Schuchardt
>
> Since suspect processing only commences when the ack-severe-alert-threshold 
> is reached, it would be nice to have yet another alert if that processing 
> fails to kick out the slow-to-respond member and a thread is stuck for a 
> long time waiting for a reply.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to