This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new cad7a1b  GEODE-5269 CommitConflictException after TransactionInDoubtException
cad7a1b is described below

commit cad7a1bd117c84f3ba1cd282ec87ccd6f3c9c22b
Author: Bruce Schuchardt <[email protected]>
AuthorDate: Mon Jun 4 10:05:08 2018 -0700

    GEODE-5269 CommitConflictException after TransactionInDoubtException
    
    Before sending a client a TransactionInDoubtException that is caused by
    a server shutting down we now wait a bit for the server to finish shutting
    down.  This allows any locks it held to be released and avoids a
    CommitConflictException if the client should immediately try another
    transaction with the same key(s).
    
    The server will wait up to 1/2 of the client's read-timeout for the
    other server to finish shutting down.
---
 .../internal/ClusterDistributionManager.java       |  2 +-
 .../distributed/internal/DistributionManager.java  |  7 ++-
 .../internal/LonerDistributionManager.java         |  2 +-
 .../distributed/internal/direct/DirectChannel.java | 11 ++--
 .../internal/membership/MembershipManager.java     | 13 +++++
 .../membership/gms/mgr/GMSMembershipManager.java   | 18 +++++--
 .../geode/internal/cache/PeerTXStateStub.java      | 17 +++---
 .../cache/tier/sockets/command/CommitCommand.java  | 60 +++++++++++++++++++--
 .../tier/sockets/command/CommitCommandTest.java    | 62 ++++++++++++++++++++++
 .../dunit/cache/internal/JUnit4CacheTestCase.java  |  5 +-
 10 files changed, 171 insertions(+), 26 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
index 8b5d82c..a513627 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
@@ -2856,7 +2856,7 @@ public class ClusterDistributionManager implements DistributionManager {
   }
 
   @Override
-  public boolean isCurrentMember(InternalDistributedMember id) {
+  public boolean isCurrentMember(DistributedMember id) {
     Set m;
     synchronized (this.membersLock) {
       // access to members synchronized under membersLock in order to
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
index 167f8ba..8bb84ae 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
@@ -306,8 +306,11 @@ public interface DistributionManager extends ReplySender {
   /** Returns a set of all roles currently in the distributed system. */
   Set getAllRoles();
 
-  /** Returns true if id is a current member of the distributed system */
-  boolean isCurrentMember(InternalDistributedMember id);
+  /**
+   * Returns true if id is a current member of the distributed system
+   *
+   */
+  boolean isCurrentMember(DistributedMember id);
 
   /**
    * Remove given member from list of members who are pending a startup reply
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
index f74c34d..d6a6cd1 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
@@ -1256,7 +1256,7 @@ public class LonerDistributionManager implements DistributionManager {
     this.getId().setPort(this.lonerPort);
   }
 
-  public boolean isCurrentMember(InternalDistributedMember p_id) {
+  public boolean isCurrentMember(DistributedMember p_id) {
     return getId().equals(p_id);
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
index c766d26..970957f 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
@@ -20,6 +20,7 @@ import java.io.NotSerializableException;
 import java.net.InetAddress;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -314,6 +315,10 @@ public class DirectChannel {
     if (!directReply && directMsg != null) {
       directMsg.registerProcessor();
     }
+    if (logger.isDebugEnabled()) {
+      logger.debug("Sending ({}) to {} peers ({}) via tcp/ip",
+          msg, p_destinations.length, Arrays.toString(p_destinations));
+    }
 
     try {
       do {
@@ -375,9 +380,9 @@ public class DirectChannel {
         }
 
         try {
-          if (logger.isDebugEnabled()) {
-            logger.debug("{}{}) to {} peers ({}) via tcp/ip",
-                (retry ? "Retrying send (" : "Sending ("), msg, cons.size(), 
cons);
+          if (retry && logger.isDebugEnabled()) {
+            logger.debug("Retrying send ({}{}) to {} peers ({}) via tcp/ip",
+                msg, cons.size(), cons);
           }
           DMStats stats = getDMStats();
           List<?> sentCons; // used for cons we sent to this time
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/MembershipManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/MembershipManager.java
index f335da2..7c227ed 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/MembershipManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/MembershipManager.java
@@ -187,6 +187,19 @@ public interface MembershipManager {
   boolean waitForDeparture(DistributedMember mbr) throws TimeoutException, 
InterruptedException;
 
   /**
+   * Wait for the given member to not be in the membership view and for all 
direct-channel receivers
+   * for this member to be closed.
+   *
+   * @param mbr the member
+   * @param timeoutMS amount of time to wait before giving up
+   * @return for testing purposes this returns true if the serial queue for 
the member was flushed
+   * @throws InterruptedException if interrupted by another thread
+   * @throws TimeoutException if we wait too long for the member to go away
+   */
+  boolean waitForDeparture(DistributedMember mbr, int timeoutMS)
+      throws TimeoutException, InterruptedException;
+
+  /**
    * Returns true if remoteId is an existing member, otherwise waits till 
timeout. Returns false if
    * remoteId is not confirmed to be a member.
    *
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
index f393924..de2fce2 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
@@ -2239,13 +2239,26 @@ public class GMSMembershipManager implements MembershipManager, Manager {
    */
   public boolean waitForDeparture(DistributedMember mbr)
       throws TimeoutException, InterruptedException {
+    int memberTimeout = 
this.services.getConfig().getDistributionConfig().getMemberTimeout();
+    return waitForDeparture(mbr, memberTimeout * 4);
+  }
+
+  /*
+   * (non-Javadoc) MembershipManager method: wait for the given member to be gone. Throws
+   * TimeoutException if the wait goes too long
+   *
+   * @see
+   * org.apache.geode.distributed.internal.membership.MembershipManager#waitForDeparture(org.apache.
+   * geode.distributed.DistributedMember, int)
+   */
+  public boolean waitForDeparture(DistributedMember mbr, int timeoutMs)
+      throws TimeoutException, InterruptedException {
     if (Thread.interrupted())
       throw new InterruptedException();
     boolean result = false;
     DirectChannel dc = directChannel;
     InternalDistributedMember idm = (InternalDistributedMember) mbr;
-    int memberTimeout = 
this.services.getConfig().getDistributionConfig().getMemberTimeout();
-    long pauseTime = (memberTimeout < 1000) ? 100 : memberTimeout / 10;
+    long pauseTime = (timeoutMs < 4000) ? 100 : timeoutMs / 40;
     boolean wait;
     int numWaits = 0;
     do {
@@ -2277,7 +2290,6 @@ public class GMSMembershipManager implements MembershipManager, Manager {
       if (wait) {
         numWaits++;
         if (numWaits > 40) {
-          // waited over 4 * memberTimeout ms. Give up at this point
           throw new TimeoutException("waited too long for " + idm + " to be 
removed");
         }
         Thread.sleep(pauseTime);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PeerTXStateStub.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PeerTXStateStub.java
index 6211b6a..670bbb4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PeerTXStateStub.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PeerTXStateStub.java
@@ -14,6 +14,7 @@
  */
 package org.apache.geode.internal.cache;
 
+
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelException;
@@ -131,14 +132,14 @@ public class PeerTXStateStub extends TXStateStub {
       }
     } catch (Exception e) {
       this.getCache().getCancelCriterion().checkCancelInProgress(e);
-      if (e.getCause() != null) {
-        if (e.getCause() instanceof ForceReattemptException) {
-          Throwable e2 = e.getCause();
-          if (e2.getCause() != null && e2.getCause() instanceof 
PrimaryBucketException) {
+      Throwable eCause = e.getCause();
+      if (eCause != null) {
+        if (eCause instanceof ForceReattemptException) {
+          if (eCause.getCause() instanceof PrimaryBucketException) {
             // data rebalanced
             TransactionDataRebalancedException tdnce =
-                new 
TransactionDataRebalancedException(e2.getCause().getMessage());
-            tdnce.initCause(e2.getCause());
+                new 
TransactionDataRebalancedException(eCause.getCause().getMessage());
+            tdnce.initCause(eCause.getCause());
             throw tdnce;
           } else {
             // We cannot be sure that the member departed starting to process 
commit request,
@@ -146,11 +147,11 @@ public class PeerTXStateStub extends TXStateStub {
             // fixes 44939
             TransactionInDoubtException tdnce =
                 new TransactionInDoubtException(e.getCause().getMessage());
-            tdnce.initCause(e.getCause());
+            tdnce.initCause(eCause);
             throw tdnce;
           }
         }
-        throw new TransactionInDoubtException(e.getCause());
+        throw new TransactionInDoubtException(eCause);
       } else {
         throw new TransactionInDoubtException(e);
       }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java
index 936e9fb..3ded490 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java
@@ -15,7 +15,11 @@
 package org.apache.geode.internal.cache.tier.sockets.command;
 
 import java.io.IOException;
+import java.util.concurrent.TimeoutException;
 
+import org.apache.geode.CancelException;
+import org.apache.geode.cache.TransactionInDoubtException;
+import org.apache.geode.distributed.DistributedMember;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.cache.TXCommitMessage;
@@ -47,6 +51,8 @@ public class CommitCommand extends BaseCommand {
   @Override
   public void cmdExecute(final Message clientMessage, final ServerConnection 
serverConnection,
       final SecurityService securityService, long start) throws IOException {
+
+
     serverConnection.setAsTrue(REQUIRES_RESPONSE);
     TXManagerImpl txMgr = (TXManagerImpl) 
serverConnection.getCache().getCacheTransactionManager();
     InternalDistributedMember client =
@@ -74,18 +80,27 @@ public class CommitCommand extends BaseCommand {
     if (logger.isDebugEnabled()) {
       logger.debug("TX: committing client tx: {}", txId);
     }
-    try {
-
-      txId = txProxy.getTxId();
+    commitTransaction(clientMessage, serverConnection, txMgr, wasInProgress,
+        txProxy);
+  }
 
+  protected void commitTransaction(Message clientMessage, ServerConnection 
serverConnection,
+      TXManagerImpl txMgr,
+      boolean wasInProgress, TXStateProxy txProxy) throws IOException {
+    Exception txException = null;
+    TXCommitMessage commitMsg = null;
+    TXId txId = txProxy.getTxId();
+    try {
       txProxy.setCommitOnBehalfOfRemoteStub(true);
       txMgr.commit();
 
       commitMsg = txProxy.getCommitMessage();
+      logger.debug("Sending commit response to client: {}", commitMsg);
       writeCommitResponse(commitMsg, clientMessage, serverConnection);
       serverConnection.setAsTrue(RESPONDED);
+
     } catch (Exception e) {
-      sendException(clientMessage, serverConnection, e);
+      txException = e;
     } finally {
       if (txId != null) {
         txMgr.removeHostedTXState(txId);
@@ -97,10 +112,45 @@ public class CommitCommand extends BaseCommand {
         commitMsg.setClientVersion(null); // fixes bug 46529
       }
     }
+    if (txException != null) {
+      DistributedMember target = txProxy.getTarget();
+      // a TransactionInDoubtException caused by the TX host shutting down 
means that
+      // the transaction may still be active and hold locks. We must wait for 
the transaction
+      // host to finish shutting down before responding to the client or it 
could encounter
+      // conflicts in retrying the transaction
+      try {
+        if ((txException instanceof TransactionInDoubtException)
+            && (txException.getCause() instanceof CancelException)) {
+          // base the wait time on the client's read-timeout setting so that 
we respond before
+          // it gives up reading. Since we've already done a commit we've 
eaten up some time
+          // so we use a WAG of half the read-timeout
+          int timeToWait = 
serverConnection.getHandshake().getClientReadTimeout() / 2;
+          if (timeToWait < 0) {
+            return;
+          }
+          logger.info(
+              "Waiting up to {}ms for departure of {} before throwing 
TransactionInDoubtException.",
+              timeToWait, target);
+          try {
+            
serverConnection.getCache().getDistributionManager().getMembershipManager()
+                .waitForDeparture(target, timeToWait);
+          } catch (TimeoutException e) {
+            // status will be logged below
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+          }
+          logger.info("Done waiting.  Transaction host {} in the cluster.",
+              
serverConnection.getCache().getDistributionManager().isCurrentMember(target)
+                  ? "is still"
+                  : "is no longer");
+        }
+      } finally {
+        sendException(clientMessage, serverConnection, txException);
+      }
+    }
   }
 
 
-
   protected static void writeCommitResponse(TXCommitMessage response, Message 
origMsg,
       ServerConnection servConn) throws IOException {
     Message responseMsg = servConn.getResponseMessage();
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommandTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommandTest.java
index 4da081d..a3f2ef9 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommandTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommandTest.java
@@ -14,14 +14,31 @@
  */
 package org.apache.geode.internal.cache.tier.sockets.command;
 
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.net.InetAddress;
+
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.TransactionInDoubtException;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.DistributionManager;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.distributed.internal.membership.MembershipManager;
 import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXStateProxy;
+import org.apache.geode.internal.cache.tier.ServerSideHandshake;
 import org.apache.geode.internal.cache.tier.sockets.Message;
 import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
 import org.apache.geode.test.junit.categories.ClientServerTest;
@@ -50,4 +67,49 @@ public class CommitCommandTest {
 
     CommitCommand.writeCommitResponse(null, origMsg, servConn);
   }
+
+  /**
+   * GEODE-5269 CommitConflictException after TransactionInDoubtException
+   * CommitCommand needs to stall waiting for the host of a transaction to
+   * finish shutting down before sending a TransactionInDoubtException to
+   * the client.
+   */
+  @Test
+  public void testTransactionInDoubtWaitsForTargetDeparture() throws Exception 
{
+    CommitCommand command = (CommitCommand) CommitCommand.getCommand();
+    Message clientMessage = mock(Message.class);
+    ServerConnection serverConnection = mock(ServerConnection.class);
+    TXManagerImpl txMgr = mock(TXManagerImpl.class);
+    TXStateProxy txProxy = mock(TXStateProxy.class);
+    InternalCache cache = mock(InternalCache.class);
+    DistributionManager distributionManager = mock(DistributionManager.class);
+    MembershipManager membershipManager = mock(MembershipManager.class);
+    ServerSideHandshake handshake = mock(ServerSideHandshake.class);
+    boolean wasInProgress = false;
+
+    doReturn(cache).when(serverConnection).getCache();
+    doReturn(distributionManager).when(cache).getDistributionManager();
+    
doReturn(membershipManager).when(distributionManager).getMembershipManager();
+    doReturn(false).when(distributionManager).isCurrentMember(isA(
+        InternalDistributedMember.class));
+
+    
doReturn(mock(Message.class)).when(serverConnection).getErrorResponseMessage();
+    doReturn(handshake).when(serverConnection).getHandshake();
+    doReturn(1000).when(handshake).getClientReadTimeout();
+
+    doReturn(new InternalDistributedMember(InetAddress.getLocalHost(), 
1234)).when(txProxy)
+        .getTarget();
+
+    TransactionInDoubtException transactionInDoubtException =
+        new TransactionInDoubtException("tx in doubt");
+    transactionInDoubtException.initCause(new CacheClosedException("testing"));
+    doThrow(transactionInDoubtException).when(txMgr).commit();
+
+    command.commitTransaction(
+        clientMessage, serverConnection, txMgr, wasInProgress, txProxy);
+
+    verify(txMgr, atLeastOnce()).commit();
+    verify(membershipManager, 
times(1)).waitForDeparture(isA(DistributedMember.class),
+        isA(Integer.class));
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/test/dunit/cache/internal/JUnit4CacheTestCase.java b/geode-core/src/test/java/org/apache/geode/test/dunit/cache/internal/JUnit4CacheTestCase.java
index fa9a6f2..c93f921 100644
--- a/geode-core/src/test/java/org/apache/geode/test/dunit/cache/internal/JUnit4CacheTestCase.java
+++ b/geode-core/src/test/java/org/apache/geode/test/dunit/cache/internal/JUnit4CacheTestCase.java
@@ -41,7 +41,6 @@ import org.apache.geode.cache.TimeoutException;
 import org.apache.geode.cache.client.ClientCache;
 import org.apache.geode.cache.client.ClientCacheFactory;
 import org.apache.geode.cache.client.PoolManager;
-import org.apache.geode.cache.client.internal.InternalClientCache;
 import org.apache.geode.cache30.CacheSerializableRunnable;
 import org.apache.geode.distributed.internal.DistributionMessageObserver;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
@@ -263,8 +262,8 @@ public abstract class JUnit4CacheTestCase extends JUnit4DistributedTestCase
     }
   }
 
-  public final InternalClientCache getClientCache() {
-    return (InternalClientCache) cache;
+  public final ClientCache getClientCache() {
+    return (ClientCache) cache;
   }
 
   /**

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to