This is an automated email from the ASF dual-hosted git repository. eshu11 pushed a commit to branch feature/GEODE-5376 in repository https://gitbox.apache.org/repos/asf/geode.git
commit e9d855841d0fb1a6bc31b1103c67d6cbaa81e9c2 Author: eshu <[email protected]> AuthorDate: Tue Jul 17 15:19:32 2018 -0700 Remove the syncRunable. --- .../org/apache/geode/internal/cache/TXState.java | 87 +++++----------------- .../cache/TXStateSynchronizationRunnable.java | 0 .../apache/geode/internal/cache/TXStateTest.java | 71 ++++-------------- 3 files changed, 35 insertions(+), 123 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java index d1f586a..f85b98d 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java @@ -24,12 +24,10 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Executor; import java.util.concurrent.locks.ReentrantLock; import javax.transaction.Status; -import edu.umd.cs.findbugs.annotations.SuppressWarnings; import org.apache.logging.log4j.Logger; import org.apache.geode.CancelException; @@ -100,12 +98,7 @@ public class TXState implements TXStateInterface { */ private int modSerialNum; private final List<EntryEventImpl> pendingCallbacks = new ArrayList<EntryEventImpl>(); - /** - * for client/server JTA transactions we need to have a single thread handle both beforeCompletion - * and afterCompletion so that beforeCompletion can obtain locks for the afterCompletion step. - * This is that thread - */ - private volatile TXStateSynchronizationRunnable syncRunnable; + private volatile boolean beforeCompletionCalled; // Internal testing hooks private Runnable internalAfterReservation; @@ -859,10 +852,6 @@ public class TXState implements TXStateInterface { } protected void cleanup() { - cleanup(false); - } - - protected void cleanup(boolean isBeforeCompletion) { IllegalArgumentException iae = null; try { this.closed = true; @@ -916,9 +905,6 @@ public class TXState implements TXStateInterface { synchronized (this.completionGuard) { this.completionGuard.notifyAll(); } - if (this.syncRunnable != null && !isBeforeCompletion) { - this.syncRunnable.abort(); - } if (iae != null && !this.proxy.getCache().isClosed()) { throw iae; } @@ -1018,35 +1004,15 @@ public class TXState implements TXStateInterface { if (this.closed) { throw new TXManagerCancelledException(); } - TXStateSynchronizationRunnable sync = createTxStateSynchronizationRunnable(); - setSynchronizationRunnable(sync); - - Executor exec = getExecutor(); - exec.execute(sync); - sync.waitForFirstExecution(); - } - - TXStateSynchronizationRunnable createTxStateSynchronizationRunnable() { - Runnable beforeCompletion = new Runnable() { - @SuppressWarnings("synthetic-access") - public void run() { - doBeforeCompletion(); - } - }; - - return new TXStateSynchronizationRunnable(getCache().getCancelCriterion(), - beforeCompletion); - } - - Executor getExecutor() { - return getCache().getDistributionManager().getWaitingThreadPool(); - } - - private void setSynchronizationRunnable(TXStateSynchronizationRunnable synchronizationRunnable) { - syncRunnable = synchronizationRunnable; + if (beforeCompletionCalled) { + // do not re-execute beforeCompletion again + return; + } + beforeCompletionCalled = true; + doBeforeCompletion(); } - void doBeforeCompletion() { + private void doBeforeCompletion() { final long opStart = CachePerfStats.getStatTime(); this.jtaLifeTime = opStart - getBeginTime(); try { @@ -1085,13 +1051,12 @@ public class TXState implements TXStateInterface { } } } catch (CommitConflictException commitConflict) { - cleanup(true); + cleanup(); proxy.getTxMgr().noteCommitFailure(opStart, this.jtaLifeTime, this); - getSynchronizationRunnable() - .setBeforeCompletionException(new SynchronizationCommitConflictException( - LocalizedStrings.TXState_CONFLICT_DETECTED_IN_GEMFIRE_TRANSACTION_0 - .toLocalizedString(getTransactionId()), - commitConflict)); + throw new SynchronizationCommitConflictException( + LocalizedStrings.TXState_CONFLICT_DETECTED_IN_GEMFIRE_TRANSACTION_0 + .toLocalizedString(getTransactionId()), + commitConflict); } } @@ -1103,19 +1068,11 @@ public class TXState implements TXStateInterface { @Override public void afterCompletion(int status) { this.proxy.getTxMgr().setTXState(null); - - Runnable afterCompletion = new Runnable() { - @SuppressWarnings("synthetic-access") - public void run() { - doAfterCompletion(status); - } - }; // if there was a beforeCompletion call then there will be a thread // sitting in the waiting pool to execute afterCompletion. Otherwise // throw FailedSynchronizationException(). - TXStateSynchronizationRunnable sync = getSynchronizationRunnable(); - if (sync != null) { - sync.runSecondRunnable(afterCompletion); + if (wasBeforeCompletionCalled()) { + doAfterCompletion(status); } else { // rollback does not run beforeCompletion. if (status != Status.STATUS_ROLLEDBACK) { @@ -1126,11 +1083,7 @@ public class TXState implements TXStateInterface { } } - TXStateSynchronizationRunnable getSynchronizationRunnable() { - return this.syncRunnable; - } - - void doAfterCompletion(int status) { + private void doAfterCompletion(int status) { final long opStart = CachePerfStats.getStatTime(); try { switch (status) { @@ -1157,13 +1110,13 @@ public class TXState implements TXStateInterface { default: Assert.assertTrue(false, "Unknown JTA Synchronization status " + status); } - } catch (RuntimeException exception) { - getSynchronizationRunnable().setAfterCompletionException(exception); } catch (InternalGemFireError error) { - TransactionException exception = new TransactionException(error); - getSynchronizationRunnable().setAfterCompletionException(exception); + throw new TransactionException(error); } + } + boolean wasBeforeCompletionCalled() { + return beforeCompletionCalled; } void saveTXCommitMessageForClientFailover() { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java deleted file mode 100644 index e69de29..0000000 diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java index b176df4..0fee670 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java @@ -17,7 +17,6 @@ package org.apache.geode.internal.cache; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -26,115 +25,75 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.util.concurrent.Executor; - import javax.transaction.Status; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.apache.geode.CancelCriterion; import org.apache.geode.cache.CommitConflictException; import org.apache.geode.cache.SynchronizationCommitConflictException; import org.apache.geode.cache.TransactionDataNodeHasDepartedException; +import org.apache.geode.cache.TransactionException; import org.apache.geode.test.junit.categories.UnitTest; @Category(UnitTest.class) public class TXStateTest { - private CancelCriterion cancelCriterion; - private TXStateProxyImpl txStateProxy; private CommitConflictException exception; - private TXStateSynchronizationRunnable txStateSynch; - private SynchronizationCommitConflictException synchronizationCommitConflictException; - private RuntimeException runtimeException; private TransactionDataNodeHasDepartedException transactionDataNodeHasDepartedException; @Before public void setup() { txStateProxy = mock(TXStateProxyImpl.class); - - cancelCriterion = mock(CancelCriterion.class); exception = new CommitConflictException(""); - txStateSynch = mock(TXStateSynchronizationRunnable.class); - synchronizationCommitConflictException = new SynchronizationCommitConflictException(""); - runtimeException = new RuntimeException(); transactionDataNodeHasDepartedException = new TransactionDataNodeHasDepartedException(""); when(txStateProxy.getTxMgr()).thenReturn(mock(TXManagerImpl.class)); } - @Test - public void beforeCompletionThrowsSynchronizationCommitConflictExceptionIfBeforeCompletionExceptionIsSet() { - TXState txState = spy(new TXState(txStateProxy, true)); - - doReturn(mock(Executor.class)).when(txState).getExecutor(); - doReturn(txStateSynch).when(txState).createTxStateSynchronizationRunnable(); - doThrow(synchronizationCommitConflictException).when(txStateSynch).waitForFirstExecution(); - - assertThatThrownBy(() -> txState.beforeCompletion()) - .isSameAs(synchronizationCommitConflictException); - } @Test - public void beforeCompletionExceptionIsSetWhenDoBeforeCompletionCouldNotLockKeys() { + public void beforeCompletionThrowsIfReserveAndCheckFails() { TXState txState = spy(new TXState(txStateProxy, true)); doThrow(exception).when(txState).reserveAndCheck(); - doReturn(txStateSynch).when(txState).getSynchronizationRunnable(); - txState.doBeforeCompletion(); - verify(txStateSynch, times(1)).setBeforeCompletionException(any()); + assertThatThrownBy(() -> txState.beforeCompletion()) + .isInstanceOf(SynchronizationCommitConflictException.class); } @Test - public void afterCompletionThrowsExceptionIfAfterCompletionExceptionIsSet() { - TXState txState = spy(new TXState(txStateProxy, true)); - - doReturn(txStateSynch).when(txState).getSynchronizationRunnable(); - doThrow(runtimeException).when(txStateSynch).runSecondRunnable(any()); - - assertThatThrownBy(() -> txState.afterCompletion(Status.STATUS_COMMITTED)) - .isSameAs(runtimeException); - } - - @Test - public void afterCompletionExceptionIsSetWhenCommitFailedWithTransactionDataNodeHasDepartedException() { + public void afterCompletionThrowsIfCommitFails() { TXState txState = spy(new TXState(txStateProxy, true)); doReturn(mock(InternalCache.class)).when(txState).getCache(); - // txState.reserveAndCheck(); - doReturn(txStateSynch).when(txState).getSynchronizationRunnable(); + doReturn(true).when(txState).wasBeforeCompletionCalled(); + txState.reserveAndCheck(); doThrow(transactionDataNodeHasDepartedException).when(txState).commit(); - txState.doAfterCompletion(Status.STATUS_COMMITTED); - verify(txStateSynch, times(1)).setAfterCompletionException(any()); + assertThatThrownBy(() -> txState.afterCompletion(Status.STATUS_COMMITTED)) + .isSameAs(transactionDataNodeHasDepartedException); } @Test - public void afterCompletionExceptionIsSetToTransactionExceptionWhenCommitFailedWithCommitConflictException() { + public void afterCompletionThrowsTransactionExceptionIfCommitFailedCommitConflictException() { TXState txState = spy(new TXState(txStateProxy, true)); doReturn(mock(InternalCache.class)).when(txState).getCache(); - doReturn(txStateSynch).when(txState).getSynchronizationRunnable(); + doReturn(true).when(txState).wasBeforeCompletionCalled(); doThrow(exception).when(txState).commit(); - txState.doAfterCompletion(Status.STATUS_COMMITTED); - - verify(txStateSynch, times(1)).setAfterCompletionException(any()); - // TODO check InternalGemFireError - // TransactionException transactionException = - // (TransactionException) txState.getAfterCompletionException(); - // assertThat(transactionException.getCause()).isInstanceOf(InternalGemFireError.class); + assertThatThrownBy(() -> txState.afterCompletion(Status.STATUS_COMMITTED)) + .isInstanceOf(TransactionException.class); } - @Test public void afterCompletionCanCommitJTA() { TXState txState = spy(new TXState(txStateProxy, false)); doReturn(mock(InternalCache.class)).when(txState).getCache(); txState.reserveAndCheck(); txState.closed = true; - txState.doAfterCompletion(Status.STATUS_COMMITTED); + doReturn(true).when(txState).wasBeforeCompletionCalled(); + txState.afterCompletion(Status.STATUS_COMMITTED); assertThat(txState.locks).isNull(); verify(txState, times(1)).saveTXCommitMessageForClientFailover();
