This is an automated email from the ASF dual-hosted git repository.
bschuchardt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new f50b945 GEODE-5269 CommitConflictException after TransactionInDoubtException
f50b945 is described below
commit f50b9450b918ad4409c5a6ebf374b88de8c75986
Author: Bruce Schuchardt <[email protected]>
AuthorDate: Fri Jun 1 13:26:05 2018 -0700
GEODE-5269 CommitConflictException after TransactionInDoubtException
Before sending a client a TransactionInDoubtException that is caused by
a server shutting down, we now wait (up to a timeout) for that server to
finish shutting down. This allows any locks it held to be released and
avoids a CommitConflictException if the client immediately starts another
transaction on the same key(s).
---
.../internal/ClusterDistributionManager.java | 2 +-
.../distributed/internal/DistributionManager.java | 7 ++-
.../internal/LonerDistributionManager.java | 2 +-
.../distributed/internal/direct/DirectChannel.java | 11 +++--
.../geode/internal/cache/PeerTXStateStub.java | 17 ++++---
.../cache/tier/sockets/command/CommitCommand.java | 50 +++++++++++++++++--
.../tier/sockets/command/CommitCommandTest.java | 57 ++++++++++++++++++++++
.../dunit/cache/internal/JUnit4CacheTestCase.java | 5 +-
8 files changed, 128 insertions(+), 23 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
index 8b5d82c..a513627 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
@@ -2856,7 +2856,7 @@ public class ClusterDistributionManager implements
DistributionManager {
}
@Override
- public boolean isCurrentMember(InternalDistributedMember id) {
+ public boolean isCurrentMember(DistributedMember id) {
Set m;
synchronized (this.membersLock) {
// access to members synchronized under membersLock in order to
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
index 167f8ba..8bb84ae 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
@@ -306,8 +306,11 @@ public interface DistributionManager extends ReplySender {
/** Returns a set of all roles currently in the distributed system. */
Set getAllRoles();
- /** Returns true if id is a current member of the distributed system */
- boolean isCurrentMember(InternalDistributedMember id);
+ /**
+ * Returns true if id is a current member of the distributed system
+ *
+ */
+ boolean isCurrentMember(DistributedMember id);
/**
* Remove given member from list of members who are pending a startup reply
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
index f74c34d..d6a6cd1 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
@@ -1256,7 +1256,7 @@ public class LonerDistributionManager implements
DistributionManager {
this.getId().setPort(this.lonerPort);
}
- public boolean isCurrentMember(InternalDistributedMember p_id) {
+ public boolean isCurrentMember(DistributedMember p_id) {
return getId().equals(p_id);
}
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
index c766d26..970957f 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
@@ -20,6 +20,7 @@ import java.io.NotSerializableException;
import java.net.InetAddress;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -314,6 +315,10 @@ public class DirectChannel {
if (!directReply && directMsg != null) {
directMsg.registerProcessor();
}
+ if (logger.isDebugEnabled()) {
+ logger.debug("Sending ({}) to {} peers ({}) via tcp/ip",
+ msg, p_destinations.length, Arrays.toString(p_destinations));
+ }
try {
do {
@@ -375,9 +380,9 @@ public class DirectChannel {
}
try {
- if (logger.isDebugEnabled()) {
- logger.debug("{}{}) to {} peers ({}) via tcp/ip",
- (retry ? "Retrying send (" : "Sending ("), msg, cons.size(),
cons);
+ if (retry && logger.isDebugEnabled()) {
+ logger.debug("Retrying send ({}{}) to {} peers ({}) via tcp/ip",
+ msg, cons.size(), cons);
}
DMStats stats = getDMStats();
List<?> sentCons; // used for cons we sent to this time
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/PeerTXStateStub.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/PeerTXStateStub.java
index 6211b6a..670bbb4 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/PeerTXStateStub.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/PeerTXStateStub.java
@@ -14,6 +14,7 @@
*/
package org.apache.geode.internal.cache;
+
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
@@ -131,14 +132,14 @@ public class PeerTXStateStub extends TXStateStub {
}
} catch (Exception e) {
this.getCache().getCancelCriterion().checkCancelInProgress(e);
- if (e.getCause() != null) {
- if (e.getCause() instanceof ForceReattemptException) {
- Throwable e2 = e.getCause();
- if (e2.getCause() != null && e2.getCause() instanceof
PrimaryBucketException) {
+ Throwable eCause = e.getCause();
+ if (eCause != null) {
+ if (eCause instanceof ForceReattemptException) {
+ if (eCause.getCause() instanceof PrimaryBucketException) {
// data rebalanced
TransactionDataRebalancedException tdnce =
- new
TransactionDataRebalancedException(e2.getCause().getMessage());
- tdnce.initCause(e2.getCause());
+ new
TransactionDataRebalancedException(eCause.getCause().getMessage());
+ tdnce.initCause(eCause.getCause());
throw tdnce;
} else {
// We cannot be sure that the member departed starting to process
commit request,
@@ -146,11 +147,11 @@ public class PeerTXStateStub extends TXStateStub {
// fixes 44939
TransactionInDoubtException tdnce =
new TransactionInDoubtException(e.getCause().getMessage());
- tdnce.initCause(e.getCause());
+ tdnce.initCause(eCause);
throw tdnce;
}
}
- throw new TransactionInDoubtException(e.getCause());
+ throw new TransactionInDoubtException(eCause);
} else {
throw new TransactionInDoubtException(e);
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java
index 936e9fb..efa085e 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java
@@ -15,7 +15,11 @@
package org.apache.geode.internal.cache.tier.sockets.command;
import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+import org.apache.geode.CancelException;
+import org.apache.geode.cache.TransactionInDoubtException;
+import org.apache.geode.distributed.DistributedMember;
import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.cache.TXCommitMessage;
@@ -47,6 +51,8 @@ public class CommitCommand extends BaseCommand {
@Override
public void cmdExecute(final Message clientMessage, final ServerConnection
serverConnection,
final SecurityService securityService, long start) throws IOException {
+
+
serverConnection.setAsTrue(REQUIRES_RESPONSE);
TXManagerImpl txMgr = (TXManagerImpl)
serverConnection.getCache().getCacheTransactionManager();
InternalDistributedMember client =
@@ -74,18 +80,27 @@ public class CommitCommand extends BaseCommand {
if (logger.isDebugEnabled()) {
logger.debug("TX: committing client tx: {}", txId);
}
- try {
-
- txId = txProxy.getTxId();
+ commitTransaction(clientMessage, serverConnection, txMgr, wasInProgress,
+ txProxy);
+ }
+ protected void commitTransaction(Message clientMessage, ServerConnection
serverConnection,
+ TXManagerImpl txMgr,
+ boolean wasInProgress, TXStateProxy txProxy) throws IOException {
+ Exception txException = null;
+ TXCommitMessage commitMsg = null;
+ TXId txId = txProxy.getTxId();
+ try {
txProxy.setCommitOnBehalfOfRemoteStub(true);
txMgr.commit();
commitMsg = txProxy.getCommitMessage();
+ logger.debug("Sending commit response to client: {}", commitMsg);
writeCommitResponse(commitMsg, clientMessage, serverConnection);
serverConnection.setAsTrue(RESPONDED);
+
} catch (Exception e) {
- sendException(clientMessage, serverConnection, e);
+ txException = e;
} finally {
if (txId != null) {
txMgr.removeHostedTXState(txId);
@@ -97,10 +112,35 @@ public class CommitCommand extends BaseCommand {
commitMsg.setClientVersion(null); // fixes bug 46529
}
}
+ if (txException != null) {
+ DistributedMember target = txProxy.getTarget();
+ // a TransactionInDoubtException caused by the TX host shutting down
means that
+ // the transaction may still be active and hold locks. We must wait for
the transaction
+ // host to finish shutting down before responding to the client or it
could encounter
+ // conflicts in retrying the transaction
+ if ((txException instanceof TransactionInDoubtException)
+ && (txException.getCause() instanceof CancelException)) {
+ logger.info(
+ "Waiting for departure of {} before throwing
TransactionInDoubtException.",
+ target);
+ try {
+
serverConnection.getCache().getDistributionManager().getMembershipManager()
+ .waitForDeparture(target);
+ } catch (TimeoutException e) {
+ // status will be logged below
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ logger.info("Done waiting. Transaction host {} in the cluster.",
+
serverConnection.getCache().getDistributionManager().isCurrentMember(target)
+ ? "is still"
+ : "is no longer");
+ }
+ sendException(clientMessage, serverConnection, txException);
+ }
}
-
protected static void writeCommitResponse(TXCommitMessage response, Message
origMsg,
ServerConnection servConn) throws IOException {
Message responseMsg = servConn.getResponseMessage();
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommandTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommandTest.java
index 4da081d..08f1425 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommandTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommandTest.java
@@ -14,14 +14,30 @@
*/
package org.apache.geode.internal.cache.tier.sockets.command;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.net.InetAddress;
+
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.TransactionInDoubtException;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.DistributionManager;
+import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.distributed.internal.membership.MembershipManager;
import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXStateProxy;
import org.apache.geode.internal.cache.tier.sockets.Message;
import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
import org.apache.geode.test.junit.categories.ClientServerTest;
@@ -50,4 +66,45 @@ public class CommitCommandTest {
CommitCommand.writeCommitResponse(null, origMsg, servConn);
}
+
+ /**
+ * GEODE-5269 CommitConflictException after TransactionInDoubtException
+ * CommitCommand needs to stall waiting for the host of a transaction to
+ * finish shutting down before sending a TransactionInDoubtException to
+ * the client.
+ */
+ @Test
+ public void testTransactionInDoubtWaitsForTargetDeparture() throws Exception
{
+ CommitCommand command = (CommitCommand) CommitCommand.getCommand();
+ Message clientMessage = mock(Message.class);
+ ServerConnection serverConnection = mock(ServerConnection.class);
+ TXManagerImpl txMgr = mock(TXManagerImpl.class);
+ TXStateProxy txProxy = mock(TXStateProxy.class);
+ InternalCache cache = mock(InternalCache.class);
+ DistributionManager distributionManager = mock(DistributionManager.class);
+ MembershipManager membershipManager = mock(MembershipManager.class);
+ boolean wasInProgress = false;
+
+ doReturn(cache).when(serverConnection).getCache();
+ doReturn(distributionManager).when(cache).getDistributionManager();
+
doReturn(membershipManager).when(distributionManager).getMembershipManager();
+ doReturn(false).when(distributionManager).isCurrentMember(isA(
+ InternalDistributedMember.class));
+
+
doReturn(mock(Message.class)).when(serverConnection).getErrorResponseMessage();
+
+ doReturn(new InternalDistributedMember(InetAddress.getLocalHost(),
1234)).when(txProxy)
+ .getTarget();
+
+ TransactionInDoubtException transactionInDoubtException =
+ new TransactionInDoubtException("tx in doubt");
+ transactionInDoubtException.initCause(new CacheClosedException("testing"));
+ doThrow(transactionInDoubtException).when(txMgr).commit();
+
+ command.commitTransaction(
+ clientMessage, serverConnection, txMgr, wasInProgress, txProxy);
+
+ verify(txMgr, atLeastOnce()).commit();
+ verify(membershipManager,
times(1)).waitForDeparture(isA(DistributedMember.class));
+ }
}
diff --git
a/geode-core/src/test/java/org/apache/geode/test/dunit/cache/internal/JUnit4CacheTestCase.java
b/geode-core/src/test/java/org/apache/geode/test/dunit/cache/internal/JUnit4CacheTestCase.java
index fa9a6f2..c93f921 100644
---
a/geode-core/src/test/java/org/apache/geode/test/dunit/cache/internal/JUnit4CacheTestCase.java
+++
b/geode-core/src/test/java/org/apache/geode/test/dunit/cache/internal/JUnit4CacheTestCase.java
@@ -41,7 +41,6 @@ import org.apache.geode.cache.TimeoutException;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.PoolManager;
-import org.apache.geode.cache.client.internal.InternalClientCache;
import org.apache.geode.cache30.CacheSerializableRunnable;
import org.apache.geode.distributed.internal.DistributionMessageObserver;
import org.apache.geode.internal.cache.GemFireCacheImpl;
@@ -263,8 +262,8 @@ public abstract class JUnit4CacheTestCase extends
JUnit4DistributedTestCase
}
}
- public final InternalClientCache getClientCache() {
- return (InternalClientCache) cache;
+ public final ClientCache getClientCache() {
+ return (ClientCache) cache;
}
/**
--
To stop receiving notification emails like this one, please contact
[email protected].