This is an automated email from the ASF dual-hosted git repository. eshu11 pushed a commit to branch feature/GEODE-5376 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 69d51c91a13ebc89d31ded924690c2a5e3c369e7 Author: eshu <[email protected]> AuthorDate: Tue Jul 10 12:01:51 2018 -0700 GEODE-5376: Move SynchronizationRunnable to TXState Different threads can handle JTA beforeCompletion and afterCompletion after client failover. Use TXStateSynchronizationRunnable to handle this case. --- .../cache/DistTXStateProxyImplOnCoordinator.java | 6 - .../internal/cache/PausedTXStateProxyImpl.java | 8 - .../apache/geode/internal/cache/TXManagerImpl.java | 4 + .../org/apache/geode/internal/cache/TXState.java | 159 +++++++++--- .../apache/geode/internal/cache/TXStateProxy.java | 9 - .../geode/internal/cache/TXStateProxyImpl.java | 25 -- ...le.java => TXStateSynchronizationRunnable.java} | 22 +- .../sockets/command/TXSynchronizationCommand.java | 235 +++++++----------- .../geode/internal/cache/tx/ClientTXStateStub.java | 4 + ...ava => TXStateSynchronizationRunnableTest.java} | 54 ++-- .../apache/geode/internal/cache/TXStateTest.java | 152 ++++++++++++ .../command/TXSynchronizationCommandTest.java | 119 +++++++++ .../ClientServerJTAFailoverDistributedTest.java | 273 +++++++++++++++++++++ 13 files changed, 808 insertions(+), 262 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java index c088c0f..031de9e 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java @@ -117,9 +117,6 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl { } inProgress = preserveTx; - if (this.synchRunnable != null) { - this.synchRunnable.abort(); - } } } @@ -281,9 +278,6 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl { } finally { inProgress = false; - if (this.synchRunnable != null) { - this.synchRunnable.abort(); - } } /* diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PausedTXStateProxyImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PausedTXStateProxyImpl.java index a796d5c..2e52551 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PausedTXStateProxyImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PausedTXStateProxyImpl.java @@ -390,14 +390,6 @@ public class PausedTXStateProxyImpl implements TXStateProxy { public void setJCATransaction() {} @Override - public void setSynchronizationRunnable(TXSynchronizationRunnable sync) {} - - @Override - public TXSynchronizationRunnable getSynchronizationRunnable() { - return null; - } - - @Override public void suspend() {} @Override diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java index c5c7653..639656f 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java @@ -1875,4 +1875,8 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene return hostedTXStates; } + public boolean isHostedTXStatesEmpty() { + return hostedTXStates.isEmpty(); + } + } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java index 9768fb8..6b2ca35 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java @@ -24,23 +24,28 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Executor; import java.util.concurrent.locks.ReentrantLock; import javax.transaction.Status; +import edu.umd.cs.findbugs.annotations.SuppressWarnings; import org.apache.logging.log4j.Logger; import org.apache.geode.CancelException; +import org.apache.geode.InternalGemFireError; import org.apache.geode.SystemFailure; import org.apache.geode.cache.CommitConflictException; import org.apache.geode.cache.DiskAccessException; import org.apache.geode.cache.EntryNotFoundException; +import org.apache.geode.cache.FailedSynchronizationException; import org.apache.geode.cache.Operation; import org.apache.geode.cache.Region; import org.apache.geode.cache.Region.Entry; import org.apache.geode.cache.RegionDestroyedException; import org.apache.geode.cache.SynchronizationCommitConflictException; import org.apache.geode.cache.TransactionDataRebalancedException; +import org.apache.geode.cache.TransactionException; import org.apache.geode.cache.TransactionId; import org.apache.geode.cache.TransactionWriter; import org.apache.geode.cache.TransactionWriterException; @@ -48,6 +53,7 @@ import org.apache.geode.cache.UnsupportedOperationInTransactionException; import org.apache.geode.cache.client.internal.ServerRegionDataAccess; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.TXManagerCancelledException; +import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.Assert; import org.apache.geode.internal.cache.control.MemoryThresholds; @@ -95,6 +101,15 @@ public class TXState implements TXStateInterface { */ private int modSerialNum; private final List<EntryEventImpl> pendingCallbacks = new ArrayList<EntryEventImpl>(); + /** + * for client/server JTA transactions we need to have a single thread handle both beforeCompletion + * and afterCompletion so that beforeCompletion can obtain locks for the afterCompletion step. + * This is that thread + */ + protected volatile TXStateSynchronizationRunnable syncRunnable; + + private volatile SynchronizationCommitConflictException beforeCompletionException; + private volatile RuntimeException afterCompletionException; // Internal testing hooks private Runnable internalAfterReservation; @@ -901,6 +916,9 @@ public class TXState implements TXStateInterface { synchronized (this.completionGuard) { this.completionGuard.notifyAll(); } + if (this.syncRunnable != null) { + this.syncRunnable.abort(); + } if (iae != null && !this.proxy.getCache().isClosed()) { throw iae; } @@ -1000,11 +1018,45 @@ public class TXState implements TXStateInterface { if (this.closed) { throw new TXManagerCancelledException(); } - this.proxy.getTxMgr().setTXState(null); - final long opStart = CachePerfStats.getStatTime(); - this.jtaLifeTime = opStart - getBeginTime(); + TXStateSynchronizationRunnable sync = createTxStateSynchronizationRunnable(); + setSynchronizationRunnable(sync); + + Executor exec = getExecutor(); + exec.execute(sync); + sync.waitForFirstExecution(); + if (getBeforeCompletionException() != null) { + throw getBeforeCompletionException(); + } + } + TXStateSynchronizationRunnable createTxStateSynchronizationRunnable() { + Runnable beforeCompletion = new Runnable() { + @SuppressWarnings("synthetic-access") + public void run() { + doBeforeCompletion(); + } + }; + return new TXStateSynchronizationRunnable(getCache().getCancelCriterion(), + beforeCompletion); + } + + Executor getExecutor() { + return InternalDistributedSystem.getConnectedInstance().getDistributionManager() + .getWaitingThreadPool(); + } + + SynchronizationCommitConflictException getBeforeCompletionException() { + return beforeCompletionException; + } + + private void setSynchronizationRunnable(TXStateSynchronizationRunnable synchronizationRunnable) { + syncRunnable = synchronizationRunnable; + } + + void doBeforeCompletion() { + final long opStart = CachePerfStats.getStatTime(); + this.jtaLifeTime = opStart - getBeginTime(); try { reserveAndCheck(); /* @@ -1042,8 +1094,8 @@ public class TXState implements TXStateInterface { } } catch (CommitConflictException commitConflict) { cleanup(); - this.proxy.getTxMgr().noteCommitFailure(opStart, this.jtaLifeTime, this); - throw new SynchronizationCommitConflictException( + proxy.getTxMgr().noteCommitFailure(opStart, this.jtaLifeTime, this); + beforeCompletionException = new SynchronizationCommitConflictException( LocalizedStrings.TXState_CONFLICT_DETECTED_IN_GEMFIRE_TRANSACTION_0 .toLocalizedString(getTransactionId()), commitConflict); @@ -1057,40 +1109,79 @@ public class TXState implements TXStateInterface { */ @Override public void afterCompletion(int status) { - // System.err.println("start afterCompletion"); + this.proxy.getTxMgr().setTXState(null); + + Runnable afterCompletion = new Runnable() { + @SuppressWarnings("synthetic-access") + public void run() { + doAfterCompletion(status); + } + }; + // if there was a beforeCompletion call then there will be a thread + // sitting in the waiting pool to execute afterCompletion. Otherwise + // throw FailedSynchronizationException(). + TXStateSynchronizationRunnable sync = getSynchronizationRunnable(); + if (sync != null) { + sync.runSecondRunnable(afterCompletion); + if (getAfterCompletionException() != null) { + throw getAfterCompletionException(); + } + } else { + // rollback does not run beforeCompletion. + if (status != Status.STATUS_ROLLEDBACK) { + throw new FailedSynchronizationException( + "Could not execute afterCompletion when beforeCompletion was not executed"); + } + doAfterCompletion(status); + } + } + + TXStateSynchronizationRunnable getSynchronizationRunnable() { + return this.syncRunnable; + } + + RuntimeException getAfterCompletionException() { + return afterCompletionException; + } + + void doAfterCompletion(int status) { final long opStart = CachePerfStats.getStatTime(); - switch (status) { - case Status.STATUS_COMMITTED: - // System.err.println("begin commit in afterCompletion"); - Assert.assertTrue(this.locks != null, - "Gemfire Transaction afterCompletion called with illegal state."); - try { - proxy.getTxMgr().setTXState(null); - commit(); - saveTXCommitMessageForClientFailover(); - } catch (CommitConflictException error) { - Assert.assertTrue(false, "Gemfire Transaction " + getTransactionId() - + " afterCompletion failed.due to CommitConflictException: " + error); - } + try { + switch (status) { + case Status.STATUS_COMMITTED: + Assert.assertTrue(this.locks != null, + "Gemfire Transaction afterCompletion called with illegal state."); + try { + commit(); + saveTXCommitMessageForClientFailover(); + } catch (CommitConflictException error) { + Assert.assertTrue(false, "Gemfire Transaction " + getTransactionId() + + " afterCompletion failed.due to CommitConflictException: " + error); + } - this.proxy.getTxMgr().noteCommitSuccess(opStart, this.jtaLifeTime, this); - this.locks = null; - // System.err.println("end commit in afterCompletion"); - break; - case Status.STATUS_ROLLEDBACK: - this.jtaLifeTime = opStart - getBeginTime(); - this.proxy.getTxMgr().setTXState(null); - rollback(); - saveTXCommitMessageForClientFailover(); - this.proxy.getTxMgr().noteRollbackSuccess(opStart, this.jtaLifeTime, this); - break; - default: - Assert.assertTrue(false, "Unknown JTA Synchronization status " + status); + this.proxy.getTxMgr().noteCommitSuccess(opStart, this.jtaLifeTime, this); + this.locks = null; + break; + case Status.STATUS_ROLLEDBACK: + this.jtaLifeTime = opStart - getBeginTime(); + rollback(); + saveTXCommitMessageForClientFailover(); + this.proxy.getTxMgr().noteRollbackSuccess(opStart, this.jtaLifeTime, this); + break; + default: + Assert.assertTrue(false, "Unknown JTA Synchronization status " + status); + } + } catch (RuntimeException exception) { + LogService.getLogger().info("got exception " + exception); + afterCompletionException = exception; + } catch (InternalGemFireError error) { + TransactionException exception = new TransactionException(error); + afterCompletionException = exception; } - // System.err.println("end afterCompletion"); + } - private void saveTXCommitMessageForClientFailover() { + void saveTXCommitMessageForClientFailover() { proxy.getTxMgr().saveTXStateForClientFailover(proxy); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxy.java index f037a02..7a79914 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxy.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxy.java @@ -54,15 +54,6 @@ public interface TXStateProxy extends TXStateInterface { void setJCATransaction(); /** - * establishes the synchronization thread used for client/server beforeCompletion/afterCompletion - * processing - * - */ - void setSynchronizationRunnable(TXSynchronizationRunnable sync); - - TXSynchronizationRunnable getSynchronizationRunnable(); - - /** * Perform additional tasks required by the proxy to suspend a transaction */ void suspend(); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java index f9aa2d4..00f15c3 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java @@ -57,12 +57,6 @@ public class TXStateProxyImpl implements TXStateProxy { private boolean commitRequestedByOwner; private boolean isJCATransaction; - /** - * for client/server JTA transactions we need to have a single thread handle both beforeCompletion - * and afterCompletion so that beforeC can obtain locks for the afterC step. This is that thread - */ - protected volatile TXSynchronizationRunnable synchRunnable; - private final ReentrantLock lock = new ReentrantLock(); /** number of operations in this transaction */ @@ -99,16 +93,6 @@ public class TXStateProxyImpl implements TXStateProxy { } @Override - public void setSynchronizationRunnable(TXSynchronizationRunnable synch) { - this.synchRunnable = synch; - } - - @Override - public TXSynchronizationRunnable getSynchronizationRunnable() { - return this.synchRunnable; - } - - @Override public ReentrantLock getLock() { return this.lock; } @@ -230,9 +214,6 @@ public class TXStateProxyImpl implements TXStateProxy { throw e; } finally { inProgress = preserveTx; - if (this.synchRunnable != null) { - this.synchRunnable.abort(); - } } } @@ -410,9 +391,6 @@ public class TXStateProxyImpl implements TXStateProxy { getRealDeal(null, null).rollback(); } finally { inProgress = false; - if (this.synchRunnable != null) { - this.synchRunnable.abort(); - } } } @@ -472,9 +450,6 @@ public class TXStateProxyImpl implements TXStateProxy { getRealDeal(null, null).afterCompletion(status); } finally { this.inProgress = false; - if (this.synchRunnable != null) { - this.synchRunnable.abort(); - } } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXSynchronizationRunnable.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java similarity index 86% rename from geode-core/src/main/java/org/apache/geode/internal/cache/TXSynchronizationRunnable.java rename to geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java index 4603d93..415bcdd 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXSynchronizationRunnable.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java @@ -17,22 +17,20 @@ package org.apache.geode.internal.cache; import org.apache.logging.log4j.Logger; import org.apache.geode.CancelCriterion; -import org.apache.geode.internal.cache.tier.sockets.CommBufferPool; import org.apache.geode.internal.logging.LogService; /** - * TXSynchronizationThread manages beforeCompletion and afterCompletion calls on behalf of a client - * cache. The thread should be instantiated with a Runnable that invokes beforeCompletion behavior. + * TXSynchronizationThread manages beforeCompletion and afterCompletion calls in TXState. + * The thread should be instantiated with a Runnable that invokes beforeCompletion behavior. * Then you must invoke runSecondRunnable() with another Runnable that invokes afterCompletion * behavior. * - * @since GemFire 6.6 + * @since Geode 1.8.0 */ -public class TXSynchronizationRunnable implements Runnable { +public class TXStateSynchronizationRunnable implements Runnable { private static final Logger logger = LogService.getLogger(); private final CancelCriterion cancelCriterion; - private final CommBufferPool commBufferPool; private Runnable firstRunnable; private final Object firstRunnableSync = new Object(); @@ -44,21 +42,15 @@ public class TXSynchronizationRunnable implements Runnable { private boolean abort; - public TXSynchronizationRunnable(final CancelCriterion cancelCriterion, - final CommBufferPool commBufferPool, final Runnable beforeCompletion) { + public TXStateSynchronizationRunnable(final CancelCriterion cancelCriterion, + final Runnable beforeCompletion) { this.cancelCriterion = cancelCriterion; - this.commBufferPool = commBufferPool; this.firstRunnable = beforeCompletion; } @Override public void run() { - commBufferPool.setTLCommBuffer(); - try { - doSynchronizationOps(); - } finally { - commBufferPool.releaseTLCommBuffer(); - } + doSynchronizationOps(); } private void doSynchronizationOps() { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java index fd9c17f..037702a 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java @@ -16,19 +16,14 @@ package org.apache.geode.internal.cache.tier.sockets.command; import java.io.IOException; -import java.util.concurrent.Executor; - -import javax.transaction.Status; import org.apache.geode.cache.client.internal.TXSynchronizationOp.CompletionType; -import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.ReplyException; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.cache.TXCommitMessage; import org.apache.geode.internal.cache.TXId; import org.apache.geode.internal.cache.TXManagerImpl; import org.apache.geode.internal.cache.TXStateProxy; -import org.apache.geode.internal.cache.TXSynchronizationRunnable; import org.apache.geode.internal.cache.tier.Command; import org.apache.geode.internal.cache.tier.MessageType; import org.apache.geode.internal.cache.tier.sockets.BaseCommand; @@ -50,21 +45,6 @@ public class TXSynchronizationCommand extends BaseCommand { * (non-Javadoc) * * @see - * org.apache.geode.internal.cache.tier.sockets.BaseCommand#shouldMasqueradeForTx(org.apache.geode - * .internal.cache.tier.sockets.Message, - * org.apache.geode.internal.cache.tier.sockets.ServerConnection) - */ - @Override - protected boolean shouldMasqueradeForTx(Message clientMessage, - ServerConnection serverConnection) { - // masquerading is done in the waiting thread pool - return false; - } - - /* - * (non-Javadoc) - * - * @see * org.apache.geode.internal.cache.tier.sockets.BaseCommand#cmdExecute(org.apache.geode.internal. * cache.tier.sockets.Message, org.apache.geode.internal.cache.tier.sockets.ServerConnection, * long) @@ -86,20 +66,18 @@ public class TXSynchronizationCommand extends BaseCommand { statusPart = null; } - final TXManagerImpl txMgr = - (TXManagerImpl) serverConnection.getCache().getCacheTransactionManager(); - final InternalDistributedMember member = - (InternalDistributedMember) serverConnection.getProxyID().getDistributedMember(); + final TXManagerImpl txMgr = getTXManager(serverConnection); + final InternalDistributedMember member = getDistributedMember(serverConnection); - // get the tx state without associating it with this thread. That's done later - final TXStateProxy txProxy = txMgr.masqueradeAs(clientMessage, member, true); + final TXStateProxy txProxy = txMgr.getTXState(); + assert txProxy != null; final TXId txId = txProxy.getTxId(); TXCommitMessage commitMessage = txMgr.getRecentlyCompletedMessage(txId); if (commitMessage != null && commitMessage != TXCommitMessage.ROLLBACK_MSG) { assert type == CompletionType.AFTER_COMPLETION; try { - CommitCommand.writeCommitResponse(commitMessage, clientMessage, serverConnection); + writeCommitResponse(clientMessage, serverConnection, commitMessage); } catch (IOException e) { if (isDebugEnabled) { logger.debug("Problem writing reply to client", e); @@ -117,137 +95,92 @@ public class TXSynchronizationCommand extends BaseCommand { return; } - // we have to run beforeCompletion and afterCompletion in the same thread - // because beforeCompletion obtains locks for the thread and afterCompletion - // releases them - if (txProxy != null) { - try { - if (type == CompletionType.BEFORE_COMPLETION) { - Runnable beforeCompletion = new Runnable() { - @SuppressWarnings("synthetic-access") - public void run() { - TXStateProxy txState = null; - Throwable failureException = null; - try { - txState = txMgr.masqueradeAs(clientMessage, member, false); - if (isDebugEnabled) { - logger.debug("Executing beforeCompletion() notification for transaction {}", - clientMessage.getTransactionId()); - } - txState.setIsJTA(true); - txState.beforeCompletion(); - try { - writeReply(clientMessage, serverConnection); - } catch (IOException e) { - if (isDebugEnabled) { - logger.debug("Problem writing reply to client", e); - } - } - serverConnection.setAsTrue(RESPONDED); - } catch (ReplyException e) { - failureException = e.getCause(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (Exception e) { - failureException = e; - } finally { - txMgr.unmasquerade(txState); - } - if (failureException != null) { - try { - writeException(clientMessage, failureException, false, serverConnection); - } catch (IOException ioe) { - if (isDebugEnabled) { - logger.debug("Problem writing reply to client", ioe); - } - } - serverConnection.setAsTrue(RESPONDED); - } + try { + if (type == CompletionType.BEFORE_COMPLETION) { + if (isDebugEnabled) { + logger.debug("Executing beforeCompletion() notification for transaction {}", + clientMessage.getTransactionId()); + } + Throwable failureException = null; + try { + txProxy.setIsJTA(true); + txProxy.beforeCompletion(); + try { + writeReply(clientMessage, serverConnection); + } catch (IOException e) { + if (isDebugEnabled) { + logger.debug("Problem writing reply to client", e); + } + } + serverConnection.setAsTrue(RESPONDED); + } catch (ReplyException e) { + failureException = e.getCause(); + } catch (Exception e) { + failureException = e; + } + if (failureException != null) { + try { + writeException(clientMessage, failureException, false, serverConnection); + } catch (IOException ioe) { + if (isDebugEnabled) { + logger.debug("Problem writing reply to client", ioe); } - }; - TXSynchronizationRunnable sync = - new TXSynchronizationRunnable(serverConnection.getCache().getCancelCriterion(), - serverConnection.getAcceptor(), beforeCompletion); - txProxy.setSynchronizationRunnable(sync); - Executor exec = InternalDistributedSystem.getConnectedInstance().getDistributionManager() - .getWaitingThreadPool(); - exec.execute(sync); - sync.waitForFirstExecution(); - } else { - Runnable afterCompletion = new Runnable() { - @SuppressWarnings("synthetic-access") - public void run() { - TXStateProxy txState = null; - try { - txState = txMgr.masqueradeAs(clientMessage, member, false); - int status = statusPart.getInt(); - if (isDebugEnabled) { - logger.debug("Executing afterCompletion({}) notification for transaction {}", - status, clientMessage.getTransactionId()); - } - txState.setIsJTA(true); - txState.afterCompletion(status); - // GemFire commits during afterCompletion - send the commit info back to the client - // where it can be applied to the local cache - TXCommitMessage cmsg = txState.getCommitMessage(); - try { - CommitCommand.writeCommitResponse(cmsg, clientMessage, serverConnection); - txMgr.removeHostedTXState(txState.getTxId()); - } catch (IOException e) { - // not much can be done here - if (isDebugEnabled || (e instanceof MessageTooLargeException)) { - logger.warn("Problem writing reply to client", e); - } - } - serverConnection.setAsTrue(RESPONDED); - } catch (RuntimeException e) { - try { - writeException(clientMessage, e, false, serverConnection); - } catch (IOException ioe) { - if (isDebugEnabled) { - logger.debug("Problem writing reply to client", ioe); - } - } - serverConnection.setAsTrue(RESPONDED); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } finally { - txMgr.unmasquerade(txState); - } + } + serverConnection.setAsTrue(RESPONDED); + } + } else { + try { + int status = statusPart.getInt(); + if (isDebugEnabled) { + logger.debug("Executing afterCompletion({}) notification for transaction {}", + status, clientMessage.getTransactionId()); + } + txProxy.setIsJTA(true); + txProxy.setCommitOnBehalfOfRemoteStub(true); + txProxy.afterCompletion(status); + // GemFire commits during afterCompletion - send the commit info back to the client + // where it can be applied to the local cache + TXCommitMessage cmsg = txProxy.getCommitMessage(); + try { + writeCommitResponse(clientMessage, serverConnection, cmsg); + txMgr.removeHostedTXState(txProxy.getTxId()); + } catch (IOException e) { + // not much can be done here + if (isDebugEnabled || (e instanceof MessageTooLargeException)) { + logger.warn("Problem writing reply to client", e); } - }; - // if there was a beforeCompletion call then there will be a thread - // sitting in the waiting pool to execute afterCompletion. Otherwise - // we have failed-over and may need to do beforeCompletion & hope that it works - TXSynchronizationRunnable sync = txProxy.getSynchronizationRunnable(); - if (sync != null) { - sync.runSecondRunnable(afterCompletion); - } else { - if (statusPart.getInt() == Status.STATUS_COMMITTED) { - TXStateProxy txState = txMgr.masqueradeAs(clientMessage, member, false); - try { - if (isDebugEnabled) { - logger.debug( - "Executing beforeCompletion() notification for transaction {} after failover", - clientMessage.getTransactionId()); - } - txState.setIsJTA(true); - txState.beforeCompletion(); - } finally { - txMgr.unmasquerade(txState); - } + } + serverConnection.setAsTrue(RESPONDED); + } catch (RuntimeException e) { + try { + writeException(clientMessage, e, false, serverConnection); + } catch (IOException ioe) { + if (isDebugEnabled) { + logger.debug("Problem writing reply to client", ioe); } - afterCompletion.run(); } + serverConnection.setAsTrue(RESPONDED); } - } catch (Exception e) { - writeException(clientMessage, MessageType.EXCEPTION, e, false, serverConnection); - serverConnection.setAsTrue(RESPONDED); - } - if (isDebugEnabled) { - logger.debug("Sent tx synchronization response"); } + } catch (Exception e) { + writeException(clientMessage, MessageType.EXCEPTION, e, false, serverConnection); + serverConnection.setAsTrue(RESPONDED); + } + if (isDebugEnabled) { + logger.debug("Sent tx synchronization response"); } } + void writeCommitResponse(Message clientMessage, ServerConnection serverConnection, + TXCommitMessage commitMessage) throws IOException { + CommitCommand.writeCommitResponse(commitMessage, clientMessage, serverConnection); + } + + InternalDistributedMember getDistributedMember(ServerConnection serverConnection) { + return (InternalDistributedMember) serverConnection.getProxyID().getDistributedMember(); + } + + TXManagerImpl getTXManager(ServerConnection serverConnection) { + return (TXManagerImpl) serverConnection.getCache().getCacheTransactionManager(); + } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java index c97d9f0..4469b71 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java @@ -322,4 +322,8 @@ public class ClientTXStateStub extends TXStateStub { public void setAfterLocalLocks(Runnable afterLocalLocks) { this.internalAfterLocalLocks = afterLocalLocks; } + + public ServerLocation getServerAffinityLocation() { + return serverAffinityLocation; + } } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/TXSynchronizationRunnableTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnableTest.java similarity index 52% rename from geode-core/src/test/java/org/apache/geode/internal/cache/TXSynchronizationRunnableTest.java rename to geode-core/src/test/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnableTest.java index a6ba2f5..05b811b 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/TXSynchronizationRunnableTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnableTest.java @@ -18,6 +18,9 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import org.junit.Before; import org.junit.Test; @@ -25,40 +28,63 @@ import org.junit.experimental.categories.Category; import org.apache.geode.CancelCriterion; import org.apache.geode.cache.CacheClosedException; -import org.apache.geode.internal.cache.tier.sockets.CommBufferPool; import org.apache.geode.test.junit.categories.UnitTest; @Category(UnitTest.class) -public class TXSynchronizationRunnableTest { - +public class TXStateSynchronizationRunnableTest { private CancelCriterion cancelCriterion; - private CommBufferPool commBufferPool; private Runnable beforeCompletion; + private Runnable afterCompletion; private CacheClosedException exception; - @Before public void setUp() { exception = new CacheClosedException(); cancelCriterion = mock(CancelCriterion.class); - commBufferPool = mock(CommBufferPool.class); beforeCompletion = mock(Runnable.class); + afterCompletion = mock(Runnable.class); + } + @Test + public void waitForFirstExecutionThrowsExceptionIfCacheClosed() { doThrow(exception).when(cancelCriterion).checkCancelInProgress(any()); + TXStateSynchronizationRunnable runnable = + new TXStateSynchronizationRunnable(cancelCriterion, beforeCompletion); + assertThatThrownBy(() -> runnable.waitForFirstExecution()).isSameAs(exception); } @Test - public void test() { - TXSynchronizationRunnable runnable = - new TXSynchronizationRunnable(cancelCriterion, commBufferPool, beforeCompletion); - assertThatThrownBy(() -> runnable.waitForFirstExecution()).isSameAs(exception); + public void runSecondRunnableThrowsExceptionIfCacheClosed() { + doThrow(exception).when(cancelCriterion).checkCancelInProgress(any()); + TXStateSynchronizationRunnable runnable = + new TXStateSynchronizationRunnable(cancelCriterion, beforeCompletion); + assertThatThrownBy(() -> runnable.runSecondRunnable(afterCompletion)).isSameAs(exception); } @Test - public void test1() { - TXSynchronizationRunnable runnable = - new TXSynchronizationRunnable(cancelCriterion, commBufferPool, beforeCompletion); - assertThatThrownBy(() -> runnable.runSecondRunnable(mock(Runnable.class))).isSameAs(exception); + public void doSynchronizationOpsWaitsUntilRunSecondRunnable() { + TXStateSynchronizationRunnable runnable = + new TXStateSynchronizationRunnable(cancelCriterion, beforeCompletion); + new Thread(() -> { + runnable.runSecondRunnable(afterCompletion); + }).start(); + runnable.run(); + verify(beforeCompletion, times(1)).run(); + verify(afterCompletion, times(1)).run(); } + + @Test + public void doSynchronizationOpsDoesNotRunSecondRunnableIfAborted() { + TXStateSynchronizationRunnable runnable = + new TXStateSynchronizationRunnable(cancelCriterion, beforeCompletion); + runnable.abort(); + new Thread(() -> { + runnable.runSecondRunnable(afterCompletion); + }).start(); + runnable.run(); + verify(beforeCompletion, times(1)).run(); + verify(afterCompletion, never()).run(); + } + } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java new file mode 100644 index 0000000..0dce944 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.concurrent.Executor; + +import javax.transaction.Status; + +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.CancelCriterion; +import org.apache.geode.InternalGemFireError; +import org.apache.geode.cache.CommitConflictException; +import org.apache.geode.cache.SynchronizationCommitConflictException; +import org.apache.geode.cache.TransactionDataNodeHasDepartedException; +import org.apache.geode.cache.TransactionException; +import org.apache.geode.test.junit.categories.UnitTest; + +@Category(UnitTest.class) +public class TXStateTest { + private CancelCriterion cancelCriterion; + + private TXStateProxyImpl txStateProxy; + private CommitConflictException exception; + private TXStateSynchronizationRunnable txStateSynch; + private SynchronizationCommitConflictException synchronizationCommitConflictException; + private RuntimeException runtimeException; + private TransactionDataNodeHasDepartedException transactionDataNodeHasDepartedException; + + @Before + public void setup() { + txStateProxy = mock(TXStateProxyImpl.class); + + cancelCriterion = mock(CancelCriterion.class); + exception = new CommitConflictException(""); + txStateSynch = mock(TXStateSynchronizationRunnable.class); + synchronizationCommitConflictException = new SynchronizationCommitConflictException(""); + runtimeException = new RuntimeException(); + transactionDataNodeHasDepartedException = new TransactionDataNodeHasDepartedException(""); + + when(txStateProxy.getTxMgr()).thenReturn(mock(TXManagerImpl.class)); + } + + @Test + public void beforeCompletionThrowsSynchronizationCommitConflictExceptionIfBeforeCompletionExceptionIsSet() { + TXState txState = spy(new TXState(txStateProxy, true)); + + doReturn(mock(Executor.class)).when(txState).getExecutor(); + doReturn(txStateSynch).when(txState).createTxStateSynchronizationRunnable(); + doReturn(synchronizationCommitConflictException).when(txState).getBeforeCompletionException(); + + assertThatThrownBy(() -> txState.beforeCompletion()) + .isSameAs(synchronizationCommitConflictException); + } + + @Test + public void beforeCompletionExceptionIsSetWhenDoBeforeCompletionCouldNotLockKeys() { + TXState txState = spy(new TXState(txStateProxy, true)); + doThrow(exception).when(txState).reserveAndCheck(); + + txState.doBeforeCompletion(); + assertThat(txState.getBeforeCompletionException()) + .isInstanceOf(SynchronizationCommitConflictException.class); + } + + + @Test + public void afterCompletionThrowsExceptionIfAfterCompletionExceptionIsSet() { + TXState txState = spy(new TXState(txStateProxy, true)); + + doReturn(txStateSynch).when(txState).getSynchronizationRunnable(); + doReturn(runtimeException).when(txState).getAfterCompletionException(); + + assertThatThrownBy(() -> txState.afterCompletion(Status.STATUS_COMMITTED)) + .isSameAs(runtimeException); + } + + @Test + public void afterCompletionExceptionIsSetWhenCommitFailedWithTransactionDataNodeHasDepartedException() { + TXState txState = spy(new TXState(txStateProxy, true)); + doReturn(mock(InternalCache.class)).when(txState).getCache(); + txState.reserveAndCheck(); + doThrow(transactionDataNodeHasDepartedException).when(txState).commit(); + + txState.doAfterCompletion(Status.STATUS_COMMITTED); + assertThat(txState.getAfterCompletionException()) + .isSameAs(transactionDataNodeHasDepartedException); + } + + @Test + public void afterCompletionExceptionIsSetToTransactionExceptionWhenCommitFailedWithCommitConflictException() { + TXState txState = spy(new TXState(txStateProxy, true)); + doReturn(mock(InternalCache.class)).when(txState).getCache(); + txState.reserveAndCheck(); + doThrow(exception).when(txState).commit(); + + txState.doAfterCompletion(Status.STATUS_COMMITTED); + + assertThat(txState.getAfterCompletionException()).isInstanceOf(TransactionException.class); + TransactionException transactionException = + (TransactionException) txState.getAfterCompletionException(); + assertThat(transactionException.getCause()).isInstanceOf(InternalGemFireError.class); + } + + + @Test + public void afterCompletionCanCommitJTA() { + TXState txState = spy(new TXState(txStateProxy, false)); + doReturn(mock(InternalCache.class)).when(txState).getCache(); + txState.reserveAndCheck(); + txState.closed = true; + txState.doAfterCompletion(Status.STATUS_COMMITTED); + + assertThat(txState.locks).isNull(); + verify(txState, times(1)).saveTXCommitMessageForClientFailover(); + } + + @Test + public void afterCompletionCanRollbackJTA() { + TXState txState = spy(new TXState(txStateProxy, true)); + txState.afterCompletion(Status.STATUS_ROLLEDBACK); + + verify(txState, times(1)).rollback(); + verify(txState, times(1)).saveTXCommitMessageForClientFailover(); + } + +} diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommandTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommandTest.java new file mode 100644 index 0000000..adac845 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommandTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.tier.sockets.command; + +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import javax.transaction.Status; + +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.cache.client.internal.TXSynchronizationOp; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.TXCommitMessage; +import org.apache.geode.internal.cache.TXId; +import org.apache.geode.internal.cache.TXManagerImpl; +import org.apache.geode.internal.cache.TXStateProxyImpl; +import org.apache.geode.internal.cache.tier.Command; +import org.apache.geode.internal.cache.tier.sockets.Message; +import org.apache.geode.internal.cache.tier.sockets.Part; +import org.apache.geode.internal.cache.tier.sockets.ServerConnection; +import org.apache.geode.test.junit.categories.UnitTest; + +@Category(UnitTest.class) +public class TXSynchronizationCommandTest { + private Message clientMessage; + private ServerConnection serverConnection; + private TXManagerImpl txManager; + private TXStateProxyImpl txStateProxy; + private TXId txId; + private TXCommitMessage txCommitMessage; + private InternalDistributedMember member; + private Part part0; + private Part part1; + private Part part2; + private RuntimeException exception; + private TXSynchronizationCommand command; + + @Before + public void setup() { + clientMessage = mock(Message.class); + serverConnection = mock(ServerConnection.class); + txManager = mock(TXManagerImpl.class); + member = mock(InternalDistributedMember.class); + txStateProxy = mock(TXStateProxyImpl.class); + txId = mock(TXId.class); + txCommitMessage = mock(TXCommitMessage.class); + part0 = mock(Part.class); + part1 = mock(Part.class); + part2 = mock(Part.class); + exception = new RuntimeException(); + command = mock(TXSynchronizationCommand.class); + + when(clientMessage.getPart(0)).thenReturn(part0); + when(clientMessage.getPart(1)).thenReturn(part1); + when(clientMessage.getPart(2)).thenReturn(part2); + doReturn(txManager).when(command).getTXManager(serverConnection); + doReturn(member).when(command).getDistributedMember(serverConnection); + when(txManager.getTXState()).thenReturn(txStateProxy); + when(txStateProxy.getTxId()).thenReturn(txId); + } + + @Test + public void commandCanSendBackCommitMessageIfAlreadyCommitted() throws Exception { + when(part0.getInt()).thenReturn(TXSynchronizationOp.CompletionType.AFTER_COMPLETION.ordinal()); + when(txManager.getRecentlyCompletedMessage(txId)).thenReturn(txCommitMessage); + doNothing().when(command).writeCommitResponse(clientMessage, serverConnection, txCommitMessage); + + doCallRealMethod().when(command).cmdExecute(clientMessage, serverConnection, null, 1); + command.cmdExecute(clientMessage, serverConnection, null, 1); + + verify(command, times(1)).writeCommitResponse(clientMessage, serverConnection, txCommitMessage); + verify(serverConnection, times(1)).setAsTrue(Command.RESPONDED); + } + + @Test + public void commandCanInvokeBeforeCompletion() throws Exception { + when(part0.getInt()).thenReturn(TXSynchronizationOp.CompletionType.BEFORE_COMPLETION.ordinal()); + + doCallRealMethod().when(command).cmdExecute(clientMessage, serverConnection, null, 1); + command.cmdExecute(clientMessage, serverConnection, null, 1); + + verify(txStateProxy, times(1)).beforeCompletion(); + verify(serverConnection, times(1)).setAsTrue(Command.RESPONDED); + } + + @Test + public void commandCanSendBackCommitMessageAfterInvokeAfterCompletion() throws Exception { + when(part0.getInt()).thenReturn(TXSynchronizationOp.CompletionType.AFTER_COMPLETION.ordinal()); + when(part2.getInt()).thenReturn(Status.STATUS_COMMITTED); + when(txStateProxy.getCommitMessage()).thenReturn(txCommitMessage); + + doCallRealMethod().when(command).cmdExecute(clientMessage, serverConnection, null, 1); + command.cmdExecute(clientMessage, serverConnection, null, 1); + + verify(txStateProxy, times(1)).afterCompletion(Status.STATUS_COMMITTED); + verify(command, times(1)).writeCommitResponse(clientMessage, serverConnection, txCommitMessage); + verify(serverConnection, times(1)).setAsTrue(Command.RESPONDED); + } +} diff --git a/geode-core/src/test/java/org/apache/geode/internal/jta/functional/ClientServerJTAFailoverDistributedTest.java b/geode-core/src/test/java/org/apache/geode/internal/jta/functional/ClientServerJTAFailoverDistributedTest.java new file mode 100644 index 0000000..43ab209 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/jta/functional/ClientServerJTAFailoverDistributedTest.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.jta.functional; + +import static org.apache.geode.test.dunit.VM.getHostName; +import static org.apache.geode.test.dunit.VM.getVM; +import static org.junit.Assert.assertEquals; + +import java.io.Serializable; +import java.util.concurrent.TimeUnit; + +import javax.transaction.Status; + +import org.awaitility.Awaitility; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.cache.PartitionAttributes; +import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.TransactionId; +import org.apache.geode.cache.client.ClientRegionFactory; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.cache.client.PoolFactory; +import org.apache.geode.cache.client.PoolManager; +import org.apache.geode.cache.client.internal.InternalClientCache; +import org.apache.geode.cache.client.internal.PoolImpl; +import org.apache.geode.cache.server.CacheServer; +import org.apache.geode.distributed.internal.ServerLocation; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.TXManagerImpl; +import org.apache.geode.internal.cache.TXStateProxyImpl; +import org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil; +import org.apache.geode.internal.cache.tx.ClientTXStateStub; +import org.apache.geode.internal.logging.LogService; +import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.dunit.rules.CacheRule; +import org.apache.geode.test.dunit.rules.ClientCacheRule; +import org.apache.geode.test.dunit.rules.DistributedTestRule; +import org.apache.geode.test.junit.categories.DistributedTest; +import org.apache.geode.test.junit.rules.serializable.SerializableTestName; + +@Category(DistributedTest.class) +public class ClientServerJTAFailoverDistributedTest implements Serializable { + private String hostName; + private String uniqueName; + private String regionName; + private VM server1; + private VM server2; + private VM server3; + private VM client1; + private int port1; + private int port2; + + private final int key = 1; + private final String value = "value1"; + private final String newValue = "value2"; + + @Rule + public DistributedTestRule distributedTestRule = new DistributedTestRule(); + + @Rule + public CacheRule cacheRule = new CacheRule(); + + @Rule + public ClientCacheRule clientCacheRule = new ClientCacheRule(); + + @Rule + public SerializableTestName testName = new SerializableTestName(); + + @Before + public void setUp() { + server1 = getVM(0); + server2 = getVM(1); + server3 = getVM(2); + client1 = getVM(3); + + hostName = getHostName(); + uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName(); + regionName = uniqueName + "_region"; + } + + @Test + public void jtaCanFailoverAfterDoneBeforeCompletion() { + server3.invoke(() -> createServerRegion(1, false)); + server3.invoke(() -> doPut(key, value)); + port1 = server1.invoke(() -> createServerRegion(1, true)); + port2 = server2.invoke(() -> createServerRegion(1, true)); + + client1.invoke(() -> createClientRegion(port1, port2)); + + Object[] beforeCompletionResults = client1.invoke(() -> doBeforeCompletion()); + + int port = (Integer) beforeCompletionResults[1]; + + if (port == port1) { + server1.invoke(() -> cacheRule.getCache().close()); + } else { + assert port == port2; + server2.invoke(() -> cacheRule.getCache().close()); + } + + client1.invoke(() -> doAfterCompletion((TransactionId) beforeCompletionResults[0], true)); + } + + private int createServerRegion(int totalNumBuckets, boolean isAccessor) throws Exception { + PartitionAttributesFactory factory = new PartitionAttributesFactory(); + factory.setTotalNumBuckets(totalNumBuckets); + if (isAccessor) { + factory.setLocalMaxMemory(0); + } + PartitionAttributes partitionAttributes = factory.create(); + cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.PARTITION) + .setPartitionAttributes(partitionAttributes).create(regionName); + + CacheServer server = cacheRule.getCache().addCacheServer(); + server.setPort(0); + server.start(); + return server.getPort(); + } + + private void createClientRegion(int... ports) { + clientCacheRule.createClientCache(); + + CacheServerTestUtil.disableShufflingOfEndpoints(); + PoolImpl pool; + try { + pool = getPool(ports); + } finally { + CacheServerTestUtil.enableShufflingOfEndpoints(); + } + + ClientRegionFactory crf = + clientCacheRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.LOCAL); + crf.setPoolName(pool.getName()); + crf.create(regionName); + + if (ports.length > 1) { + pool.acquireConnection(new ServerLocation(hostName, port1)); + } + } + + private PoolImpl getPool(int... ports) { + PoolFactory factory = PoolManager.createFactory(); + for (int port : ports) { + factory.addServer(hostName, port); + } + return (PoolImpl) factory.setReadTimeout(2000).setSocketBufferSize(1000) + .setMinConnections(4).create(uniqueName); + } + + private void doPut(int key, String value) { + cacheRule.getCache().getRegion(regionName).put(key, value); + } + + private Object[] doBeforeCompletion() { + Object[] results = new Object[2]; + InternalClientCache cache = clientCacheRule.getClientCache(); + Region region = cache.getRegion(regionName); + TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager(); + txManager.begin(); + region.put(key, newValue); + + TXStateProxyImpl txStateProxy = (TXStateProxyImpl) txManager.getTXState(); + ClientTXStateStub clientTXStateStub = (ClientTXStateStub) txStateProxy.getRealDeal(null, null); + clientTXStateStub.beforeCompletion(); + TransactionId transactionId = txManager.suspend(); + int port = clientTXStateStub.getServerAffinityLocation().getPort(); + results[0] = transactionId; + results[1] = port; + return results; + } + + private void doAfterCompletion(TransactionId transactionId, boolean isCommit) { + InternalClientCache cache = clientCacheRule.getClientCache(); + Region region = cache.getRegion(regionName); + TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager(); + txManager.resume(transactionId); + + TXStateProxyImpl txStateProxy = (TXStateProxyImpl) txManager.getTXState(); + ClientTXStateStub clientTXStateStub = (ClientTXStateStub) txStateProxy.getRealDeal(null, null); + try { + clientTXStateStub + .afterCompletion(isCommit ? Status.STATUS_COMMITTED : Status.STATUS_ROLLEDBACK); + } catch (Exception exception) { + LogService.getLogger().info("exception stack ", exception); + throw exception; + } + if (isCommit) { + assertEquals(newValue, region.get(key)); + } else { + assertEquals(value, region.get(key)); + } + } + + @Test + public void jtaCanFailoverToJTAHostAfterDoneBeforeCompletion() { + port2 = server2.invoke(() -> createServerRegion(1, false)); + server2.invoke(() -> doPut(key, value)); + port1 = server1.invoke(() -> createServerRegion(1, true)); + + client1.invoke(() -> createClientRegion(port1, port2)); + Object[] beforeCompletionResults = client1.invoke(() -> doBeforeCompletion()); + + server1.invoke(() -> cacheRule.getCache().close()); + + client1.invoke(() -> doAfterCompletion((TransactionId) beforeCompletionResults[0], true)); + } + + @Test + public void jtaCanFailoverWithRollbackAfterDoneBeforeCompletion() { + server3.invoke(() -> createServerRegion(1, false)); + server3.invoke(() -> doPut(key, value)); + port1 = server1.invoke(() -> createServerRegion(1, true)); + port2 = server2.invoke(() -> createServerRegion(1, true)); + + client1.invoke(() -> createClientRegion(port1, port2)); + + Object[] beforeCompletionResults = client1.invoke(() -> doBeforeCompletion()); + + int port = (Integer) beforeCompletionResults[1]; + + if (port == port1) { + server1.invoke(() -> cacheRule.getCache().close()); + } else { + assert port == port2; + server2.invoke(() -> cacheRule.getCache().close()); + } + + client1.invoke(() -> doAfterCompletion((TransactionId) beforeCompletionResults[0], false)); + + createClientRegion(port == port1 ? port2 : port1); + doPutTransaction(true); + } + + private void doPutTransaction(boolean isClient) { + Region region; + TXManagerImpl txManager; + if (isClient) { + InternalClientCache cache = clientCacheRule.getClientCache(); + region = cache.getRegion(regionName); + txManager = (TXManagerImpl) cache.getCacheTransactionManager(); + } else { + InternalCache cache = cacheRule.getCache(); + region = cache.getRegion(regionName); + txManager = (TXManagerImpl) cache.getCacheTransactionManager(); + + Awaitility.await().atMost(30, TimeUnit.SECONDS).pollInterval(10, TimeUnit.MILLISECONDS) + .until(() -> txManager.isHostedTXStatesEmpty()); + } + txManager.begin(); + region.put(key, newValue); + txManager.commit(); + assertEquals(newValue, region.get(key)); + } + +}
