This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 9d87117  GEODE-5393: StateFlushOperation hangs waiting for 
non-existent operation to complete
9d87117 is described below

commit 9d87117b131172e05ba28fc6154aca3370115aa1
Author: Bruce Schuchardt <[email protected]>
AuthorDate: Mon Jul 9 15:45:29 2018 -0700

    GEODE-5393: StateFlushOperation hangs waiting for non-existent operation to 
complete
    
    I've added debugging support to DistributionAdvisor so that it tracks
    which threads are performing operations and can log them at debug level.
    This let me determine that a putAll operation was the source of the hang
    due to an exception being thrown during message distribution in
    DistributedCacheOperation.startOperation().  The exception resulted in
    DistributionAdvisor.endOperation() not being invoked correctly.
---
 .../distributed/internal/DistributionAdvisor.java  | 427 +++++++++++++--------
 .../internal/admin/remote/ShutdownAllRequest.java  |  19 +-
 .../internal/cache/DistributedCacheOperation.java  |  37 +-
 .../geode/internal/cache/LocalRegionDataView.java  |   4 +-
 .../geode/internal/cache/TXCommitMessage.java      |   2 +-
 .../cache/DistributedCacheOperationTest.java       |  61 +++
 6 files changed, 367 insertions(+), 183 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
index 9ac4563..71d64a4 100644
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
@@ -17,6 +17,8 @@ package org.apache.geode.distributed.internal;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -122,11 +124,12 @@ public class DistributionAdvisor {
   private final AtomicInteger profileVersionSequencer = new 
AtomicInteger(START_VERSION_NUMBER);
 
   /**
-   * This system property is not supported and disabling intelligent messaging 
is currently
-   * problematic
+   * The operationMonitor tracks in-progress cache operations and holds the 
profile set
+   * version number
    */
-  protected static final boolean disabled = 
Boolean.getBoolean("disable-intelligent-msg");
-
+  private final OperationMonitor operationMonitor =
+      logger.isDebugEnabled() ? new ThreadTrackingOperationMonitor(this)
+          : new OperationMonitor(this);
 
 
   /**
@@ -143,36 +146,12 @@ public class DistributionAdvisor {
   private final Object initializeLock = new Object();
 
   /**
-   * the version of the profile set
-   *
-   * @since GemFire 5.1
-   */
-  private long membershipVersion;
-
-  /**
    * whether membership ops are closed (because the DA's been closed). Access 
under synchronization
    * on (this)
    */
   private boolean membershipClosed;
 
   /**
-   * opCountLock guards access to previousVersionOpCount and 
currentVersionOpCount
-   */
-  private final Object opCountLock = new Object();
-
-  /**
-   * the number of operations in-progress for previous versions of the profile 
set. Guarded by
-   * opCountLock
-   */
-  private long previousVersionOpCount;
-
-  /**
-   * the number of operations in-progress for the current version of the 
profile set. Guarded by
-   * opCountLock
-   */
-  private long currentVersionOpCount;
-
-  /**
    * Hold onto removed profiles to compare to late-processed profiles. Fix for 
bug 36881. Protected
    * by synchronizing on this DistributionAdvisor. guarded.By this 
DistributionAdvisor
    */
@@ -399,10 +378,7 @@ public class DistributionAdvisor {
     try {
       synchronized (this) {
         this.membershipClosed = true;
-        synchronized (this.opCountLock) {
-          this.previousVersionOpCount = 0;
-          this.currentVersionOpCount = 0;
-        }
+        operationMonitor.close();
       }
       getDistributionManager().removeMembershipListener(this.ml);
     } catch (CancelException e) {
@@ -412,6 +388,7 @@ public class DistributionAdvisor {
     }
   }
 
+
   /**
    * Atomically add listener to the list to receive notification when a *new* 
profile is added or a
    * profile is removed, and return adviseGeneric(). This ensures that no 
membership listener calls
@@ -616,20 +593,11 @@ public class DistributionAdvisor {
         newProfile.initialMembershipVersion = 
oldProfile.initialMembershipVersion;
       } else {
         if (!membershipClosed) {
-          membershipVersion++;
-          if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP_VERBOSE)) {
-            logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE,
-                "StateFlush incremented membership version: {}", 
membershipVersion);
-          }
-          newProfile.initialMembershipVersion = membershipVersion;
-          synchronized (this.opCountLock) {
-            previousVersionOpCount += currentVersionOpCount;
-            currentVersionOpCount = 0;
-          }
+          operationMonitor.initNewProfile(newProfile);
         }
       }
     } else {
-      forceNewMembershipVersion();
+      operationMonitor.forceNewMembershipVersion();
     }
 
     if (isTraceEnabled_DistributionAdvisor) {
@@ -719,23 +687,8 @@ public class DistributionAdvisor {
    *
    * @since GemFire 5.1
    */
-  public synchronized void forceNewMembershipVersion() {
-    if (!membershipClosed) {
-      membershipVersion++;
-      if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP_VERBOSE)) {
-        logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE,
-            "StateFlush forced new membership version: {}", membershipVersion);
-      }
-      synchronized (this.opCountLock) {
-        previousVersionOpCount += currentVersionOpCount;
-        currentVersionOpCount = 0;
-        if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_STATE_FLUSH_VERBOSE)) 
{
-          logger.trace(LogMarker.DISTRIBUTION_STATE_FLUSH_VERBOSE,
-              "advisor for {} forced new membership version to {} 
previousOpCount={}", getAdvisee(),
-              membershipVersion, previousVersionOpCount);
-        }
-      }
-    }
+  public void forceNewMembershipVersion() {
+    operationMonitor.forceNewMembershipVersion();
   }
 
   /**
@@ -746,20 +699,8 @@ public class DistributionAdvisor {
    * @return the current membership version for this advisor
    * @since GemFire 5.1
    */
-  public synchronized long startOperation() {
-    if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_STATE_FLUSH_VERBOSE)) {
-      logger.trace(LogMarker.DISTRIBUTION_STATE_FLUSH_VERBOSE,
-          "startOperation() op count is now {} in view version {}", 
currentVersionOpCount + 1,
-          membershipVersion);
-    }
-    synchronized (this.opCountLock) {
-      currentVersionOpCount++;
-      if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP_VERBOSE)) {
-        logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE, "StateFlush current 
opcount incremented: {}",
-            currentVersionOpCount);
-      }
-    }
-    return membershipVersion;
+  public long startOperation() {
+    return operationMonitor.startOperation();
   }
 
   /**
@@ -769,23 +710,8 @@ public class DistributionAdvisor {
    * @param version The membership version returned by startOperation
    * @since GemFire 5.1
    */
-  public synchronized long endOperation(long version) {
-    synchronized (this.opCountLock) {
-      if (version == membershipVersion) {
-        currentVersionOpCount--;
-        if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP_VERBOSE)) {
-          logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE,
-              "StateFlush current opcount deccremented: {}", 
currentVersionOpCount);
-        }
-      } else {
-        previousVersionOpCount--;
-        if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP_VERBOSE)) {
-          logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE,
-              "StateFlush previous opcount incremented: {}", 
previousVersionOpCount);
-        }
-      }
-    }
-    return membershipVersion;
+  public void endOperation(long version) {
+    operationMonitor.endOperation(version);
   }
 
   /**
@@ -795,63 +721,13 @@ public class DistributionAdvisor {
    * @since GemFire 5.1
    */
   public void waitForCurrentOperations() {
-    long timeout =
-        1000L * 
this.getDistributionManager().getSystem().getConfig().getAckWaitThreshold();
-    waitForCurrentOperations(logger, timeout, timeout * 2L);
+    operationMonitor.waitForCurrentOperations();
   }
 
   public void waitForCurrentOperations(Logger alertLogger, long warnMS, long 
severeAlertMS) {
     // this may wait longer than it should if the membership version changes, 
dumping
     // more operations into the previousVersionOpCount
-    final long startTime = System.currentTimeMillis();
-    final long warnTime = startTime + warnMS;
-    final long severeAlertTime = startTime + severeAlertMS;
-    boolean warned = false;
-    boolean severeAlertIssued = false;
-    final boolean isDebugEnabled_STATE_FLUSH_OP =
-        
DistributionAdvisor.logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP_VERBOSE);
-    while (true) {
-      long opCount;
-      synchronized (this.opCountLock) {
-        opCount = this.previousVersionOpCount;
-      }
-      if (opCount <= 0) {
-        if (warned) {
-          alertLogger.info("Wait for current operations completed");
-        }
-        break;
-      }
-      // The advisor's close() method will set the pVOC to zero. This loop
-      // must not terminate due to cache closure until that happens.
-      // See bug 34361 comment 79
-      if (isDebugEnabled_STATE_FLUSH_OP) {
-        DistributionAdvisor.logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE,
-            "Waiting for current operations to finish({})", opCount);
-      }
-      try {
-        Thread.sleep(50);
-      } catch (InterruptedException e) {
-        throw new GemFireIOException("State flush interrupted");
-      }
-      long now = System.currentTimeMillis();
-      if ((!warned) && System.currentTimeMillis() >= warnTime) {
-        warned = true;
-        alertLogger.warn("This operation has been stalled for {} milliseconds 
waiting for "
-            + "current operations to complete.", warnMS);
-      } else if (warned && !severeAlertIssued && (now >= severeAlertTime)) {
-        // OSProcess.printStacks(0);
-        alertLogger.fatal("This operation has been stalled for {} milliseconds 
"
-            + "waiting for current operations to complete.  Something may be 
blocking operations.",
-            severeAlertMS);
-        severeAlertIssued = true;
-      }
-    }
-    if (this.membershipClosed) {
-      if (isDebugEnabled_STATE_FLUSH_OP) {
-        DistributionAdvisor.logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE,
-            "State Flush stopped waiting for operations to distribute because 
advisor has been closed");
-      }
-    }
+    operationMonitor.waitForCurrentOperations(alertLogger, warnMS, 
severeAlertMS);
   }
 
   /**
@@ -1279,12 +1155,6 @@ public class DistributionAdvisor {
   /** All advise methods go through this method */
   protected Set<InternalDistributedMember> adviseFilter(Filter f) {
     initializationGate();
-    if (disabled) {
-      if (logger.isDebugEnabled()) {
-        logger.debug("Intelligent Messaging Disabled");
-      }
-      return getDefaultDistributionMembers();
-    }
     Set<InternalDistributedMember> recipients = null;
     Profile[] locProfiles = this.profiles; // grab current profiles
     for (int i = 0; i < locProfiles.length; i++) {
@@ -1310,12 +1180,6 @@ public class DistributionAdvisor {
    **/
   protected boolean satisfiesFilter(Filter f) {
     initializationGate();
-    if (disabled) {
-      if (logger.isDebugEnabled()) {
-        logger.debug("Intelligent Messaging Disabled");
-      }
-      return !getDefaultDistributionMembers().isEmpty();
-    }
     Profile[] locProfiles = this.profiles; // grab current profiles
     for (Profile p : locProfiles) {
       if (f.include(p)) {
@@ -1680,4 +1544,257 @@ public class DistributionAdvisor {
       return null;
     }
   }
+
+
+
+  private static class OperationMonitor {
+    private final DistributionAdvisor distributionAdvisor;
+
+    /**
+     * the version of the profile set
+     */
+    private long membershipVersion;
+
+    /**
+     * the number of operations in-progress for previous versions of the 
profile set
+     */
+    private long previousVersionOpCount;
+    /**
+     * the number of operations in-progress for the current version of the 
profile set
+     */
+    private long currentVersionOpCount;
+
+    /**
+     * whether close() has been invoked; once set, forceNewMembershipVersion() no longer
+     * changes the membership version or the operation counts
+     */
+    private boolean closed;
+
+    private OperationMonitor(DistributionAdvisor distributionAdvisor) {
+      this.distributionAdvisor = distributionAdvisor;
+    }
+
+    private synchronized void incrementMembershipVersion() {
+      membershipVersion++;
+    }
+
+    /**
+     * Create a new version of the membership profile set. This is used in 
flushing state out of the
+     * VM for previous versions of the set.
+     *
+     * @since GemFire 5.1
+     */
+    synchronized void forceNewMembershipVersion() {
+      if (!closed) {
+        incrementMembershipVersion();
+        previousVersionOpCount += currentVersionOpCount;
+        currentVersionOpCount = 0;
+        membershipVersionChanged();
+      }
+    }
+
+    /**
+     * this method must be invoked at the start of every operation that can 
modify the state of
+     * resource. The return value must be recorded and sent to the advisor in 
an endOperation
+     * message when messages for the operation have been put in the 
DistributionManager's outgoing
+     * "queue".
+     *
+     * @return the current membership version for this advisor
+     * @since GemFire 5.1
+     */
+    synchronized long startOperation() {
+      logNewOperation();
+      currentVersionOpCount++;
+      return membershipVersion;
+    }
+
+    /**
+     * This method must be invoked when messages for an operation have been 
put in the
+     * DistributionManager's outgoing queue.
+     *
+     * @param version The membership version returned by startOperation
+     * @since GemFire 5.1
+     */
+    synchronized void endOperation(long version) {
+      if (version == membershipVersion) {
+        currentVersionOpCount--;
+        logEndOperation(true);
+      } else {
+        previousVersionOpCount--;
+        logEndOperation(false);
+      }
+    }
+
+    /**
+     * wait for the current operations being sent on views prior to the 
joining of the given member
+     * to be placed on communication channels before returning
+     *
+     * @since GemFire 5.1
+     */
+    void waitForCurrentOperations() {
+      long timeout =
+          1000L * 
distributionAdvisor.getDistributionManager().getSystem().getConfig()
+              .getAckWaitThreshold();
+      waitForCurrentOperations(logger, timeout, timeout * 2L);
+    }
+
+    void waitForCurrentOperations(Logger alertLogger, long warnMS, long 
severeAlertMS) {
+      // this may wait longer than it should if the membership version 
changes, dumping
+      // more operations into the previousVersionOpCount
+      final long startTime = System.currentTimeMillis();
+      final long warnTime = startTime + warnMS;
+      final long severeAlertTime = startTime + severeAlertMS;
+      boolean warned = false;
+      boolean severeAlertIssued = false;
+      while (operationsAreInProgress()) {
+        // The advisor's close() method will set the pVOC to zero. This loop
+        // must not terminate due to cache closure until that happens.
+        try {
+          Thread.sleep(50);
+        } catch (InterruptedException e) {
+          throw new GemFireIOException("State flush interrupted");
+        }
+        long now = System.currentTimeMillis();
+        if ((!warned) && System.currentTimeMillis() >= warnTime) {
+          warned = true;
+          logWaitOnOperationsWarning(alertLogger, warnMS);
+        } else if (warned && !severeAlertIssued && (now >= severeAlertTime)) {
+          logWaitOnOperationsSevere(alertLogger, severeAlertMS);
+          severeAlertIssued = true;
+        }
+      }
+      if (warned) {
+        alertLogger.info("Wait for current operations completed");
+      }
+    }
+
+    synchronized boolean operationsAreInProgress() {
+      return previousVersionOpCount > 0;
+    }
+
+    synchronized void initNewProfile(Profile newProfile) {
+      membershipVersion++;
+      newProfile.initialMembershipVersion = membershipVersion;
+      previousVersionOpCount =
+          previousVersionOpCount + currentVersionOpCount;
+      currentVersionOpCount = 0;
+      membershipVersionChanged();
+    }
+
+    synchronized void close() {
+      previousVersionOpCount = 0;
+      currentVersionOpCount = 0;
+      closed = true;
+    }
+
+    void logNewOperation() {}
+
+    void logEndOperation(boolean newOperation) {}
+
+    void logWaitOnOperationsSevere(Logger alertLogger, long severeAlertMS) {
+      // OSProcess.printStacks(0);
+      alertLogger.fatal("This thread has been stalled for {} milliseconds "
+          + "waiting for current operations to complete.  Something may be 
blocking operations.",
+          severeAlertMS);
+    }
+
+    void logWaitOnOperationsWarning(Logger alertLogger, long warnMS) {
+      alertLogger.warn("This thread has been stalled for {} milliseconds 
waiting for "
+          + "current operations to complete.", warnMS);
+    }
+
+    void membershipVersionChanged() {}
+
+  }
+
+  private static class ThreadTrackingOperationMonitor extends OperationMonitor 
{
+
+    /**
+     * for debugging stalled state-flush operations we track the threads performing operations
+     * and capture their call stacks when startOperation is invoked; guarded by this monitor
+     */
+    private final Map<Thread, ExceptionWrapper> currentVersionOperationThreads;
+    private final Map<Thread, ExceptionWrapper> 
previousVersionOperationThreads;
+
+    private ThreadTrackingOperationMonitor(
+        DistributionAdvisor distributionAdvisor) {
+      super(distributionAdvisor);
+      this.currentVersionOperationThreads = new HashMap<>();
+      this.previousVersionOperationThreads = new HashMap<>();
+    }
+
+    @Override
+    void logNewOperation() {
+      currentVersionOperationThreads.put(Thread.currentThread(),
+          new ExceptionWrapper(new Exception("stack trace")));
+    }
+
+    @Override
+    void logEndOperation(boolean newOp) {
+      if (newOp) {
+        currentVersionOperationThreads.remove(Thread.currentThread());
+      } else {
+        previousVersionOperationThreads.remove(Thread.currentThread());
+      }
+    }
+
+    @Override
+    void logWaitOnOperationsWarning(Logger alertLogger, long warnMS) {
+      super.logWaitOnOperationsWarning(alertLogger, warnMS);
+      synchronized (this) {
+        logger
+            .debug("Waiting for these threads: {}", 
previousVersionOperationThreads);
+        logger
+            .debug("New version threads are {}", 
currentVersionOperationThreads);
+      }
+    }
+
+    @Override
+    void logWaitOnOperationsSevere(Logger alertLogger, long severeAlertMS) {
+      super.logWaitOnOperationsSevere(alertLogger, severeAlertMS);
+      synchronized (this) {
+        logger
+            .debug("Waiting for these threads: {}", 
previousVersionOperationThreads);
+        logger
+            .debug("New version threads are {}", 
currentVersionOperationThreads);
+      }
+    }
+
+    @Override
+    void membershipVersionChanged() {
+      super.membershipVersionChanged();
+      previousVersionOperationThreads
+          .putAll(currentVersionOperationThreads);
+      currentVersionOperationThreads.clear();
+    }
+
+
+    /**
+     * ExceptionWrapper is used in debugging hangs in 
waitForCurrentOperations(). It
+     * captures the call stack of a thread invoking startOperation(); toString() appends each byte as a char, so non-ASCII text may be garbled.
+     */
+    private static class ExceptionWrapper {
+      private Exception exception;
+
+      ExceptionWrapper(Exception exception) {
+        this.exception = exception;
+      }
+
+      @Override
+      public String toString() {
+        StringBuilder builder = new StringBuilder(500);
+        OutputStream os = new OutputStream() {
+          @Override
+          public void write(int i) {
+            builder.append((char) i);
+          }
+        };
+        PrintStream stream = new PrintStream(os);
+        exception.printStackTrace(stream);
+        return builder.toString();
+      }
+    }
+
+  }
+
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/admin/remote/ShutdownAllRequest.java
 
b/geode-core/src/main/java/org/apache/geode/internal/admin/remote/ShutdownAllRequest.java
index 4579fe1..1d074fc 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/admin/remote/ShutdownAllRequest.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/admin/remote/ShutdownAllRequest.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Set;
 import java.util.TreeSet;
 
@@ -260,13 +261,11 @@ public class ShutdownAllRequest extends AdminRequest {
       }
       if (msg instanceof ShutdownAllResponse) {
         if (((ShutdownAllResponse) msg).isToShutDown()) {
-          if (logger.isDebugEnabled()) {
-            synchronized (results) {
-              logger.debug("{} adding {} to result set {}", this, 
msg.getSender(),
-                  results);
-            }
+          synchronized (results) {
+            logger.debug("{} adding {} to result set {}", this, 
msg.getSender(),
+                results);
+            this.results.add(msg.getSender());
           }
-          this.results.add(msg.getSender());
         } else {
           // for member without cache, we will not wait for its result
           // so no need to wait its DS to close either
@@ -291,9 +290,11 @@ public class ShutdownAllRequest extends AdminRequest {
     }
 
     public Set getResults() {
-      logger.debug("{} shutdownAll returning {}", this,
-          results/* , new Exception("stack trace") */);
-      return results;
+      synchronized (results) {
+        logger.debug("{} shutdownAll returning {}", this,
+            results);
+        return new HashSet(results);
+      }
     }
   }
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
index 4fee0a0..a7c3533 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
@@ -253,25 +253,30 @@ public abstract class DistributedCacheOperation {
   public long startOperation() {
     DistributedRegion region = getRegion();
     long viewVersion = -1;
-    if (this.containsRegionContentChange()) {
-      viewVersion = region.getDistributionAdvisor().startOperation();
-    }
-    if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP_VERBOSE)) {
-      logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE, "dispatching operation in 
view version {}",
-          viewVersion);
-    }
     try {
-      _distribute();
-    } catch (InvalidVersionException e) {
-      if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
-        logger.trace(LogMarker.DM_VERBOSE,
-            "PutAll failed since versions were missing; retrying again", e);
+      if (this.containsRegionContentChange()) {
+        viewVersion = region.getDistributionAdvisor().startOperation();
       }
+      if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP_VERBOSE)) {
+        logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE, "dispatching operation 
in view version {}",
+            viewVersion);
+      }
+      try {
+        _distribute();
+      } catch (InvalidVersionException e) {
+        if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
+          logger.trace(LogMarker.DM_VERBOSE,
+              "PutAll failed since versions were missing; retrying", e);
+        }
 
-      if (test_InvalidVersionAction != null) {
-        test_InvalidVersionAction.run();
+        if (test_InvalidVersionAction != null) {
+          test_InvalidVersionAction.run();
+        }
+        _distribute();
       }
-      _distribute();
+    } catch (RuntimeException | Error e) {
+      endOperation(viewVersion);
+      throw e;
     }
     return viewVersion;
   }
@@ -310,7 +315,7 @@ public abstract class DistributedCacheOperation {
    * members. This method should wrapped by startOperation() and 
endOperation() in try/finally
    * block.
    */
-  private void _distribute() {
+  protected void _distribute() {
     DistributedRegion region = getRegion();
     DistributionManager mgr = region.getDistributionManager();
     boolean reliableOp = isOperationReliable() && 
region.requiresReliabilityCheck();
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
index c17a25c..6779574 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
@@ -326,7 +326,7 @@ public class LocalRegionDataView implements 
InternalDataView {
       token = reg.postPutAllSend(putallOp, successfulPuts);
       reg.postPutAllFireEvents(putallOp, successfulPuts);
     } finally {
-      if (reg instanceof DistributedRegion) {
+      if (token != -1 && reg instanceof DistributedRegion) {
         putallOp.endOperation(token);
       }
     }
@@ -348,7 +348,7 @@ public class LocalRegionDataView implements 
InternalDataView {
       token = reg.postRemoveAllSend(op, successfulOps);
       reg.postRemoveAllFireEvents(op, successfulOps);
     } finally {
-      if (reg instanceof DistributedRegion) {
+      if (token != -1 && reg instanceof DistributedRegion) {
         op.endOperation(token);
       }
     }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
index 722130e..13f15ea 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
@@ -300,7 +300,7 @@ public class TXCommitMessage extends 
PooledDistributionMessage
       // need to continue the iteration if one of the regions is destroyed
       // since others may still be okay
       try {
-        long newv = dr.getDistributionAdvisor().endOperation(viewVersion);
+        dr.getDistributionAdvisor().endOperation(viewVersion);
       } catch (RuntimeException ex) {
         rte = ex;
       }
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java
index c0eb997..bde43fb 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java
@@ -15,6 +15,8 @@
 package org.apache.geode.internal.cache;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -26,6 +28,7 @@ import java.util.Map;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.geode.cache.CacheEvent;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import 
org.apache.geode.internal.cache.DistributedCacheOperation.CacheOperationMessage;
 import org.apache.geode.internal.cache.persistence.PersistentMemberID;
@@ -48,4 +51,62 @@ public class DistributedCacheOperationTest {
 
     assertThat(mockDistributedCacheOperation.supportsDirectAck()).isFalse();
   }
+
+  /**
+   * The startOperation and endOperation methods of DistributedCacheOperation 
record the
+   * beginning and end of distribution of an operation. If startOperation is 
invoked it
+   * is essential that endOperation be invoked or the state-flush operation 
will hang.<br>
+   * This test ensures that if distribution of the operation throws an 
exception then
+   * endOperation is correctly invoked before allowing the exception to escape 
the startOperation
+   * method. NOTE(review): the test still passes if startOperation() throws nothing (no fail() after the call).
+   */
+  @Test
+  public void endOperationIsInvokedOnDistributionError() {
+    DistributedRegion region = mock(DistributedRegion.class);
+    CacheDistributionAdvisor advisor = mock(CacheDistributionAdvisor.class);
+    when(region.getDistributionAdvisor()).thenReturn(advisor);
+    TestOperation operation = new TestOperation(null);
+    operation.region = region;
+    try {
+      operation.startOperation();
+    } catch (RuntimeException e) {
+      assertEquals("boom", e.getMessage());
+    }
+    assertTrue(operation.endOperationInvoked);
+  }
+
+  static class TestOperation extends DistributedCacheOperation { // stub whose _distribute() always throws
+    boolean endOperationInvoked; // set once endOperation(long) has been called
+    DistributedRegion region; // returned by getRegion(); assigned by the test
+
+    public TestOperation(CacheEvent event) {
+      super(event);
+    }
+
+    @Override
+    public DistributedRegion getRegion() {
+      return region;
+    }
+
+    @Override
+    public boolean containsRegionContentChange() { // true, so startOperation() asks the advisor for a view version
+      return true;
+    }
+
+    @Override
+    public void endOperation(long viewVersion) {
+      endOperationInvoked = true;
+      super.endOperation(viewVersion);
+    }
+
+    @Override
+    protected CacheOperationMessage createMessage() {
+      return null;
+    }
+
+    @Override
+    protected void _distribute() { // simulates a failure during message distribution
+      throw new RuntimeException("boom");
+    }
+  }
 }

Reply via email to