This is an automated email from the ASF dual-hosted git repository. eshu11 pushed a commit to branch feature/GEODE-5376 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 376b0d4f4d76b3c0f6c5b0cb442ef5e882de54c6 Author: eshu <[email protected]> AuthorDate: Mon Jul 16 17:08:50 2018 -0700 wip fix review comments. --- .../org/apache/geode/internal/cache/TXState.java | 36 ++++++---------------- .../cache/TXStateSynchronizationRunnable.java | 27 ++++++++++++++++ .../apache/geode/internal/cache/TXStateTest.java | 28 ++++++++--------- 3 files changed, 50 insertions(+), 41 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java index 090d155..d1f586a 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java @@ -53,7 +53,6 @@ import org.apache.geode.cache.UnsupportedOperationInTransactionException; import org.apache.geode.cache.client.internal.ServerRegionDataAccess; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.TXManagerCancelledException; -import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.Assert; import org.apache.geode.internal.cache.control.MemoryThresholds; @@ -106,10 +105,7 @@ public class TXState implements TXStateInterface { * and afterCompletion so that beforeCompletion can obtain locks for the afterCompletion step. * This is that thread */ - protected volatile TXStateSynchronizationRunnable syncRunnable; - - private volatile SynchronizationCommitConflictException beforeCompletionException; - private volatile RuntimeException afterCompletionException; + private volatile TXStateSynchronizationRunnable syncRunnable; // Internal testing hooks private Runnable internalAfterReservation; @@ -1028,9 +1024,6 @@ public class TXState implements TXStateInterface { Executor exec = getExecutor(); exec.execute(sync); sync.waitForFirstExecution(); - if (getBeforeCompletionException() != null) { - throw getBeforeCompletionException(); - } } TXStateSynchronizationRunnable createTxStateSynchronizationRunnable() { @@ -1046,12 +1039,7 @@ public class TXState implements TXStateInterface { } Executor getExecutor() { - return InternalDistributedSystem.getConnectedInstance().getDistributionManager() - .getWaitingThreadPool(); - } - - SynchronizationCommitConflictException getBeforeCompletionException() { - return beforeCompletionException; + return getCache().getDistributionManager().getWaitingThreadPool(); } private void setSynchronizationRunnable(TXStateSynchronizationRunnable synchronizationRunnable) { @@ -1099,10 +1087,11 @@ public class TXState implements TXStateInterface { } catch (CommitConflictException commitConflict) { cleanup(true); proxy.getTxMgr().noteCommitFailure(opStart, this.jtaLifeTime, this); - beforeCompletionException = new SynchronizationCommitConflictException( - LocalizedStrings.TXState_CONFLICT_DETECTED_IN_GEMFIRE_TRANSACTION_0 - .toLocalizedString(getTransactionId()), - commitConflict); + getSynchronizationRunnable() + .setBeforeCompletionException(new SynchronizationCommitConflictException( + LocalizedStrings.TXState_CONFLICT_DETECTED_IN_GEMFIRE_TRANSACTION_0 + .toLocalizedString(getTransactionId()), + commitConflict)); } } @@ -1127,9 +1116,6 @@ public class TXState implements TXStateInterface { TXStateSynchronizationRunnable sync = getSynchronizationRunnable(); if (sync != null) { sync.runSecondRunnable(afterCompletion); - if (getAfterCompletionException() != null) { - throw getAfterCompletionException(); - } } else { // rollback does not run beforeCompletion. if (status != Status.STATUS_ROLLEDBACK) { @@ -1144,10 +1130,6 @@ public class TXState implements TXStateInterface { return this.syncRunnable; } - RuntimeException getAfterCompletionException() { - return afterCompletionException; - } - void doAfterCompletion(int status) { final long opStart = CachePerfStats.getStatTime(); try { @@ -1176,10 +1158,10 @@ public class TXState implements TXStateInterface { Assert.assertTrue(false, "Unknown JTA Synchronization status " + status); } } catch (RuntimeException exception) { - afterCompletionException = exception; + getSynchronizationRunnable().setAfterCompletionException(exception); } catch (InternalGemFireError error) { TransactionException exception = new TransactionException(error); - afterCompletionException = exception; + getSynchronizationRunnable().setAfterCompletionException(exception); } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java index 415bcdd..d0670b1 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java @@ -17,6 +17,7 @@ package org.apache.geode.internal.cache; import org.apache.logging.log4j.Logger; import org.apache.geode.CancelCriterion; +import org.apache.geode.cache.SynchronizationCommitConflictException; import org.apache.geode.internal.logging.LogService; /** @@ -40,6 +41,8 @@ public class TXStateSynchronizationRunnable implements Runnable { private final Object secondRunnableSync = new Object(); private boolean secondRunnableCompleted; + private SynchronizationCommitConflictException beforeCompletionException; + private RuntimeException afterCompletionException; private boolean abort; public TXStateSynchronizationRunnable(final CancelCriterion cancelCriterion, @@ -112,6 +115,9 @@ public class TXStateSynchronizationRunnable implements Runnable { cancelCriterion.checkCancelInProgress(null); } } + if (getBeforeCompletionException() != null) { + throw getBeforeCompletionException(); + } } /** @@ -131,6 +137,9 @@ public class TXStateSynchronizationRunnable implements Runnable { cancelCriterion.checkCancelInProgress(null); } } + if (getAfterCompletionException() != null) { + throw getAfterCompletionException(); + } } /** @@ -141,4 +150,22 @@ public class TXStateSynchronizationRunnable implements Runnable { this.abort = true; } } + + public SynchronizationCommitConflictException getBeforeCompletionException() { + return beforeCompletionException; + } + + public void setBeforeCompletionException( + SynchronizationCommitConflictException beforeCompletionException) { + this.beforeCompletionException = beforeCompletionException; + } + + + public RuntimeException getAfterCompletionException() { + return afterCompletionException; + } + + public void setAfterCompletionException(RuntimeException afterCompletionException) { + this.afterCompletionException = afterCompletionException; + } } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java index 0dce944..b176df4 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java @@ -17,6 +17,7 @@ package org.apache.geode.internal.cache; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -34,11 +35,9 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.apache.geode.CancelCriterion; -import org.apache.geode.InternalGemFireError; import org.apache.geode.cache.CommitConflictException; import org.apache.geode.cache.SynchronizationCommitConflictException; import org.apache.geode.cache.TransactionDataNodeHasDepartedException; -import org.apache.geode.cache.TransactionException; import org.apache.geode.test.junit.categories.UnitTest; @Category(UnitTest.class) @@ -72,7 +71,7 @@ public class TXStateTest { doReturn(mock(Executor.class)).when(txState).getExecutor(); doReturn(txStateSynch).when(txState).createTxStateSynchronizationRunnable(); - doReturn(synchronizationCommitConflictException).when(txState).getBeforeCompletionException(); + doThrow(synchronizationCommitConflictException).when(txStateSynch).waitForFirstExecution(); assertThatThrownBy(() -> txState.beforeCompletion()) .isSameAs(synchronizationCommitConflictException); @@ -82,10 +81,10 @@ public class TXStateTest { public void beforeCompletionExceptionIsSetWhenDoBeforeCompletionCouldNotLockKeys() { TXState txState = spy(new TXState(txStateProxy, true)); doThrow(exception).when(txState).reserveAndCheck(); + doReturn(txStateSynch).when(txState).getSynchronizationRunnable(); txState.doBeforeCompletion(); - assertThat(txState.getBeforeCompletionException()) - .isInstanceOf(SynchronizationCommitConflictException.class); + verify(txStateSynch, times(1)).setBeforeCompletionException(any()); } @@ -94,7 +93,7 @@ public class TXStateTest { TXState txState = spy(new TXState(txStateProxy, true)); doReturn(txStateSynch).when(txState).getSynchronizationRunnable(); - doReturn(runtimeException).when(txState).getAfterCompletionException(); + doThrow(runtimeException).when(txStateSynch).runSecondRunnable(any()); assertThatThrownBy(() -> txState.afterCompletion(Status.STATUS_COMMITTED)) .isSameAs(runtimeException); @@ -104,27 +103,28 @@ public class TXStateTest { public void afterCompletionExceptionIsSetWhenCommitFailedWithTransactionDataNodeHasDepartedException() { TXState txState = spy(new TXState(txStateProxy, true)); doReturn(mock(InternalCache.class)).when(txState).getCache(); - txState.reserveAndCheck(); + // txState.reserveAndCheck(); + doReturn(txStateSynch).when(txState).getSynchronizationRunnable(); doThrow(transactionDataNodeHasDepartedException).when(txState).commit(); txState.doAfterCompletion(Status.STATUS_COMMITTED); - assertThat(txState.getAfterCompletionException()) - .isSameAs(transactionDataNodeHasDepartedException); + verify(txStateSynch, times(1)).setAfterCompletionException(any()); } @Test public void afterCompletionExceptionIsSetToTransactionExceptionWhenCommitFailedWithCommitConflictException() { TXState txState = spy(new TXState(txStateProxy, true)); doReturn(mock(InternalCache.class)).when(txState).getCache(); - txState.reserveAndCheck(); + doReturn(txStateSynch).when(txState).getSynchronizationRunnable(); doThrow(exception).when(txState).commit(); txState.doAfterCompletion(Status.STATUS_COMMITTED); - assertThat(txState.getAfterCompletionException()).isInstanceOf(TransactionException.class); - TransactionException transactionException = - (TransactionException) txState.getAfterCompletionException(); - assertThat(transactionException.getCause()).isInstanceOf(InternalGemFireError.class); + verify(txStateSynch, times(1)).setAfterCompletionException(any()); + // TODO check InternalGemFireError + // TransactionException transactionException = + // (TransactionException) txState.getAfterCompletionException(); + // assertThat(transactionException.getCause()).isInstanceOf(InternalGemFireError.class); }
