This is an automated email from the ASF dual-hosted git repository.
eshu11 pushed a commit to branch feature/GEODE-5376
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-5376 by this
push:
new f77debe GEODE-5376: Move SynchronizationRunnable to TXState
f77debe is described below
commit f77debe7871f3bb43a0d88099ee5189093dc95dc
Author: eshu <[email protected]>
AuthorDate: Tue Jul 10 12:01:51 2018 -0700
GEODE-5376: Move SynchronizationRunnable to TXState
Different threads can handle JTA beforeCompletion and afterCompletion
after client failover.
Use TXStateSynchronizationRunnable to handle this case.
---
.../cache/DistTXStateProxyImplOnCoordinator.java | 6 -
.../internal/cache/PausedTXStateProxyImpl.java | 8 -
.../apache/geode/internal/cache/TXManagerImpl.java | 4 +
.../org/apache/geode/internal/cache/TXState.java | 159 +++++++++---
.../apache/geode/internal/cache/TXStateProxy.java | 9 -
.../geode/internal/cache/TXStateProxyImpl.java | 25 --
...le.java => TXStateSynchronizationRunnable.java} | 22 +-
.../sockets/command/TXSynchronizationCommand.java | 235 +++++++-----------
.../geode/internal/cache/tx/ClientTXStateStub.java | 4 +
...ava => TXStateSynchronizationRunnableTest.java} | 54 ++--
.../apache/geode/internal/cache/TXStateTest.java | 152 ++++++++++++
.../command/TXSynchronizationCommandTest.java | 119 +++++++++
.../ClientServerJTAFailoverDistributedTest.java | 273 +++++++++++++++++++++
13 files changed, 808 insertions(+), 262 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
index c088c0f..031de9e 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
@@ -117,9 +117,6 @@ public class DistTXStateProxyImplOnCoordinator extends
DistTXStateProxyImpl {
}
inProgress = preserveTx;
- if (this.synchRunnable != null) {
- this.synchRunnable.abort();
- }
}
}
@@ -281,9 +278,6 @@ public class DistTXStateProxyImplOnCoordinator extends
DistTXStateProxyImpl {
} finally {
inProgress = false;
- if (this.synchRunnable != null) {
- this.synchRunnable.abort();
- }
}
/*
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/PausedTXStateProxyImpl.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/PausedTXStateProxyImpl.java
index a796d5c..2e52551 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/PausedTXStateProxyImpl.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/PausedTXStateProxyImpl.java
@@ -390,14 +390,6 @@ public class PausedTXStateProxyImpl implements
TXStateProxy {
public void setJCATransaction() {}
@Override
- public void setSynchronizationRunnable(TXSynchronizationRunnable sync) {}
-
- @Override
- public TXSynchronizationRunnable getSynchronizationRunnable() {
- return null;
- }
-
- @Override
public void suspend() {}
@Override
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
index c5c7653..639656f 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
@@ -1875,4 +1875,8 @@ public class TXManagerImpl implements
CacheTransactionManager, MembershipListene
return hostedTXStates;
}
+ public boolean isHostedTXStatesEmpty() {
+ return hostedTXStates.isEmpty();
+ }
+
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index 9768fb8..6b2ca35 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -24,23 +24,28 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Executor;
import java.util.concurrent.locks.ReentrantLock;
import javax.transaction.Status;
+import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
+import org.apache.geode.InternalGemFireError;
import org.apache.geode.SystemFailure;
import org.apache.geode.cache.CommitConflictException;
import org.apache.geode.cache.DiskAccessException;
import org.apache.geode.cache.EntryNotFoundException;
+import org.apache.geode.cache.FailedSynchronizationException;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.Region.Entry;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.SynchronizationCommitConflictException;
import org.apache.geode.cache.TransactionDataRebalancedException;
+import org.apache.geode.cache.TransactionException;
import org.apache.geode.cache.TransactionId;
import org.apache.geode.cache.TransactionWriter;
import org.apache.geode.cache.TransactionWriterException;
@@ -48,6 +53,7 @@ import
org.apache.geode.cache.UnsupportedOperationInTransactionException;
import org.apache.geode.cache.client.internal.ServerRegionDataAccess;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.TXManagerCancelledException;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.cache.control.MemoryThresholds;
@@ -95,6 +101,15 @@ public class TXState implements TXStateInterface {
*/
private int modSerialNum;
private final List<EntryEventImpl> pendingCallbacks = new
ArrayList<EntryEventImpl>();
+ /**
+ * for client/server JTA transactions we need to have a single thread handle
both beforeCompletion
+ * and afterCompletion so that beforeCompletion can obtain locks for the
afterCompletion step.
+ * This is that thread
+ */
+ protected volatile TXStateSynchronizationRunnable syncRunnable;
+
+ private volatile SynchronizationCommitConflictException
beforeCompletionException;
+ private volatile RuntimeException afterCompletionException;
// Internal testing hooks
private Runnable internalAfterReservation;
@@ -901,6 +916,9 @@ public class TXState implements TXStateInterface {
synchronized (this.completionGuard) {
this.completionGuard.notifyAll();
}
+ if (this.syncRunnable != null) {
+ this.syncRunnable.abort();
+ }
if (iae != null && !this.proxy.getCache().isClosed()) {
throw iae;
}
@@ -1000,11 +1018,45 @@ public class TXState implements TXStateInterface {
if (this.closed) {
throw new TXManagerCancelledException();
}
- this.proxy.getTxMgr().setTXState(null);
- final long opStart = CachePerfStats.getStatTime();
- this.jtaLifeTime = opStart - getBeginTime();
+ TXStateSynchronizationRunnable sync =
createTxStateSynchronizationRunnable();
+ setSynchronizationRunnable(sync);
+
+ Executor exec = getExecutor();
+ exec.execute(sync);
+ sync.waitForFirstExecution();
+ if (getBeforeCompletionException() != null) {
+ throw getBeforeCompletionException();
+ }
+ }
+ TXStateSynchronizationRunnable createTxStateSynchronizationRunnable() {
+ Runnable beforeCompletion = new Runnable() {
+ @SuppressWarnings("synthetic-access")
+ public void run() {
+ doBeforeCompletion();
+ }
+ };
+ return new TXStateSynchronizationRunnable(getCache().getCancelCriterion(),
+ beforeCompletion);
+ }
+
+ Executor getExecutor() {
+ return
InternalDistributedSystem.getConnectedInstance().getDistributionManager()
+ .getWaitingThreadPool();
+ }
+
+ SynchronizationCommitConflictException getBeforeCompletionException() {
+ return beforeCompletionException;
+ }
+
+ private void setSynchronizationRunnable(TXStateSynchronizationRunnable
synchronizationRunnable) {
+ syncRunnable = synchronizationRunnable;
+ }
+
+ void doBeforeCompletion() {
+ final long opStart = CachePerfStats.getStatTime();
+ this.jtaLifeTime = opStart - getBeginTime();
try {
reserveAndCheck();
/*
@@ -1042,8 +1094,8 @@ public class TXState implements TXStateInterface {
}
} catch (CommitConflictException commitConflict) {
cleanup();
- this.proxy.getTxMgr().noteCommitFailure(opStart, this.jtaLifeTime, this);
- throw new SynchronizationCommitConflictException(
+ proxy.getTxMgr().noteCommitFailure(opStart, this.jtaLifeTime, this);
+ beforeCompletionException = new SynchronizationCommitConflictException(
LocalizedStrings.TXState_CONFLICT_DETECTED_IN_GEMFIRE_TRANSACTION_0
.toLocalizedString(getTransactionId()),
commitConflict);
@@ -1057,40 +1109,79 @@ public class TXState implements TXStateInterface {
*/
@Override
public void afterCompletion(int status) {
- // System.err.println("start afterCompletion");
+ this.proxy.getTxMgr().setTXState(null);
+
+ Runnable afterCompletion = new Runnable() {
+ @SuppressWarnings("synthetic-access")
+ public void run() {
+ doAfterCompletion(status);
+ }
+ };
+ // if there was a beforeCompletion call then there will be a thread
+ // sitting in the waiting pool to execute afterCompletion. Otherwise
+ // throw FailedSynchronizationException().
+ TXStateSynchronizationRunnable sync = getSynchronizationRunnable();
+ if (sync != null) {
+ sync.runSecondRunnable(afterCompletion);
+ if (getAfterCompletionException() != null) {
+ throw getAfterCompletionException();
+ }
+ } else {
+ // rollback does not run beforeCompletion.
+ if (status != Status.STATUS_ROLLEDBACK) {
+ throw new FailedSynchronizationException(
+ "Could not execute afterCompletion when beforeCompletion was not
executed");
+ }
+ doAfterCompletion(status);
+ }
+ }
+
+ TXStateSynchronizationRunnable getSynchronizationRunnable() {
+ return this.syncRunnable;
+ }
+
+ RuntimeException getAfterCompletionException() {
+ return afterCompletionException;
+ }
+
+ void doAfterCompletion(int status) {
final long opStart = CachePerfStats.getStatTime();
- switch (status) {
- case Status.STATUS_COMMITTED:
- // System.err.println("begin commit in afterCompletion");
- Assert.assertTrue(this.locks != null,
- "Gemfire Transaction afterCompletion called with illegal state.");
- try {
- proxy.getTxMgr().setTXState(null);
- commit();
- saveTXCommitMessageForClientFailover();
- } catch (CommitConflictException error) {
- Assert.assertTrue(false, "Gemfire Transaction " + getTransactionId()
- + " afterCompletion failed.due to CommitConflictException: " +
error);
- }
+ try {
+ switch (status) {
+ case Status.STATUS_COMMITTED:
+ Assert.assertTrue(this.locks != null,
+ "Gemfire Transaction afterCompletion called with illegal
state.");
+ try {
+ commit();
+ saveTXCommitMessageForClientFailover();
+ } catch (CommitConflictException error) {
+ Assert.assertTrue(false, "Gemfire Transaction " +
getTransactionId()
+ + " afterCompletion failed.due to CommitConflictException: " +
error);
+ }
- this.proxy.getTxMgr().noteCommitSuccess(opStart, this.jtaLifeTime,
this);
- this.locks = null;
- // System.err.println("end commit in afterCompletion");
- break;
- case Status.STATUS_ROLLEDBACK:
- this.jtaLifeTime = opStart - getBeginTime();
- this.proxy.getTxMgr().setTXState(null);
- rollback();
- saveTXCommitMessageForClientFailover();
- this.proxy.getTxMgr().noteRollbackSuccess(opStart, this.jtaLifeTime,
this);
- break;
- default:
- Assert.assertTrue(false, "Unknown JTA Synchronization status " +
status);
+ this.proxy.getTxMgr().noteCommitSuccess(opStart, this.jtaLifeTime,
this);
+ this.locks = null;
+ break;
+ case Status.STATUS_ROLLEDBACK:
+ this.jtaLifeTime = opStart - getBeginTime();
+ rollback();
+ saveTXCommitMessageForClientFailover();
+ this.proxy.getTxMgr().noteRollbackSuccess(opStart, this.jtaLifeTime,
this);
+ break;
+ default:
+ Assert.assertTrue(false, "Unknown JTA Synchronization status " +
status);
+ }
+ } catch (RuntimeException exception) {
+ LogService.getLogger().info("got exception " + exception);
+ afterCompletionException = exception;
+ } catch (InternalGemFireError error) {
+ TransactionException exception = new TransactionException(error);
+ afterCompletionException = exception;
}
- // System.err.println("end afterCompletion");
+
}
- private void saveTXCommitMessageForClientFailover() {
+ void saveTXCommitMessageForClientFailover() {
proxy.getTxMgr().saveTXStateForClientFailover(proxy);
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxy.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxy.java
index f037a02..7a79914 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxy.java
@@ -54,15 +54,6 @@ public interface TXStateProxy extends TXStateInterface {
void setJCATransaction();
/**
- * establishes the synchronization thread used for client/server
beforeCompletion/afterCompletion
- * processing
- *
- */
- void setSynchronizationRunnable(TXSynchronizationRunnable sync);
-
- TXSynchronizationRunnable getSynchronizationRunnable();
-
- /**
* Perform additional tasks required by the proxy to suspend a transaction
*/
void suspend();
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
index f9aa2d4..00f15c3 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
@@ -57,12 +57,6 @@ public class TXStateProxyImpl implements TXStateProxy {
private boolean commitRequestedByOwner;
private boolean isJCATransaction;
- /**
- * for client/server JTA transactions we need to have a single thread handle
both beforeCompletion
- * and afterCompletion so that beforeC can obtain locks for the afterC step.
This is that thread
- */
- protected volatile TXSynchronizationRunnable synchRunnable;
-
private final ReentrantLock lock = new ReentrantLock();
/** number of operations in this transaction */
@@ -99,16 +93,6 @@ public class TXStateProxyImpl implements TXStateProxy {
}
@Override
- public void setSynchronizationRunnable(TXSynchronizationRunnable synch) {
- this.synchRunnable = synch;
- }
-
- @Override
- public TXSynchronizationRunnable getSynchronizationRunnable() {
- return this.synchRunnable;
- }
-
- @Override
public ReentrantLock getLock() {
return this.lock;
}
@@ -230,9 +214,6 @@ public class TXStateProxyImpl implements TXStateProxy {
throw e;
} finally {
inProgress = preserveTx;
- if (this.synchRunnable != null) {
- this.synchRunnable.abort();
- }
}
}
@@ -410,9 +391,6 @@ public class TXStateProxyImpl implements TXStateProxy {
getRealDeal(null, null).rollback();
} finally {
inProgress = false;
- if (this.synchRunnable != null) {
- this.synchRunnable.abort();
- }
}
}
@@ -472,9 +450,6 @@ public class TXStateProxyImpl implements TXStateProxy {
getRealDeal(null, null).afterCompletion(status);
} finally {
this.inProgress = false;
- if (this.synchRunnable != null) {
- this.synchRunnable.abort();
- }
}
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXSynchronizationRunnable.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java
similarity index 86%
rename from
geode-core/src/main/java/org/apache/geode/internal/cache/TXSynchronizationRunnable.java
rename to
geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java
index 4603d93..415bcdd 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXSynchronizationRunnable.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java
@@ -17,22 +17,20 @@ package org.apache.geode.internal.cache;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelCriterion;
-import org.apache.geode.internal.cache.tier.sockets.CommBufferPool;
import org.apache.geode.internal.logging.LogService;
/**
- * TXSynchronizationThread manages beforeCompletion and afterCompletion calls
on behalf of a client
- * cache. The thread should be instantiated with a Runnable that invokes
beforeCompletion behavior.
+ * TXSynchronizationThread manages beforeCompletion and afterCompletion calls
in TXState.
+ * The thread should be instantiated with a Runnable that invokes
beforeCompletion behavior.
* Then you must invoke runSecondRunnable() with another Runnable that invokes
afterCompletion
* behavior.
*
- * @since GemFire 6.6
+ * @since Geode 1.8.0
*/
-public class TXSynchronizationRunnable implements Runnable {
+public class TXStateSynchronizationRunnable implements Runnable {
private static final Logger logger = LogService.getLogger();
private final CancelCriterion cancelCriterion;
- private final CommBufferPool commBufferPool;
private Runnable firstRunnable;
private final Object firstRunnableSync = new Object();
@@ -44,21 +42,15 @@ public class TXSynchronizationRunnable implements Runnable {
private boolean abort;
- public TXSynchronizationRunnable(final CancelCriterion cancelCriterion,
- final CommBufferPool commBufferPool, final Runnable beforeCompletion) {
+ public TXStateSynchronizationRunnable(final CancelCriterion cancelCriterion,
+ final Runnable beforeCompletion) {
this.cancelCriterion = cancelCriterion;
- this.commBufferPool = commBufferPool;
this.firstRunnable = beforeCompletion;
}
@Override
public void run() {
- commBufferPool.setTLCommBuffer();
- try {
- doSynchronizationOps();
- } finally {
- commBufferPool.releaseTLCommBuffer();
- }
+ doSynchronizationOps();
}
private void doSynchronizationOps() {
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
index fd9c17f..037702a 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
@@ -16,19 +16,14 @@
package org.apache.geode.internal.cache.tier.sockets.command;
import java.io.IOException;
-import java.util.concurrent.Executor;
-
-import javax.transaction.Status;
import
org.apache.geode.cache.client.internal.TXSynchronizationOp.CompletionType;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ReplyException;
import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.TXCommitMessage;
import org.apache.geode.internal.cache.TXId;
import org.apache.geode.internal.cache.TXManagerImpl;
import org.apache.geode.internal.cache.TXStateProxy;
-import org.apache.geode.internal.cache.TXSynchronizationRunnable;
import org.apache.geode.internal.cache.tier.Command;
import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.internal.cache.tier.sockets.BaseCommand;
@@ -50,21 +45,6 @@ public class TXSynchronizationCommand extends BaseCommand {
* (non-Javadoc)
*
* @see
- *
org.apache.geode.internal.cache.tier.sockets.BaseCommand#shouldMasqueradeForTx(org.apache.geode
- * .internal.cache.tier.sockets.Message,
- * org.apache.geode.internal.cache.tier.sockets.ServerConnection)
- */
- @Override
- protected boolean shouldMasqueradeForTx(Message clientMessage,
- ServerConnection serverConnection) {
- // masquerading is done in the waiting thread pool
- return false;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
*
org.apache.geode.internal.cache.tier.sockets.BaseCommand#cmdExecute(org.apache.geode.internal.
* cache.tier.sockets.Message,
org.apache.geode.internal.cache.tier.sockets.ServerConnection,
* long)
@@ -86,20 +66,18 @@ public class TXSynchronizationCommand extends BaseCommand {
statusPart = null;
}
- final TXManagerImpl txMgr =
- (TXManagerImpl)
serverConnection.getCache().getCacheTransactionManager();
- final InternalDistributedMember member =
- (InternalDistributedMember)
serverConnection.getProxyID().getDistributedMember();
+ final TXManagerImpl txMgr = getTXManager(serverConnection);
+ final InternalDistributedMember member =
getDistributedMember(serverConnection);
- // get the tx state without associating it with this thread. That's done
later
- final TXStateProxy txProxy = txMgr.masqueradeAs(clientMessage, member,
true);
+ final TXStateProxy txProxy = txMgr.getTXState();
+ assert txProxy != null;
final TXId txId = txProxy.getTxId();
TXCommitMessage commitMessage = txMgr.getRecentlyCompletedMessage(txId);
if (commitMessage != null && commitMessage !=
TXCommitMessage.ROLLBACK_MSG) {
assert type == CompletionType.AFTER_COMPLETION;
try {
- CommitCommand.writeCommitResponse(commitMessage, clientMessage,
serverConnection);
+ writeCommitResponse(clientMessage, serverConnection, commitMessage);
} catch (IOException e) {
if (isDebugEnabled) {
logger.debug("Problem writing reply to client", e);
@@ -117,137 +95,92 @@ public class TXSynchronizationCommand extends BaseCommand
{
return;
}
- // we have to run beforeCompletion and afterCompletion in the same thread
- // because beforeCompletion obtains locks for the thread and
afterCompletion
- // releases them
- if (txProxy != null) {
- try {
- if (type == CompletionType.BEFORE_COMPLETION) {
- Runnable beforeCompletion = new Runnable() {
- @SuppressWarnings("synthetic-access")
- public void run() {
- TXStateProxy txState = null;
- Throwable failureException = null;
- try {
- txState = txMgr.masqueradeAs(clientMessage, member, false);
- if (isDebugEnabled) {
- logger.debug("Executing beforeCompletion() notification for
transaction {}",
- clientMessage.getTransactionId());
- }
- txState.setIsJTA(true);
- txState.beforeCompletion();
- try {
- writeReply(clientMessage, serverConnection);
- } catch (IOException e) {
- if (isDebugEnabled) {
- logger.debug("Problem writing reply to client", e);
- }
- }
- serverConnection.setAsTrue(RESPONDED);
- } catch (ReplyException e) {
- failureException = e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } catch (Exception e) {
- failureException = e;
- } finally {
- txMgr.unmasquerade(txState);
- }
- if (failureException != null) {
- try {
- writeException(clientMessage, failureException, false,
serverConnection);
- } catch (IOException ioe) {
- if (isDebugEnabled) {
- logger.debug("Problem writing reply to client", ioe);
- }
- }
- serverConnection.setAsTrue(RESPONDED);
- }
+ try {
+ if (type == CompletionType.BEFORE_COMPLETION) {
+ if (isDebugEnabled) {
+ logger.debug("Executing beforeCompletion() notification for
transaction {}",
+ clientMessage.getTransactionId());
+ }
+ Throwable failureException = null;
+ try {
+ txProxy.setIsJTA(true);
+ txProxy.beforeCompletion();
+ try {
+ writeReply(clientMessage, serverConnection);
+ } catch (IOException e) {
+ if (isDebugEnabled) {
+ logger.debug("Problem writing reply to client", e);
+ }
+ }
+ serverConnection.setAsTrue(RESPONDED);
+ } catch (ReplyException e) {
+ failureException = e.getCause();
+ } catch (Exception e) {
+ failureException = e;
+ }
+ if (failureException != null) {
+ try {
+ writeException(clientMessage, failureException, false,
serverConnection);
+ } catch (IOException ioe) {
+ if (isDebugEnabled) {
+ logger.debug("Problem writing reply to client", ioe);
}
- };
- TXSynchronizationRunnable sync =
- new
TXSynchronizationRunnable(serverConnection.getCache().getCancelCriterion(),
- serverConnection.getAcceptor(), beforeCompletion);
- txProxy.setSynchronizationRunnable(sync);
- Executor exec =
InternalDistributedSystem.getConnectedInstance().getDistributionManager()
- .getWaitingThreadPool();
- exec.execute(sync);
- sync.waitForFirstExecution();
- } else {
- Runnable afterCompletion = new Runnable() {
- @SuppressWarnings("synthetic-access")
- public void run() {
- TXStateProxy txState = null;
- try {
- txState = txMgr.masqueradeAs(clientMessage, member, false);
- int status = statusPart.getInt();
- if (isDebugEnabled) {
- logger.debug("Executing afterCompletion({}) notification for
transaction {}",
- status, clientMessage.getTransactionId());
- }
- txState.setIsJTA(true);
- txState.afterCompletion(status);
- // GemFire commits during afterCompletion - send the commit
info back to the client
- // where it can be applied to the local cache
- TXCommitMessage cmsg = txState.getCommitMessage();
- try {
- CommitCommand.writeCommitResponse(cmsg, clientMessage,
serverConnection);
- txMgr.removeHostedTXState(txState.getTxId());
- } catch (IOException e) {
- // not much can be done here
- if (isDebugEnabled || (e instanceof
MessageTooLargeException)) {
- logger.warn("Problem writing reply to client", e);
- }
- }
- serverConnection.setAsTrue(RESPONDED);
- } catch (RuntimeException e) {
- try {
- writeException(clientMessage, e, false, serverConnection);
- } catch (IOException ioe) {
- if (isDebugEnabled) {
- logger.debug("Problem writing reply to client", ioe);
- }
- }
- serverConnection.setAsTrue(RESPONDED);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } finally {
- txMgr.unmasquerade(txState);
- }
+ }
+ serverConnection.setAsTrue(RESPONDED);
+ }
+ } else {
+ try {
+ int status = statusPart.getInt();
+ if (isDebugEnabled) {
+ logger.debug("Executing afterCompletion({}) notification for
transaction {}",
+ status, clientMessage.getTransactionId());
+ }
+ txProxy.setIsJTA(true);
+ txProxy.setCommitOnBehalfOfRemoteStub(true);
+ txProxy.afterCompletion(status);
+ // GemFire commits during afterCompletion - send the commit info
back to the client
+ // where it can be applied to the local cache
+ TXCommitMessage cmsg = txProxy.getCommitMessage();
+ try {
+ writeCommitResponse(clientMessage, serverConnection, cmsg);
+ txMgr.removeHostedTXState(txProxy.getTxId());
+ } catch (IOException e) {
+ // not much can be done here
+ if (isDebugEnabled || (e instanceof MessageTooLargeException)) {
+ logger.warn("Problem writing reply to client", e);
}
- };
- // if there was a beforeCompletion call then there will be a thread
- // sitting in the waiting pool to execute afterCompletion. Otherwise
- // we have failed-over and may need to do beforeCompletion & hope
that it works
- TXSynchronizationRunnable sync =
txProxy.getSynchronizationRunnable();
- if (sync != null) {
- sync.runSecondRunnable(afterCompletion);
- } else {
- if (statusPart.getInt() == Status.STATUS_COMMITTED) {
- TXStateProxy txState = txMgr.masqueradeAs(clientMessage, member,
false);
- try {
- if (isDebugEnabled) {
- logger.debug(
- "Executing beforeCompletion() notification for
transaction {} after failover",
- clientMessage.getTransactionId());
- }
- txState.setIsJTA(true);
- txState.beforeCompletion();
- } finally {
- txMgr.unmasquerade(txState);
- }
+ }
+ serverConnection.setAsTrue(RESPONDED);
+ } catch (RuntimeException e) {
+ try {
+ writeException(clientMessage, e, false, serverConnection);
+ } catch (IOException ioe) {
+ if (isDebugEnabled) {
+ logger.debug("Problem writing reply to client", ioe);
}
- afterCompletion.run();
}
+ serverConnection.setAsTrue(RESPONDED);
}
- } catch (Exception e) {
- writeException(clientMessage, MessageType.EXCEPTION, e, false,
serverConnection);
- serverConnection.setAsTrue(RESPONDED);
- }
- if (isDebugEnabled) {
- logger.debug("Sent tx synchronization response");
}
+ } catch (Exception e) {
+ writeException(clientMessage, MessageType.EXCEPTION, e, false,
serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
+ }
+ if (isDebugEnabled) {
+ logger.debug("Sent tx synchronization response");
}
}
+ void writeCommitResponse(Message clientMessage, ServerConnection
serverConnection,
+ TXCommitMessage commitMessage) throws IOException {
+ CommitCommand.writeCommitResponse(commitMessage, clientMessage,
serverConnection);
+ }
+
+ InternalDistributedMember getDistributedMember(ServerConnection
serverConnection) {
+ return (InternalDistributedMember)
serverConnection.getProxyID().getDistributedMember();
+ }
+
+ TXManagerImpl getTXManager(ServerConnection serverConnection) {
+ return (TXManagerImpl)
serverConnection.getCache().getCacheTransactionManager();
+ }
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java
index c97d9f0..4469b71 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java
@@ -322,4 +322,8 @@ public class ClientTXStateStub extends TXStateStub {
public void setAfterLocalLocks(Runnable afterLocalLocks) {
this.internalAfterLocalLocks = afterLocalLocks;
}
+
+ public ServerLocation getServerAffinityLocation() {
+ return serverAffinityLocation;
+ }
}
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/TXSynchronizationRunnableTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnableTest.java
similarity index 52%
rename from
geode-core/src/test/java/org/apache/geode/internal/cache/TXSynchronizationRunnableTest.java
rename to
geode-core/src/test/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnableTest.java
index a6ba2f5..05b811b 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/TXSynchronizationRunnableTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnableTest.java
@@ -18,6 +18,9 @@ import static
org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import org.junit.Before;
import org.junit.Test;
@@ -25,40 +28,63 @@ import org.junit.experimental.categories.Category;
import org.apache.geode.CancelCriterion;
import org.apache.geode.cache.CacheClosedException;
-import org.apache.geode.internal.cache.tier.sockets.CommBufferPool;
import org.apache.geode.test.junit.categories.UnitTest;
@Category(UnitTest.class)
-public class TXSynchronizationRunnableTest {
-
+public class TXStateSynchronizationRunnableTest {
private CancelCriterion cancelCriterion;
- private CommBufferPool commBufferPool;
private Runnable beforeCompletion;
+ private Runnable afterCompletion;
private CacheClosedException exception;
-
@Before
public void setUp() {
exception = new CacheClosedException();
cancelCriterion = mock(CancelCriterion.class);
- commBufferPool = mock(CommBufferPool.class);
beforeCompletion = mock(Runnable.class);
+ afterCompletion = mock(Runnable.class);
+ }
+ @Test
+ public void waitForFirstExecutionThrowsExceptionIfCacheClosed() {
doThrow(exception).when(cancelCriterion).checkCancelInProgress(any());
+ TXStateSynchronizationRunnable runnable =
+ new TXStateSynchronizationRunnable(cancelCriterion, beforeCompletion);
+ assertThatThrownBy(() ->
runnable.waitForFirstExecution()).isSameAs(exception);
}
@Test
- public void test() {
- TXSynchronizationRunnable runnable =
- new TXSynchronizationRunnable(cancelCriterion, commBufferPool,
beforeCompletion);
- assertThatThrownBy(() ->
runnable.waitForFirstExecution()).isSameAs(exception);
+ public void runSecondRunnableThrowsExceptionIfCacheClosed() {
+ doThrow(exception).when(cancelCriterion).checkCancelInProgress(any());
+ TXStateSynchronizationRunnable runnable =
+ new TXStateSynchronizationRunnable(cancelCriterion, beforeCompletion);
+ assertThatThrownBy(() ->
runnable.runSecondRunnable(afterCompletion)).isSameAs(exception);
}
@Test
- public void test1() {
- TXSynchronizationRunnable runnable =
- new TXSynchronizationRunnable(cancelCriterion, commBufferPool,
beforeCompletion);
- assertThatThrownBy(() ->
runnable.runSecondRunnable(mock(Runnable.class))).isSameAs(exception);
+ public void doSynchronizationOpsWaitsUntilRunSecondRunnable() {
+ TXStateSynchronizationRunnable runnable =
+ new TXStateSynchronizationRunnable(cancelCriterion, beforeCompletion);
+ new Thread(() -> {
+ runnable.runSecondRunnable(afterCompletion);
+ }).start();
+ runnable.run();
+ verify(beforeCompletion, times(1)).run();
+ verify(afterCompletion, times(1)).run();
}
+
+ @Test
+ public void doSynchronizationOpsDoesNotRunSecondRunnableIfAborted() {
+ TXStateSynchronizationRunnable runnable =
+ new TXStateSynchronizationRunnable(cancelCriterion, beforeCompletion);
+ runnable.abort();
+ new Thread(() -> {
+ runnable.runSecondRunnable(afterCompletion);
+ }).start();
+ runnable.run();
+ verify(beforeCompletion, times(1)).run();
+ verify(afterCompletion, never()).run();
+ }
+
}
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
new file mode 100644
index 0000000..0dce944
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.Executor;
+
+import javax.transaction.Status;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.InternalGemFireError;
+import org.apache.geode.cache.CommitConflictException;
+import org.apache.geode.cache.SynchronizationCommitConflictException;
+import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
+import org.apache.geode.cache.TransactionException;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class TXStateTest {
+ private CancelCriterion cancelCriterion;
+
+ private TXStateProxyImpl txStateProxy;
+ private CommitConflictException exception;
+ private TXStateSynchronizationRunnable txStateSynch;
+ private SynchronizationCommitConflictException
synchronizationCommitConflictException;
+ private RuntimeException runtimeException;
+ private TransactionDataNodeHasDepartedException
transactionDataNodeHasDepartedException;
+
+ @Before
+ public void setup() {
+ txStateProxy = mock(TXStateProxyImpl.class);
+
+ cancelCriterion = mock(CancelCriterion.class);
+ exception = new CommitConflictException("");
+ txStateSynch = mock(TXStateSynchronizationRunnable.class);
+ synchronizationCommitConflictException = new
SynchronizationCommitConflictException("");
+ runtimeException = new RuntimeException();
+ transactionDataNodeHasDepartedException = new
TransactionDataNodeHasDepartedException("");
+
+ when(txStateProxy.getTxMgr()).thenReturn(mock(TXManagerImpl.class));
+ }
+
+ @Test
+ public void
beforeCompletionThrowsSynchronizationCommitConflictExceptionIfBeforeCompletionExceptionIsSet()
{
+ TXState txState = spy(new TXState(txStateProxy, true));
+
+ doReturn(mock(Executor.class)).when(txState).getExecutor();
+
doReturn(txStateSynch).when(txState).createTxStateSynchronizationRunnable();
+
doReturn(synchronizationCommitConflictException).when(txState).getBeforeCompletionException();
+
+ assertThatThrownBy(() -> txState.beforeCompletion())
+ .isSameAs(synchronizationCommitConflictException);
+ }
+
+ @Test
+ public void
beforeCompletionExceptionIsSetWhenDoBeforeCompletionCouldNotLockKeys() {
+ TXState txState = spy(new TXState(txStateProxy, true));
+ doThrow(exception).when(txState).reserveAndCheck();
+
+ txState.doBeforeCompletion();
+ assertThat(txState.getBeforeCompletionException())
+ .isInstanceOf(SynchronizationCommitConflictException.class);
+ }
+
+
+ @Test
+ public void afterCompletionThrowsExceptionIfAfterCompletionExceptionIsSet() {
+ TXState txState = spy(new TXState(txStateProxy, true));
+
+ doReturn(txStateSynch).when(txState).getSynchronizationRunnable();
+ doReturn(runtimeException).when(txState).getAfterCompletionException();
+
+ assertThatThrownBy(() -> txState.afterCompletion(Status.STATUS_COMMITTED))
+ .isSameAs(runtimeException);
+ }
+
+ @Test
+ public void
afterCompletionExceptionIsSetWhenCommitFailedWithTransactionDataNodeHasDepartedException()
{
+ TXState txState = spy(new TXState(txStateProxy, true));
+ doReturn(mock(InternalCache.class)).when(txState).getCache();
+ txState.reserveAndCheck();
+ doThrow(transactionDataNodeHasDepartedException).when(txState).commit();
+
+ txState.doAfterCompletion(Status.STATUS_COMMITTED);
+ assertThat(txState.getAfterCompletionException())
+ .isSameAs(transactionDataNodeHasDepartedException);
+ }
+
+ @Test
+ public void
afterCompletionExceptionIsSetToTransactionExceptionWhenCommitFailedWithCommitConflictException()
{
+ TXState txState = spy(new TXState(txStateProxy, true));
+ doReturn(mock(InternalCache.class)).when(txState).getCache();
+ txState.reserveAndCheck();
+ doThrow(exception).when(txState).commit();
+
+ txState.doAfterCompletion(Status.STATUS_COMMITTED);
+
+
assertThat(txState.getAfterCompletionException()).isInstanceOf(TransactionException.class);
+ TransactionException transactionException =
+ (TransactionException) txState.getAfterCompletionException();
+
assertThat(transactionException.getCause()).isInstanceOf(InternalGemFireError.class);
+ }
+
+
+ @Test
+ public void afterCompletionCanCommitJTA() {
+ TXState txState = spy(new TXState(txStateProxy, false));
+ doReturn(mock(InternalCache.class)).when(txState).getCache();
+ txState.reserveAndCheck();
+ txState.closed = true;
+ txState.doAfterCompletion(Status.STATUS_COMMITTED);
+
+ assertThat(txState.locks).isNull();
+ verify(txState, times(1)).saveTXCommitMessageForClientFailover();
+ }
+
+ @Test
+ public void afterCompletionCanRollbackJTA() {
+ TXState txState = spy(new TXState(txStateProxy, true));
+ txState.afterCompletion(Status.STATUS_ROLLEDBACK);
+
+ verify(txState, times(1)).rollback();
+ verify(txState, times(1)).saveTXCommitMessageForClientFailover();
+ }
+
+}
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommandTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommandTest.java
new file mode 100644
index 0000000..adac845
--- /dev/null
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommandTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets.command;
+
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import javax.transaction.Status;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.client.internal.TXSynchronizationOp;
+import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.TXCommitMessage;
+import org.apache.geode.internal.cache.TXId;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXStateProxyImpl;
+import org.apache.geode.internal.cache.tier.Command;
+import org.apache.geode.internal.cache.tier.sockets.Message;
+import org.apache.geode.internal.cache.tier.sockets.Part;
+import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class TXSynchronizationCommandTest {
+ private Message clientMessage;
+ private ServerConnection serverConnection;
+ private TXManagerImpl txManager;
+ private TXStateProxyImpl txStateProxy;
+ private TXId txId;
+ private TXCommitMessage txCommitMessage;
+ private InternalDistributedMember member;
+ private Part part0;
+ private Part part1;
+ private Part part2;
+ private RuntimeException exception;
+ private TXSynchronizationCommand command;
+
+ @Before
+ public void setup() {
+ clientMessage = mock(Message.class);
+ serverConnection = mock(ServerConnection.class);
+ txManager = mock(TXManagerImpl.class);
+ member = mock(InternalDistributedMember.class);
+ txStateProxy = mock(TXStateProxyImpl.class);
+ txId = mock(TXId.class);
+ txCommitMessage = mock(TXCommitMessage.class);
+ part0 = mock(Part.class);
+ part1 = mock(Part.class);
+ part2 = mock(Part.class);
+ exception = new RuntimeException();
+ command = mock(TXSynchronizationCommand.class);
+
+ when(clientMessage.getPart(0)).thenReturn(part0);
+ when(clientMessage.getPart(1)).thenReturn(part1);
+ when(clientMessage.getPart(2)).thenReturn(part2);
+ doReturn(txManager).when(command).getTXManager(serverConnection);
+ doReturn(member).when(command).getDistributedMember(serverConnection);
+ when(txManager.getTXState()).thenReturn(txStateProxy);
+ when(txStateProxy.getTxId()).thenReturn(txId);
+ }
+
+ @Test
+ public void commandCanSendBackCommitMessageIfAlreadyCommitted() throws
Exception {
+
when(part0.getInt()).thenReturn(TXSynchronizationOp.CompletionType.AFTER_COMPLETION.ordinal());
+
when(txManager.getRecentlyCompletedMessage(txId)).thenReturn(txCommitMessage);
+ doNothing().when(command).writeCommitResponse(clientMessage,
serverConnection, txCommitMessage);
+
+ doCallRealMethod().when(command).cmdExecute(clientMessage,
serverConnection, null, 1);
+ command.cmdExecute(clientMessage, serverConnection, null, 1);
+
+ verify(command, times(1)).writeCommitResponse(clientMessage,
serverConnection, txCommitMessage);
+ verify(serverConnection, times(1)).setAsTrue(Command.RESPONDED);
+ }
+
+ @Test
+ public void commandCanInvokeBeforeCompletion() throws Exception {
+
when(part0.getInt()).thenReturn(TXSynchronizationOp.CompletionType.BEFORE_COMPLETION.ordinal());
+
+ doCallRealMethod().when(command).cmdExecute(clientMessage,
serverConnection, null, 1);
+ command.cmdExecute(clientMessage, serverConnection, null, 1);
+
+ verify(txStateProxy, times(1)).beforeCompletion();
+ verify(serverConnection, times(1)).setAsTrue(Command.RESPONDED);
+ }
+
+ @Test
+ public void commandCanSendBackCommitMessageAfterInvokeAfterCompletion()
throws Exception {
+
when(part0.getInt()).thenReturn(TXSynchronizationOp.CompletionType.AFTER_COMPLETION.ordinal());
+ when(part2.getInt()).thenReturn(Status.STATUS_COMMITTED);
+ when(txStateProxy.getCommitMessage()).thenReturn(txCommitMessage);
+
+ doCallRealMethod().when(command).cmdExecute(clientMessage,
serverConnection, null, 1);
+ command.cmdExecute(clientMessage, serverConnection, null, 1);
+
+ verify(txStateProxy, times(1)).afterCompletion(Status.STATUS_COMMITTED);
+ verify(command, times(1)).writeCommitResponse(clientMessage,
serverConnection, txCommitMessage);
+ verify(serverConnection, times(1)).setAsTrue(Command.RESPONDED);
+ }
+}
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/jta/functional/ClientServerJTAFailoverDistributedTest.java
b/geode-core/src/test/java/org/apache/geode/internal/jta/functional/ClientServerJTAFailoverDistributedTest.java
new file mode 100644
index 0000000..43ab209
--- /dev/null
+++
b/geode-core/src/test/java/org/apache/geode/internal/jta/functional/ClientServerJTAFailoverDistributedTest.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.jta.functional;
+
+import static org.apache.geode.test.dunit.VM.getHostName;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
+import javax.transaction.Status;
+
+import org.awaitility.Awaitility;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.TransactionId;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.client.PoolFactory;
+import org.apache.geode.cache.client.PoolManager;
+import org.apache.geode.cache.client.internal.InternalClientCache;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.internal.ServerLocation;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXStateProxyImpl;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil;
+import org.apache.geode.internal.cache.tx.ClientTXStateStub;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.ClientCacheRule;
+import org.apache.geode.test.dunit.rules.DistributedTestRule;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+@Category(DistributedTest.class)
+public class ClientServerJTAFailoverDistributedTest implements Serializable {
+ private String hostName;
+ private String uniqueName;
+ private String regionName;
+ private VM server1;
+ private VM server2;
+ private VM server3;
+ private VM client1;
+ private int port1;
+ private int port2;
+
+ private final int key = 1;
+ private final String value = "value1";
+ private final String newValue = "value2";
+
+ @Rule
+ public DistributedTestRule distributedTestRule = new DistributedTestRule();
+
+ @Rule
+ public CacheRule cacheRule = new CacheRule();
+
+ @Rule
+ public ClientCacheRule clientCacheRule = new ClientCacheRule();
+
+ @Rule
+ public SerializableTestName testName = new SerializableTestName();
+
+ @Before
+ public void setUp() {
+ server1 = getVM(0);
+ server2 = getVM(1);
+ server3 = getVM(2);
+ client1 = getVM(3);
+
+ hostName = getHostName();
+ uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName();
+ regionName = uniqueName + "_region";
+ }
+
+ @Test
+ public void jtaCanFailoverAfterDoneBeforeCompletion() {
+ server3.invoke(() -> createServerRegion(1, false));
+ server3.invoke(() -> doPut(key, value));
+ port1 = server1.invoke(() -> createServerRegion(1, true));
+ port2 = server2.invoke(() -> createServerRegion(1, true));
+
+ client1.invoke(() -> createClientRegion(port1, port2));
+
+ Object[] beforeCompletionResults = client1.invoke(() ->
doBeforeCompletion());
+
+ int port = (Integer) beforeCompletionResults[1];
+
+ if (port == port1) {
+ server1.invoke(() -> cacheRule.getCache().close());
+ } else {
+ assert port == port2;
+ server2.invoke(() -> cacheRule.getCache().close());
+ }
+
+ client1.invoke(() -> doAfterCompletion((TransactionId)
beforeCompletionResults[0], true));
+ }
+
+ private int createServerRegion(int totalNumBuckets, boolean isAccessor)
throws Exception {
+ PartitionAttributesFactory factory = new PartitionAttributesFactory();
+ factory.setTotalNumBuckets(totalNumBuckets);
+ if (isAccessor) {
+ factory.setLocalMaxMemory(0);
+ }
+ PartitionAttributes partitionAttributes = factory.create();
+ cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.PARTITION)
+ .setPartitionAttributes(partitionAttributes).create(regionName);
+
+ CacheServer server = cacheRule.getCache().addCacheServer();
+ server.setPort(0);
+ server.start();
+ return server.getPort();
+ }
+
+ private void createClientRegion(int... ports) {
+ clientCacheRule.createClientCache();
+
+ CacheServerTestUtil.disableShufflingOfEndpoints();
+ PoolImpl pool;
+ try {
+ pool = getPool(ports);
+ } finally {
+ CacheServerTestUtil.enableShufflingOfEndpoints();
+ }
+
+ ClientRegionFactory crf =
+
clientCacheRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.LOCAL);
+ crf.setPoolName(pool.getName());
+ crf.create(regionName);
+
+ if (ports.length > 1) {
+ pool.acquireConnection(new ServerLocation(hostName, port1));
+ }
+ }
+
+ private PoolImpl getPool(int... ports) {
+ PoolFactory factory = PoolManager.createFactory();
+ for (int port : ports) {
+ factory.addServer(hostName, port);
+ }
+ return (PoolImpl) factory.setReadTimeout(2000).setSocketBufferSize(1000)
+ .setMinConnections(4).create(uniqueName);
+ }
+
+ private void doPut(int key, String value) {
+ cacheRule.getCache().getRegion(regionName).put(key, value);
+ }
+
+ private Object[] doBeforeCompletion() {
+ Object[] results = new Object[2];
+ InternalClientCache cache = clientCacheRule.getClientCache();
+ Region region = cache.getRegion(regionName);
+ TXManagerImpl txManager = (TXManagerImpl)
cache.getCacheTransactionManager();
+ txManager.begin();
+ region.put(key, newValue);
+
+ TXStateProxyImpl txStateProxy = (TXStateProxyImpl) txManager.getTXState();
+ ClientTXStateStub clientTXStateStub = (ClientTXStateStub)
txStateProxy.getRealDeal(null, null);
+ clientTXStateStub.beforeCompletion();
+ TransactionId transactionId = txManager.suspend();
+ int port = clientTXStateStub.getServerAffinityLocation().getPort();
+ results[0] = transactionId;
+ results[1] = port;
+ return results;
+ }
+
+ private void doAfterCompletion(TransactionId transactionId, boolean
isCommit) {
+ InternalClientCache cache = clientCacheRule.getClientCache();
+ Region region = cache.getRegion(regionName);
+ TXManagerImpl txManager = (TXManagerImpl)
cache.getCacheTransactionManager();
+ txManager.resume(transactionId);
+
+ TXStateProxyImpl txStateProxy = (TXStateProxyImpl) txManager.getTXState();
+ ClientTXStateStub clientTXStateStub = (ClientTXStateStub)
txStateProxy.getRealDeal(null, null);
+ try {
+ clientTXStateStub
+ .afterCompletion(isCommit ? Status.STATUS_COMMITTED :
Status.STATUS_ROLLEDBACK);
+ } catch (Exception exception) {
+ LogService.getLogger().info("exception stack ", exception);
+ throw exception;
+ }
+ if (isCommit) {
+ assertEquals(newValue, region.get(key));
+ } else {
+ assertEquals(value, region.get(key));
+ }
+ }
+
+ @Test
+ public void jtaCanFailoverToJTAHostAfterDoneBeforeCompletion() {
+ port2 = server2.invoke(() -> createServerRegion(1, false));
+ server2.invoke(() -> doPut(key, value));
+ port1 = server1.invoke(() -> createServerRegion(1, true));
+
+ client1.invoke(() -> createClientRegion(port1, port2));
+ Object[] beforeCompletionResults = client1.invoke(() ->
doBeforeCompletion());
+
+ server1.invoke(() -> cacheRule.getCache().close());
+
+ client1.invoke(() -> doAfterCompletion((TransactionId)
beforeCompletionResults[0], true));
+ }
+
+ @Test
+ public void jtaCanFailoverWithRollbackAfterDoneBeforeCompletion() {
+ server3.invoke(() -> createServerRegion(1, false));
+ server3.invoke(() -> doPut(key, value));
+ port1 = server1.invoke(() -> createServerRegion(1, true));
+ port2 = server2.invoke(() -> createServerRegion(1, true));
+
+ client1.invoke(() -> createClientRegion(port1, port2));
+
+ Object[] beforeCompletionResults = client1.invoke(() ->
doBeforeCompletion());
+
+ int port = (Integer) beforeCompletionResults[1];
+
+ if (port == port1) {
+ server1.invoke(() -> cacheRule.getCache().close());
+ } else {
+ assert port == port2;
+ server2.invoke(() -> cacheRule.getCache().close());
+ }
+
+ client1.invoke(() -> doAfterCompletion((TransactionId)
beforeCompletionResults[0], false));
+
+ createClientRegion(port == port1 ? port2 : port1);
+ doPutTransaction(true);
+ }
+
+ private void doPutTransaction(boolean isClient) {
+ Region region;
+ TXManagerImpl txManager;
+ if (isClient) {
+ InternalClientCache cache = clientCacheRule.getClientCache();
+ region = cache.getRegion(regionName);
+ txManager = (TXManagerImpl) cache.getCacheTransactionManager();
+ } else {
+ InternalCache cache = cacheRule.getCache();
+ region = cache.getRegion(regionName);
+ txManager = (TXManagerImpl) cache.getCacheTransactionManager();
+
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).pollInterval(10,
TimeUnit.MILLISECONDS)
+ .until(() -> txManager.isHostedTXStatesEmpty());
+ }
+ txManager.begin();
+ region.put(key, newValue);
+ txManager.commit();
+ assertEquals(newValue, region.get(key));
+ }
+
+}