This is an automated email from the ASF dual-hosted git repository. eshu11 pushed a commit to branch feature/GEODE-5376 in repository https://gitbox.apache.org/repos/asf/geode.git
commit c5bf1c7b027874b31d661f068ded61c7bd22df07 Author: eshu <[email protected]> AuthorDate: Tue Jul 17 15:19:32 2018 -0700 Remove the syncRunnable. --- .../org/apache/geode/internal/cache/TXState.java | 87 +++-------- .../cache/TXStateSynchronizationRunnable.java | 171 --------------------- .../cache/TXStateSynchronizationRunnableTest.java | 90 ----------- .../apache/geode/internal/cache/TXStateTest.java | 71 ++------- 4 files changed, 35 insertions(+), 384 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java index d1f586a..f85b98d 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java @@ -24,12 +24,10 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Executor; import java.util.concurrent.locks.ReentrantLock; import javax.transaction.Status; -import edu.umd.cs.findbugs.annotations.SuppressWarnings; import org.apache.logging.log4j.Logger; import org.apache.geode.CancelException; @@ -100,12 +98,7 @@ public class TXState implements TXStateInterface { */ private int modSerialNum; private final List<EntryEventImpl> pendingCallbacks = new ArrayList<EntryEventImpl>(); - /** - * for client/server JTA transactions we need to have a single thread handle both beforeCompletion - * and afterCompletion so that beforeCompletion can obtain locks for the afterCompletion step. 
- * This is that thread - */ - private volatile TXStateSynchronizationRunnable syncRunnable; + private volatile boolean beforeCompletionCalled; // Internal testing hooks private Runnable internalAfterReservation; @@ -859,10 +852,6 @@ public class TXState implements TXStateInterface { } protected void cleanup() { - cleanup(false); - } - - protected void cleanup(boolean isBeforeCompletion) { IllegalArgumentException iae = null; try { this.closed = true; @@ -916,9 +905,6 @@ public class TXState implements TXStateInterface { synchronized (this.completionGuard) { this.completionGuard.notifyAll(); } - if (this.syncRunnable != null && !isBeforeCompletion) { - this.syncRunnable.abort(); - } if (iae != null && !this.proxy.getCache().isClosed()) { throw iae; } @@ -1018,35 +1004,15 @@ public class TXState implements TXStateInterface { if (this.closed) { throw new TXManagerCancelledException(); } - TXStateSynchronizationRunnable sync = createTxStateSynchronizationRunnable(); - setSynchronizationRunnable(sync); - - Executor exec = getExecutor(); - exec.execute(sync); - sync.waitForFirstExecution(); - } - - TXStateSynchronizationRunnable createTxStateSynchronizationRunnable() { - Runnable beforeCompletion = new Runnable() { - @SuppressWarnings("synthetic-access") - public void run() { - doBeforeCompletion(); - } - }; - - return new TXStateSynchronizationRunnable(getCache().getCancelCriterion(), - beforeCompletion); - } - - Executor getExecutor() { - return getCache().getDistributionManager().getWaitingThreadPool(); - } - - private void setSynchronizationRunnable(TXStateSynchronizationRunnable synchronizationRunnable) { - syncRunnable = synchronizationRunnable; + if (beforeCompletionCalled) { + // do not re-execute beforeCompletion again + return; + } + beforeCompletionCalled = true; + doBeforeCompletion(); } - void doBeforeCompletion() { + private void doBeforeCompletion() { final long opStart = CachePerfStats.getStatTime(); this.jtaLifeTime = opStart - getBeginTime(); try { @@ 
-1085,13 +1051,12 @@ public class TXState implements TXStateInterface { } } } catch (CommitConflictException commitConflict) { - cleanup(true); + cleanup(); proxy.getTxMgr().noteCommitFailure(opStart, this.jtaLifeTime, this); - getSynchronizationRunnable() - .setBeforeCompletionException(new SynchronizationCommitConflictException( - LocalizedStrings.TXState_CONFLICT_DETECTED_IN_GEMFIRE_TRANSACTION_0 - .toLocalizedString(getTransactionId()), - commitConflict)); + throw new SynchronizationCommitConflictException( + LocalizedStrings.TXState_CONFLICT_DETECTED_IN_GEMFIRE_TRANSACTION_0 + .toLocalizedString(getTransactionId()), + commitConflict); } } @@ -1103,19 +1068,11 @@ public class TXState implements TXStateInterface { @Override public void afterCompletion(int status) { this.proxy.getTxMgr().setTXState(null); - - Runnable afterCompletion = new Runnable() { - @SuppressWarnings("synthetic-access") - public void run() { - doAfterCompletion(status); - } - }; // if there was a beforeCompletion call then there will be a thread // sitting in the waiting pool to execute afterCompletion. Otherwise // throw FailedSynchronizationException(). - TXStateSynchronizationRunnable sync = getSynchronizationRunnable(); - if (sync != null) { - sync.runSecondRunnable(afterCompletion); + if (wasBeforeCompletionCalled()) { + doAfterCompletion(status); } else { // rollback does not run beforeCompletion. 
if (status != Status.STATUS_ROLLEDBACK) { @@ -1126,11 +1083,7 @@ public class TXState implements TXStateInterface { } } - TXStateSynchronizationRunnable getSynchronizationRunnable() { - return this.syncRunnable; - } - - void doAfterCompletion(int status) { + private void doAfterCompletion(int status) { final long opStart = CachePerfStats.getStatTime(); try { switch (status) { @@ -1157,13 +1110,13 @@ public class TXState implements TXStateInterface { default: Assert.assertTrue(false, "Unknown JTA Synchronization status " + status); } - } catch (RuntimeException exception) { - getSynchronizationRunnable().setAfterCompletionException(exception); } catch (InternalGemFireError error) { - TransactionException exception = new TransactionException(error); - getSynchronizationRunnable().setAfterCompletionException(exception); + throw new TransactionException(error); } + } + boolean wasBeforeCompletionCalled() { + return beforeCompletionCalled; } void saveTXCommitMessageForClientFailover() { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java deleted file mode 100644 index d0670b1..0000000 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. 
You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.internal.cache; - -import org.apache.logging.log4j.Logger; - -import org.apache.geode.CancelCriterion; -import org.apache.geode.cache.SynchronizationCommitConflictException; -import org.apache.geode.internal.logging.LogService; - -/** - * TXSynchronizationThread manages beforeCompletion and afterCompletion calls in TXState. - * The thread should be instantiated with a Runnable that invokes beforeCompletion behavior. - * Then you must invoke runSecondRunnable() with another Runnable that invokes afterCompletion - * behavior. - * - * @since Geode 1.8.0 - */ -public class TXStateSynchronizationRunnable implements Runnable { - private static final Logger logger = LogService.getLogger(); - - private final CancelCriterion cancelCriterion; - - private Runnable firstRunnable; - private final Object firstRunnableSync = new Object(); - private boolean firstRunnableCompleted; - - private Runnable secondRunnable; - private final Object secondRunnableSync = new Object(); - private boolean secondRunnableCompleted; - - private SynchronizationCommitConflictException beforeCompletionException; - private RuntimeException afterCompletionException; - private boolean abort; - - public TXStateSynchronizationRunnable(final CancelCriterion cancelCriterion, - final Runnable beforeCompletion) { - this.cancelCriterion = cancelCriterion; - this.firstRunnable = beforeCompletion; - } - - @Override - public void run() { - doSynchronizationOps(); - } - - private void doSynchronizationOps() { - synchronized (this.firstRunnableSync) { - try { - 
this.firstRunnable.run(); - } finally { - if (logger.isTraceEnabled()) { - logger.trace("beforeCompletion notification completed"); - } - this.firstRunnableCompleted = true; - this.firstRunnable = null; - this.firstRunnableSync.notifyAll(); - } - } - synchronized (this.secondRunnableSync) { - // TODO there should be a transaction timeout that keeps this thread - // from sitting around forever if the client goes away - final boolean isTraceEnabled = logger.isTraceEnabled(); - while (this.secondRunnable == null && !this.abort) { - try { - if (isTraceEnabled) { - logger.trace("waiting for afterCompletion notification"); - } - this.secondRunnableSync.wait(1000); - } catch (InterruptedException ignore) { - // eat the interrupt and check for exit conditions - } - } - if (isTraceEnabled) { - logger.trace("executing afterCompletion notification"); - } - try { - if (!this.abort) { - this.secondRunnable.run(); - } - } finally { - if (isTraceEnabled) { - logger.trace("afterCompletion notification completed"); - } - this.secondRunnableCompleted = true; - this.secondRunnable = null; - this.secondRunnableSync.notifyAll(); - } - } - } - - /** - * wait for the initial beforeCompletion step to finish - */ - public void waitForFirstExecution() { - synchronized (this.firstRunnableSync) { - while (!this.firstRunnableCompleted) { - try { - this.firstRunnableSync.wait(1000); - } catch (InterruptedException ignore) { - // eat the interrupt and check for exit conditions - } - cancelCriterion.checkCancelInProgress(null); - } - } - if (getBeforeCompletionException() != null) { - throw getBeforeCompletionException(); - } - } - - /** - * run the afterCompletion portion of synchronization. 
This method schedules execution of the - * given runnable and then waits for it to finish running - */ - public void runSecondRunnable(Runnable r) { - synchronized (this.secondRunnableSync) { - this.secondRunnable = r; - this.secondRunnableSync.notifyAll(); - while (!this.secondRunnableCompleted && !this.abort) { - try { - this.secondRunnableSync.wait(1000); - } catch (InterruptedException ignore) { - // eat the interrupt and check for exit conditions - } - cancelCriterion.checkCancelInProgress(null); - } - } - if (getAfterCompletionException() != null) { - throw getAfterCompletionException(); - } - } - - /** - * stop waiting for an afterCompletion to arrive and just exit - */ - public void abort() { - synchronized (this.secondRunnableSync) { - this.abort = true; - } - } - - public SynchronizationCommitConflictException getBeforeCompletionException() { - return beforeCompletionException; - } - - public void setBeforeCompletionException( - SynchronizationCommitConflictException beforeCompletionException) { - this.beforeCompletionException = beforeCompletionException; - } - - - public RuntimeException getAfterCompletionException() { - return afterCompletionException; - } - - public void setAfterCompletionException(RuntimeException afterCompletionException) { - this.afterCompletionException = afterCompletionException; - } -} diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnableTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnableTest.java deleted file mode 100644 index 05b811b..0000000 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnableTest.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. 
The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.internal.cache; - -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import org.apache.geode.CancelCriterion; -import org.apache.geode.cache.CacheClosedException; -import org.apache.geode.test.junit.categories.UnitTest; - -@Category(UnitTest.class) -public class TXStateSynchronizationRunnableTest { - private CancelCriterion cancelCriterion; - private Runnable beforeCompletion; - private Runnable afterCompletion; - private CacheClosedException exception; - - @Before - public void setUp() { - exception = new CacheClosedException(); - - cancelCriterion = mock(CancelCriterion.class); - beforeCompletion = mock(Runnable.class); - afterCompletion = mock(Runnable.class); - } - - @Test - public void waitForFirstExecutionThrowsExceptionIfCacheClosed() { - doThrow(exception).when(cancelCriterion).checkCancelInProgress(any()); - TXStateSynchronizationRunnable runnable = - new TXStateSynchronizationRunnable(cancelCriterion, beforeCompletion); - assertThatThrownBy(() -> 
runnable.waitForFirstExecution()).isSameAs(exception); - } - - @Test - public void runSecondRunnableThrowsExceptionIfCacheClosed() { - doThrow(exception).when(cancelCriterion).checkCancelInProgress(any()); - TXStateSynchronizationRunnable runnable = - new TXStateSynchronizationRunnable(cancelCriterion, beforeCompletion); - assertThatThrownBy(() -> runnable.runSecondRunnable(afterCompletion)).isSameAs(exception); - } - - @Test - public void doSynchronizationOpsWaitsUntilRunSecondRunnable() { - TXStateSynchronizationRunnable runnable = - new TXStateSynchronizationRunnable(cancelCriterion, beforeCompletion); - new Thread(() -> { - runnable.runSecondRunnable(afterCompletion); - }).start(); - runnable.run(); - verify(beforeCompletion, times(1)).run(); - verify(afterCompletion, times(1)).run(); - } - - @Test - public void doSynchronizationOpsDoesNotRunSecondRunnableIfAborted() { - TXStateSynchronizationRunnable runnable = - new TXStateSynchronizationRunnable(cancelCriterion, beforeCompletion); - runnable.abort(); - new Thread(() -> { - runnable.runSecondRunnable(afterCompletion); - }).start(); - runnable.run(); - verify(beforeCompletion, times(1)).run(); - verify(afterCompletion, never()).run(); - } - -} diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java index b176df4..0fee670 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java @@ -17,7 +17,6 @@ package org.apache.geode.internal.cache; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -26,115 +25,75 @@ import static org.mockito.Mockito.times; import 
static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.util.concurrent.Executor; - import javax.transaction.Status; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.apache.geode.CancelCriterion; import org.apache.geode.cache.CommitConflictException; import org.apache.geode.cache.SynchronizationCommitConflictException; import org.apache.geode.cache.TransactionDataNodeHasDepartedException; +import org.apache.geode.cache.TransactionException; import org.apache.geode.test.junit.categories.UnitTest; @Category(UnitTest.class) public class TXStateTest { - private CancelCriterion cancelCriterion; - private TXStateProxyImpl txStateProxy; private CommitConflictException exception; - private TXStateSynchronizationRunnable txStateSynch; - private SynchronizationCommitConflictException synchronizationCommitConflictException; - private RuntimeException runtimeException; private TransactionDataNodeHasDepartedException transactionDataNodeHasDepartedException; @Before public void setup() { txStateProxy = mock(TXStateProxyImpl.class); - - cancelCriterion = mock(CancelCriterion.class); exception = new CommitConflictException(""); - txStateSynch = mock(TXStateSynchronizationRunnable.class); - synchronizationCommitConflictException = new SynchronizationCommitConflictException(""); - runtimeException = new RuntimeException(); transactionDataNodeHasDepartedException = new TransactionDataNodeHasDepartedException(""); when(txStateProxy.getTxMgr()).thenReturn(mock(TXManagerImpl.class)); } - @Test - public void beforeCompletionThrowsSynchronizationCommitConflictExceptionIfBeforeCompletionExceptionIsSet() { - TXState txState = spy(new TXState(txStateProxy, true)); - - doReturn(mock(Executor.class)).when(txState).getExecutor(); - doReturn(txStateSynch).when(txState).createTxStateSynchronizationRunnable(); - doThrow(synchronizationCommitConflictException).when(txStateSynch).waitForFirstExecution(); - 
- assertThatThrownBy(() -> txState.beforeCompletion()) - .isSameAs(synchronizationCommitConflictException); - } @Test - public void beforeCompletionExceptionIsSetWhenDoBeforeCompletionCouldNotLockKeys() { + public void beforeCompletionThrowsIfReserveAndCheckFails() { TXState txState = spy(new TXState(txStateProxy, true)); doThrow(exception).when(txState).reserveAndCheck(); - doReturn(txStateSynch).when(txState).getSynchronizationRunnable(); - txState.doBeforeCompletion(); - verify(txStateSynch, times(1)).setBeforeCompletionException(any()); + assertThatThrownBy(() -> txState.beforeCompletion()) + .isInstanceOf(SynchronizationCommitConflictException.class); } @Test - public void afterCompletionThrowsExceptionIfAfterCompletionExceptionIsSet() { - TXState txState = spy(new TXState(txStateProxy, true)); - - doReturn(txStateSynch).when(txState).getSynchronizationRunnable(); - doThrow(runtimeException).when(txStateSynch).runSecondRunnable(any()); - - assertThatThrownBy(() -> txState.afterCompletion(Status.STATUS_COMMITTED)) - .isSameAs(runtimeException); - } - - @Test - public void afterCompletionExceptionIsSetWhenCommitFailedWithTransactionDataNodeHasDepartedException() { + public void afterCompletionThrowsIfCommitFails() { TXState txState = spy(new TXState(txStateProxy, true)); doReturn(mock(InternalCache.class)).when(txState).getCache(); - // txState.reserveAndCheck(); - doReturn(txStateSynch).when(txState).getSynchronizationRunnable(); + doReturn(true).when(txState).wasBeforeCompletionCalled(); + txState.reserveAndCheck(); doThrow(transactionDataNodeHasDepartedException).when(txState).commit(); - txState.doAfterCompletion(Status.STATUS_COMMITTED); - verify(txStateSynch, times(1)).setAfterCompletionException(any()); + assertThatThrownBy(() -> txState.afterCompletion(Status.STATUS_COMMITTED)) + .isSameAs(transactionDataNodeHasDepartedException); } @Test - public void afterCompletionExceptionIsSetToTransactionExceptionWhenCommitFailedWithCommitConflictException() { + 
public void afterCompletionThrowsTransactionExceptionIfCommitFailedCommitConflictException() { TXState txState = spy(new TXState(txStateProxy, true)); doReturn(mock(InternalCache.class)).when(txState).getCache(); - doReturn(txStateSynch).when(txState).getSynchronizationRunnable(); + doReturn(true).when(txState).wasBeforeCompletionCalled(); doThrow(exception).when(txState).commit(); - txState.doAfterCompletion(Status.STATUS_COMMITTED); - - verify(txStateSynch, times(1)).setAfterCompletionException(any()); - // TODO check InternalGemFireError - // TransactionException transactionException = - // (TransactionException) txState.getAfterCompletionException(); - // assertThat(transactionException.getCause()).isInstanceOf(InternalGemFireError.class); + assertThatThrownBy(() -> txState.afterCompletion(Status.STATUS_COMMITTED)) + .isInstanceOf(TransactionException.class); } - @Test public void afterCompletionCanCommitJTA() { TXState txState = spy(new TXState(txStateProxy, false)); doReturn(mock(InternalCache.class)).when(txState).getCache(); txState.reserveAndCheck(); txState.closed = true; - txState.doAfterCompletion(Status.STATUS_COMMITTED); + doReturn(true).when(txState).wasBeforeCompletionCalled(); + txState.afterCompletion(Status.STATUS_COMMITTED); assertThat(txState.locks).isNull(); verify(txState, times(1)).saveTXCommitMessageForClientFailover();
