This is an automated email from the ASF dual-hosted git repository. eshu11 pushed a commit to branch feature/GEODE-5624 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 4193fb719694615765fea70fd2169a1b6653c3ce Author: eshu <[email protected]> AuthorDate: Fri Aug 24 13:37:08 2018 -0700 wip -- refactor classes. --- .../ClientServerJTAFailoverDistributedTest.java | 14 +- .../internal/cache/SingleThreadJTAExecutor.java | 197 ++++++++++++++++++ .../org/apache/geode/internal/cache/TXState.java | 225 +++------------------ .../cache/TXStateSynchronizationRunnable.java | 144 ------------- .../cache/SingleThreadJTAExecutorTest.java | 109 ++++++++++ .../apache/geode/internal/cache/TXStateTest.java | 51 ++--- 6 files changed, 363 insertions(+), 377 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java index e4589fa..2ae4e9b 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java @@ -130,7 +130,8 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable { cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.PARTITION) .setPartitionAttributes(partitionAttributes).create(regionName); if (hasReplicateRegion) { - cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.REPLICATE).create(replicateRegionName); + cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.REPLICATE) + .create(replicateRegionName); } CacheServer server = cacheRule.getCache().addCacheServer(); @@ -154,7 +155,8 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable { clientCacheRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.LOCAL); crf.setPoolName(pool.getName()); crf.create(regionName); - if (hasReplicateRegion) crf.create(replicateRegionName); + if (hasReplicateRegion) + crf.create(replicateRegionName); if (ports.length > 1) { pool.acquireConnection(new ServerLocation(hostName, port1)); @@ -178,11 +180,12 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable { Object[] results = new Object[2]; InternalClientCache cache = clientCacheRule.getClientCache(); Region region = cache.getRegion(regionName); - Region replicateRegion = hasReplicateRegion? cache.getRegion(replicateRegionName) : null; + Region replicateRegion = hasReplicateRegion ? cache.getRegion(replicateRegionName) : null; TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager(); txManager.begin(); region.put(key, newValue); - if (hasReplicateRegion) replicateRegion.put(key, newValue); + if (hasReplicateRegion) + replicateRegion.put(key, newValue); TXStateProxyImpl txStateProxy = (TXStateProxyImpl) txManager.getTXState(); ClientTXStateStub clientTXStateStub = (ClientTXStateStub) txStateProxy.getRealDeal(null, null); @@ -212,7 +215,8 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable { } if (isCommit) { assertEquals(newValue, region.get(key)); - if (hasReplicateRegion) assertEquals(newValue, replicateRegion.get(key)); + if (hasReplicateRegion) + assertEquals(newValue, replicateRegion.get(key)); } else { assertEquals(value, region.get(key)); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java new file mode 100644 index 0000000..4df8ca4 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import java.util.concurrent.Executor; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.cache.SynchronizationCommitConflictException; +import org.apache.geode.internal.logging.LogService; + +/** + * TXStateSynchronizationThread manages beforeCompletion and afterCompletion calls. + * The thread should be instantiated with a Runnable that invokes beforeCompletion behavior. + * Then you must invoke executeAfterCompletion() with another Runnable that invokes afterCompletion + * behavior. + * + * @since Geode 1.6.0 + */ +public class SingleThreadJTAExecutor { + private static final Logger logger = LogService.getLogger(); + + private final Object beforeCompletionSync = new Object(); + private boolean beforeCompletionStarted; + private boolean beforeCompletionFinished; + private SynchronizationCommitConflictException beforeCompletionException; + + private final Object afterCompletionSync = new Object(); + private boolean afterCompletionStarted; + private boolean afterCompletionFinished; + private int afterCompletionStatus = -1; + private boolean afterCompletionCancelled; + private RuntimeException afterCompletionException; + + public SingleThreadJTAExecutor() {} + + void doOps(TXState txState) { + doBeforeCompletionOp(txState); + doAfterCompletionOp(txState); + } + + void doBeforeCompletionOp(TXState txState) { + synchronized (beforeCompletionSync) { + try { + txState.doBeforeCompletion(); + } catch (SynchronizationCommitConflictException exception) { + beforeCompletionException = exception; + } finally { + if (logger.isDebugEnabled()) { + logger.debug("beforeCompletion notification completed"); + } + beforeCompletionFinished = true; + beforeCompletionSync.notifyAll(); + } + } + } + + boolean isBeforeCompletionStarted() { + synchronized (beforeCompletionSync) { + return beforeCompletionStarted; + } + } + + boolean isAfterCompletionStarted() { + synchronized (afterCompletionSync) { + return afterCompletionStarted; + } + } + + boolean isBeforeCompletionFinished() { + synchronized (beforeCompletionSync) { + return beforeCompletionFinished; + } + } + + boolean isAfterCompletionFinished() { + synchronized (afterCompletionSync) { + return afterCompletionFinished; + } + } + + public void executeBeforeCompletion(TXState txState, Executor executor) { + executor.execute(() -> doOps(txState)); + + synchronized (beforeCompletionSync) { + beforeCompletionStarted = true; + while (!beforeCompletionFinished) { + try { + beforeCompletionSync.wait(1000); + } catch (InterruptedException ignore) { + // eat the interrupt and check for exit conditions + } + txState.getCache().getCancelCriterion().checkCancelInProgress(null); + } + if (getBeforeCompletionException() != null) { + throw getBeforeCompletionException(); + } + } + } + + SynchronizationCommitConflictException getBeforeCompletionException() { + return beforeCompletionException; + } + + private void doAfterCompletionOp(TXState txState) { + synchronized (afterCompletionSync) { + // there should be a transaction timeout that keeps this thread + // from sitting around forever if the client goes away + // The above was done by setting afterCompletionCancelled in txState + // during cleanup. When client departed, the transaction/JTA + // will be timed out and cleanup code will be executed. + final boolean isDebugEnabled = logger.isDebugEnabled(); + while (afterCompletionStatus == -1 && !afterCompletionCancelled) { + try { + if (isDebugEnabled) { + logger.debug("waiting for afterCompletion notification"); + } + afterCompletionSync.wait(1000); + } catch (InterruptedException ignore) { + // eat the interrupt and check for exit conditions + } + } + afterCompletionStarted = true; + if (isDebugEnabled) { + logger.debug("executing afterCompletion notification"); + } + try { + if (!afterCompletionCancelled) { + txState.doAfterCompletion(afterCompletionStatus); + } else { + txState.doCleanup(); + } + } catch (RuntimeException exception) { + afterCompletionException = exception; + } finally { + if (isDebugEnabled) { + logger.debug("afterCompletion notification completed"); + } + afterCompletionFinished = true; + afterCompletionSync.notifyAll(); + } + } + } + + public void executeAfterCompletion(TXState txState, int status) { + synchronized (afterCompletionSync) { + afterCompletionStatus = status; + afterCompletionSync.notifyAll(); + waitForAfterCompletionToFinish(txState); + if (getAfterCompletionException() != null) { + throw getAfterCompletionException(); + } + } + } + + private void waitForAfterCompletionToFinish(TXState txState) { + while (!afterCompletionFinished) { + try { + afterCompletionSync.wait(1000); + } catch (InterruptedException ignore) { + // eat the interrupt and check for exit conditions + } + txState.getCache().getCancelCriterion().checkCancelInProgress(null); + } + } + + RuntimeException getAfterCompletionException() { + return afterCompletionException; + } + + /** + * stop waiting for an afterCompletion to arrive and just exit + */ + public void cleanup(TXState txState) { + synchronized (afterCompletionSync) { + afterCompletionCancelled = true; + afterCompletionSync.notifyAll(); + waitForAfterCompletionToFinish(txState); + } + } + + public boolean shouldDoCleanup() { + return isBeforeCompletionStarted() && !isAfterCompletionStarted(); + } +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java index f33267e..bce1af4 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java @@ -32,7 +32,6 @@ import javax.transaction.Status; import org.apache.logging.log4j.Logger; import org.apache.geode.CancelException; -import org.apache.geode.InternalGemFireError; import org.apache.geode.SystemFailure; import org.apache.geode.cache.CommitConflictException; import org.apache.geode.cache.DiskAccessException; @@ -44,7 +43,6 @@ import org.apache.geode.cache.Region.Entry; import org.apache.geode.cache.RegionDestroyedException; import org.apache.geode.cache.SynchronizationCommitConflictException; import org.apache.geode.cache.TransactionDataRebalancedException; -import org.apache.geode.cache.TransactionException; import org.apache.geode.cache.TransactionId; import org.apache.geode.cache.TransactionWriter; import org.apache.geode.cache.TransactionWriterException; @@ -52,7 +50,6 @@ import org.apache.geode.cache.UnsupportedOperationInTransactionException; import org.apache.geode.cache.client.internal.ServerRegionDataAccess; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.TXManagerCancelledException; -import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.Assert; import org.apache.geode.internal.cache.control.MemoryThresholds; @@ -108,9 +105,7 @@ public class TXState implements TXStateInterface { * and afterCompletion so that beforeCompletion can obtain locks for the afterCompletion step. * This is that thread */ - protected volatile TXStateSynchronizationRunnable syncRunnable; - private volatile SynchronizationCommitConflictException beforeCompletionException; - private volatile RuntimeException afterCompletionException; + private final SingleThreadJTAExecutor singleThreadJTAExecutor; // Internal testing hooks private Runnable internalAfterReservation; @@ -153,6 +148,11 @@ public class TXState implements TXStateInterface { private volatile DistributedMember proxyServer; public TXState(TXStateProxy proxy, boolean onBehalfOfRemoteStub) { + this(proxy, onBehalfOfRemoteStub, new SingleThreadJTAExecutor()); + } + + public TXState(TXStateProxy proxy, boolean onBehalfOfRemoteStub, + SingleThreadJTAExecutor singleThreadJTAExecutor) { this.beginTime = CachePerfStats.getStatTime(); this.regions = new IdentityHashMap<>(); @@ -165,7 +165,7 @@ public class TXState implements TXStateInterface { this.internalAfterSend = null; this.proxy = proxy; this.onBehalfOfRemoteStub = onBehalfOfRemoteStub; - + this.singleThreadJTAExecutor = singleThreadJTAExecutor; } private boolean hasSeenEvent(EntryEventImpl event) { @@ -428,7 +428,7 @@ public class TXState implements TXStateInterface { } /* - * If there is a TransactionWriter plugged in, we need to to give it an opportunity to abort + * If there is a TransactionWriter plugged in, we need to to give it an opportunity to cleanup * the transaction. */ TransactionWriter writer = this.proxy.getTxMgr().getWriter(); @@ -868,6 +868,14 @@ public class TXState implements TXStateInterface { } protected void cleanup() { + if (singleThreadJTAExecutor.shouldDoCleanup()) { + singleThreadJTAExecutor.cleanup(this); + } else { + doCleanup(); + } + } + + protected void doCleanup() { IllegalArgumentException iae = null; try { this.closed = true; @@ -921,9 +929,7 @@ public class TXState implements TXStateInterface { synchronized (this.completionGuard) { this.completionGuard.notifyAll(); } - if (this.syncRunnable != null) { - this.syncRunnable.abort(); - } + if (iae != null && !this.proxy.getCache().isClosed()) { throw iae; } @@ -1010,75 +1016,6 @@ public class TXState implements TXStateInterface { } } -// ////////////////////////////////////////////////////////////////// -// // JTA Synchronization implementation // -// ////////////////////////////////////////////////////////////////// -// /* -// * (non-Javadoc) -// * -// * @see org.apache.geode.internal.cache.TXStateInterface#beforeCompletion() -// */ -// @Override -// public synchronized void beforeCompletion() throws SynchronizationCommitConflictException { -// if (this.closed) { -// throw new TXManagerCancelledException(); -// } -// if (beforeCompletionCalled) { -// // do not re-execute beforeCompletion again -// return; -// } -// beforeCompletionCalled = true; -// doBeforeCompletion(); -// } -// -// private void doBeforeCompletion() { -// final long opStart = CachePerfStats.getStatTime(); -// this.jtaLifeTime = opStart - getBeginTime(); -// try { -// reserveAndCheck(); -// /* -// * If there is a TransactionWriter plugged in, we need to to give it an opportunity to abort -// * the transaction. -// */ -// TransactionWriter writer = this.proxy.getTxMgr().getWriter(); -// if (writer != null) { -// try { -// // need to mark this so we don't fire again in commit -// firedWriter = true; -// TXEvent event = getEvent(); -// if (!event.hasOnlyInternalEvents()) { -// writer.beforeCommit(event); -// } -// } catch (TransactionWriterException twe) { -// throw new CommitConflictException(twe); -// } catch (VirtualMachineError err) { -// // cleanup(); this allocates objects so I don't think we can do it - that leaves the TX -// // open, but we are poison pilling so we should be ok?? -// -// SystemFailure.initiateFailure(err); -// // If this ever returns, rethrow the error. We're poisoned -// // now, so don't let this thread continue. -// throw err; -// } catch (Throwable t) { -// // Whenever you catch Error or Throwable, you must also -// // catch VirtualMachineError (see above). However, there is -// // _still_ a possibility that you are dealing with a cascading -// // error condition, so you also need to check to see if the JVM -// // is still usable: -// SystemFailure.checkFailure(); -// throw new CommitConflictException(t); -// } -// } -// } catch (CommitConflictException commitConflict) { -// cleanup(); -// proxy.getTxMgr().noteCommitFailure(opStart, this.jtaLifeTime, this); -// throw new SynchronizationCommitConflictException( -// LocalizedStrings.TXState_CONFLICT_DETECTED_IN_GEMFIRE_TRANSACTION_0 -// .toLocalizedString(getTransactionId()), -// commitConflict); -// } -// } - ////////////////////////////////////////////////////////////////// // JTA Synchronization implementation // ////////////////////////////////////////////////////////////////// @@ -1098,55 +1035,23 @@ public class TXState implements TXStateInterface { return; } beforeCompletionCalled = true; - - TXStateSynchronizationRunnable sync = createTxStateSynchronizationRunnable(); - setSynchronizationRunnable(sync); - - Executor exec = getExecutor(); - exec.execute(sync); - sync.waitForFirstExecution(); - if (getBeforeCompletionException() != null) { - throw getBeforeCompletionException(); - } - //doBeforeCompletion(); - } - - TXStateSynchronizationRunnable createTxStateSynchronizationRunnable() { - Runnable beforeCompletion = new Runnable() { - @SuppressWarnings("synthetic-access") - public void run() { - doBeforeCompletion(); - } - }; - - return new TXStateSynchronizationRunnable(getCache().getCancelCriterion(), - beforeCompletion); + singleThreadJTAExecutor.executeBeforeCompletion(this, + getExecutor()); } Executor getExecutor() { - return InternalDistributedSystem.getConnectedInstance().getDistributionManager() - .getWaitingThreadPool(); - } - - SynchronizationCommitConflictException getBeforeCompletionException() { - return beforeCompletionException; - } - - private void setSynchronizationRunnable(TXStateSynchronizationRunnable synchronizationRunnable) { - syncRunnable = synchronizationRunnable; + return getCache().getDistributionManager().getWaitingThreadPool(); } - - private void doBeforeCompletion() { + void doBeforeCompletion() { proxy.getTxMgr().setTXState(null); final long opStart = CachePerfStats.getStatTime(); this.jtaLifeTime = opStart - getBeginTime(); - try { reserveAndCheck(); /* - * If there is a TransactionWriter plugged in, we need to to give it an opportunity to abort + * If there is a TransactionWriter plugged in, we need to to give it an opportunity to cleanup * the transaction. */ TransactionWriter writer = this.proxy.getTxMgr().getWriter(); @@ -1189,28 +1094,18 @@ public class TXState implements TXStateInterface { } /* - * (non-Javadoc) - * - * @see org.apache.geode.internal.cache.TXStateInterface#afterCompletion(int) - */ + * (non-Javadoc) + * + * @see org.apache.geode.internal.cache.TXStateInterface#afterCompletion(int) + */ @Override public synchronized void afterCompletion(int status) { proxy.getTxMgr().setTXState(null); - Runnable afterCompletion = new Runnable() { - @SuppressWarnings("synthetic-access") - public void run() { - doAfterCompletion(status); - } - }; // if there was a beforeCompletion call then there will be a thread // sitting in the waiting pool to execute afterCompletion. Otherwise // throw FailedSynchronizationException(). - TXStateSynchronizationRunnable sync = getSynchronizationRunnable(); - if (sync != null) { - sync.runSecondRunnable(afterCompletion); - if (getAfterCompletionException() != null) { - throw getAfterCompletionException(); - } + if (beforeCompletionCalled) { + singleThreadJTAExecutor.executeAfterCompletion(this, status); } else { // rollback does not run beforeCompletion. if (status != Status.STATUS_ROLLEDBACK) { @@ -1221,15 +1116,7 @@ public class TXState implements TXStateInterface { } } - TXStateSynchronizationRunnable getSynchronizationRunnable() { - return this.syncRunnable; - } - - RuntimeException getAfterCompletionException() { - return afterCompletionException; - } - - private void doAfterCompletion(int status) { + void doAfterCompletion(int status) { final long opStart = CachePerfStats.getStatTime(); switch (status) { case Status.STATUS_COMMITTED: @@ -1259,60 +1146,6 @@ public class TXState implements TXStateInterface { } } -// /* -// * (non-Javadoc) -// * -// * @see org.apache.geode.internal.cache.TXStateInterface#afterCompletion(int) -// */ -// @Override -// public synchronized void afterCompletion(int status) { -// this.proxy.getTxMgr().setTXState(null); -// // For commit, beforeCompletion should be called. Otherwise -// // throw FailedSynchronizationException(). -// if (wasBeforeCompletionCalled()) { -// doAfterCompletion(status); -// } else { -// // rollback does not run beforeCompletion. -// if (status != Status.STATUS_ROLLEDBACK) { -// throw new FailedSynchronizationException( -// "Could not execute afterCompletion when beforeCompletion was not executed"); -// } -// doAfterCompletion(status); -// } -// } -// -// private void doAfterCompletion(int status) { -// final long opStart = CachePerfStats.getStatTime(); -// try { -// switch (status) { -// case Status.STATUS_COMMITTED: -// Assert.assertTrue(this.locks != null, -// "Gemfire Transaction afterCompletion called with illegal state."); -// try { -// commit(); -// saveTXCommitMessageForClientFailover(); -// } catch (CommitConflictException error) { -// Assert.assertTrue(false, "Gemfire Transaction " + getTransactionId() -// + " afterCompletion failed.due to CommitConflictException: " + error); -// } -// -// this.proxy.getTxMgr().noteCommitSuccess(opStart, this.jtaLifeTime, this); -// this.locks = null; -// break; -// case Status.STATUS_ROLLEDBACK: -// this.jtaLifeTime = opStart - getBeginTime(); -// rollback(); -// saveTXCommitMessageForClientFailover(); -// this.proxy.getTxMgr().noteRollbackSuccess(opStart, this.jtaLifeTime, this); -// break; -// default: -// Assert.assertTrue(false, "Unknown JTA Synchronization status " + status); -// } -// } catch (InternalGemFireError error) { -// throw new TransactionException(error); -// } -// } - boolean wasBeforeCompletionCalled() { return beforeCompletionCalled; } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java deleted file mode 100644 index 28f367b..0000000 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.internal.cache; - -import org.apache.logging.log4j.Logger; - -import org.apache.geode.CancelCriterion; -import org.apache.geode.internal.cache.tier.sockets.CommBufferPool; -import org.apache.geode.internal.logging.LogService; - -/** - * TXStateSynchronizationThread manages beforeCompletion and afterCompletion calls. - * The thread should be instantiated with a Runnable that invokes beforeCompletion behavior. - * Then you must invoke runSecondRunnable() with another Runnable that invokes afterCompletion - * behavior. - * - * @since Geode 1.6.0 - */ -public class TXStateSynchronizationRunnable implements Runnable { - private static final Logger logger = LogService.getLogger(); - - private final CancelCriterion cancelCriterion; - - private Runnable firstRunnable; - private final Object firstRunnableSync = new Object(); - private boolean firstRunnableCompleted; - - private Runnable secondRunnable; - private final Object secondRunnableSync = new Object(); - private boolean secondRunnableCompleted; - - private boolean abort; - - public TXStateSynchronizationRunnable(final CancelCriterion cancelCriterion, final Runnable beforeCompletion) { - this.cancelCriterion = cancelCriterion; - this.firstRunnable = beforeCompletion; - } - - @Override - public void run() { - doSynchronizationOps(); - } - - private void doSynchronizationOps() { - synchronized (this.firstRunnableSync) { - try { - this.firstRunnable.run(); - } finally { - if (logger.isTraceEnabled()) { - logger.trace("beforeCompletion notification completed"); - } - this.firstRunnableCompleted = true; - this.firstRunnable = null; - this.firstRunnableSync.notifyAll(); - } - } - synchronized (this.secondRunnableSync) { - // TODO there should be a transaction timeout that keeps this thread - // from sitting around forever if the client goes away - final boolean isTraceEnabled = logger.isTraceEnabled(); - while (this.secondRunnable == null && !this.abort) { - try { - if (isTraceEnabled) { - logger.trace("waiting for afterCompletion notification"); - } - this.secondRunnableSync.wait(1000); - } catch (InterruptedException ignore) { - // eat the interrupt and check for exit conditions - } - } - if (isTraceEnabled) { - logger.trace("executing afterCompletion notification"); - } - try { - if (!this.abort) { - this.secondRunnable.run(); - } - } finally { - if (isTraceEnabled) { - logger.trace("afterCompletion notification completed"); - } - this.secondRunnableCompleted = true; - this.secondRunnable = null; - this.secondRunnableSync.notifyAll(); - } - } - } - - /** - * wait for the initial beforeCompletion step to finish - */ - public void waitForFirstExecution() { - synchronized (this.firstRunnableSync) { - while (!this.firstRunnableCompleted) { - try { - this.firstRunnableSync.wait(1000); - } catch (InterruptedException ignore) { - // eat the interrupt and check for exit conditions - } - cancelCriterion.checkCancelInProgress(null); - } - } - } - - /** - * run the afterCompletion portion of synchronization. This method schedules execution of the - * given runnable and then waits for it to finish running - */ - public void runSecondRunnable(Runnable r) { - synchronized (this.secondRunnableSync) { - this.secondRunnable = r; - this.secondRunnableSync.notifyAll(); - while (!this.secondRunnableCompleted && !this.abort) { - try { - this.secondRunnableSync.wait(1000); - } catch (InterruptedException ignore) { - // eat the interrupt and check for exit conditions - } - cancelCriterion.checkCancelInProgress(null); - } - } - } - - /** - * stop waiting for an afterCompletion to arrive and just exit - */ - public void abort() { - synchronized (this.secondRunnableSync) { - this.abort = true; - } - } -} diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/SingleThreadJTAExecutorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/SingleThreadJTAExecutorTest.java new file mode 100644 index 0000000..79f4324 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/SingleThreadJTAExecutorTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Java6Assertions.assertThat; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import javax.transaction.Status; + +import org.junit.Before; +import org.junit.Test; + +import org.apache.geode.cache.SynchronizationCommitConflictException; +import org.apache.geode.cache.TransactionException; + +public class SingleThreadJTAExecutorTest { + private TXState txState; + private SingleThreadJTAExecutor singleThreadJTAExecutor; + private ExecutorService executor; + + @Before + public void setup() { + txState = mock(TXState.class, RETURNS_DEEP_STUBS); + executor = Executors.newSingleThreadExecutor(); + } + + @Test + public void executeBeforeCompletionCallsDoBeforeCompletion() { + singleThreadJTAExecutor = new SingleThreadJTAExecutor(); + + singleThreadJTAExecutor.executeBeforeCompletion(txState, executor); + + verify(txState, times(1)).doBeforeCompletion(); + + assertThat(singleThreadJTAExecutor.isBeforeCompletionFinished()).isTrue(); + } + + @Test(expected = SynchronizationCommitConflictException.class) + public void executeBeforeCompletionThrowsExceptionIfBeforeCompletionFailed() { + singleThreadJTAExecutor = new SingleThreadJTAExecutor(); + doThrow(new SynchronizationCommitConflictException("")).when(txState).doBeforeCompletion(); + + singleThreadJTAExecutor.executeBeforeCompletion(txState, executor); + + verify(txState, times(1)).doBeforeCompletion(); + assertThat(singleThreadJTAExecutor.isBeforeCompletionFinished()).isTrue(); + } + + @Test + public void executeAfterCompletionCallsDoAfterCompletion() { + singleThreadJTAExecutor = new SingleThreadJTAExecutor(); + int status = Status.STATUS_COMMITTED; + + singleThreadJTAExecutor.executeBeforeCompletion(txState, executor); + singleThreadJTAExecutor.executeAfterCompletion(txState, status); + + verify(txState, times(1)).doBeforeCompletion(); + verify(txState, times(1)).doAfterCompletion(status); + assertThat(singleThreadJTAExecutor.isBeforeCompletionFinished()).isTrue(); + } + + @Test + public void executeAfterCompletionThrowsExceptionIfAfterCompletionFailed() { + singleThreadJTAExecutor = new SingleThreadJTAExecutor(); + int status = Status.STATUS_COMMITTED; + TransactionException exception = new TransactionException(""); + doThrow(exception).when(txState).doAfterCompletion(status); + + singleThreadJTAExecutor.executeBeforeCompletion(txState, executor); + + assertThatThrownBy(() -> singleThreadJTAExecutor.executeAfterCompletion(txState, status)) + .isSameAs(exception); + verify(txState, times(1)).doBeforeCompletion(); + verify(txState, times(1)).doAfterCompletion(status); + } + + @Test + public void executorThreadNoLongerWaitForAfterCompletionIfTXStateIsCleanedUp() { + singleThreadJTAExecutor = new SingleThreadJTAExecutor(); + + singleThreadJTAExecutor.executeBeforeCompletion(txState, executor); + singleThreadJTAExecutor.cleanup(txState); + + verify(txState, times(1)).doBeforeCompletion(); + assertThat(singleThreadJTAExecutor.isBeforeCompletionFinished()).isTrue(); + assertThat(singleThreadJTAExecutor.isAfterCompletionFinished()).isTrue(); + } + +} diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java index 5ec4cbd..c1e9acf 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java @@ -17,6 +17,7 @@ package org.apache.geode.internal.cache; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -26,16 +27,17 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.concurrent.Executor; + import javax.transaction.Status; import org.junit.Before; import org.junit.Test; import org.apache.geode.cache.CommitConflictException; +import org.apache.geode.cache.FailedSynchronizationException; import org.apache.geode.cache.SynchronizationCommitConflictException; import org.apache.geode.cache.TransactionDataNodeHasDepartedException; -import org.apache.geode.cache.TransactionException; -import org.apache.geode.distributed.internal.membership.InternalDistributedMember; public class TXStateTest { private TXStateProxyImpl txStateProxy; @@ -44,60 +46,54 @@ public class TXStateTest { @Before public void setup() { - txStateProxy = mock(TXStateProxyImpl.class); + txStateProxy = mock(TXStateProxyImpl.class, RETURNS_DEEP_STUBS); exception = new CommitConflictException(""); transactionDataNodeHasDepartedException = new TransactionDataNodeHasDepartedException(""); when(txStateProxy.getTxMgr()).thenReturn(mock(TXManagerImpl.class)); } - @Test - public void beforeCompletionThrowsIfReserveAndCheckFails() { + public void doBeforeCompletionThrowsIfReserveAndCheckFails() { TXState txState = spy(new TXState(txStateProxy, true)); + doReturn(mock(Executor.class)).when(txState).getExecutor(); doThrow(exception).when(txState).reserveAndCheck(); - assertThatThrownBy(() -> txState.beforeCompletion()) + assertThatThrownBy(() -> txState.doBeforeCompletion()) .isInstanceOf(SynchronizationCommitConflictException.class); } - @Test - public void afterCompletionThrowsIfCommitFails() { + public void doAfterCompletionThrowsIfCommitFails() { TXState txState = spy(new TXState(txStateProxy, true)); doReturn(mock(InternalCache.class)).when(txState).getCache(); doReturn(true).when(txState).wasBeforeCompletionCalled(); txState.reserveAndCheck(); doThrow(transactionDataNodeHasDepartedException).when(txState).commit(); - assertThatThrownBy(() -> txState.afterCompletion(Status.STATUS_COMMITTED)) + assertThatThrownBy(() -> txState.doAfterCompletion(Status.STATUS_COMMITTED)) .isSameAs(transactionDataNodeHasDepartedException); } @Test - public void afterCompletionThrowsTransactionExceptionIfCommitFailedCommitConflictException() { - TXState txState = spy(new TXState(txStateProxy, true)); - doReturn(mock(InternalCache.class)).when(txState).getCache(); - doReturn(true).when(txState).wasBeforeCompletionCalled(); - doThrow(exception).when(txState).commit(); - - assertThatThrownBy(() -> txState.afterCompletion(Status.STATUS_COMMITTED)) - .isInstanceOf(TransactionException.class); - } - - @Test - public void afterCompletionCanCommitJTA() { + public void doAfterCompletionCanCommitJTA() { TXState txState = spy(new TXState(txStateProxy, false)); doReturn(mock(InternalCache.class)).when(txState).getCache(); txState.reserveAndCheck(); txState.closed = true; doReturn(true).when(txState).wasBeforeCompletionCalled(); - txState.afterCompletion(Status.STATUS_COMMITTED); + txState.doAfterCompletion(Status.STATUS_COMMITTED); assertThat(txState.locks).isNull(); verify(txState, times(1)).saveTXCommitMessageForClientFailover(); } + @Test(expected = FailedSynchronizationException.class) + public void afterCompletionThrowsExceptionIfBeforeCompletionNotCalled() { + TXState txState = new TXState(txStateProxy, true); + txState.afterCompletion(Status.STATUS_COMMITTED); + } + @Test public void afterCompletionCanRollbackJTA() { TXState txState = spy(new TXState(txStateProxy, true)); @@ -153,16 +149,7 @@ public class TXStateTest { public void getOriginatingMemberReturnsNullIfNotOriginatedFromClient() { TXState txState = spy(new TXState(txStateProxy, false)); - assertThat(txState.getOriginatingMember()).isNull(); + assertThat(txState.getOriginatingMember()).isSameAs(txStateProxy.getOnBehalfOfClientMember()); } - @Test - public void getOriginatingMemberReturnsClientMemberIfOriginatedFromClient() { - InternalDistributedMember client = mock(InternalDistributedMember.class); - TXStateProxyImpl proxy = new TXStateProxyImpl(mock(InternalCache.class), - mock(TXManagerImpl.class), mock(TXId.class), client); - TXState txState = spy(new TXState(proxy, false)); - - assertThat(txState.getOriginatingMember()).isEqualTo(client); - } }
