This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch feature/GEODE-5376
in repository https://gitbox.apache.org/repos/asf/geode.git

commit e9d855841d0fb1a6bc31b1103c67d6cbaa81e9c2
Author: eshu <[email protected]>
AuthorDate: Tue Jul 17 15:19:32 2018 -0700

    Remove the syncRunnable.
---
 .../org/apache/geode/internal/cache/TXState.java   | 87 +++++-----------------
 .../cache/TXStateSynchronizationRunnable.java      |  0
 .../apache/geode/internal/cache/TXStateTest.java   | 71 ++++--------------
 3 files changed, 35 insertions(+), 123 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index d1f586a..f85b98d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -24,12 +24,10 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Executor;
 import java.util.concurrent.locks.ReentrantLock;
 
 import javax.transaction.Status;
 
-import edu.umd.cs.findbugs.annotations.SuppressWarnings;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelException;
@@ -100,12 +98,7 @@ public class TXState implements TXStateInterface {
    */
   private int modSerialNum;
   private final List<EntryEventImpl> pendingCallbacks = new 
ArrayList<EntryEventImpl>();
-  /**
-   * for client/server JTA transactions we need to have a single thread handle 
both beforeCompletion
-   * and afterCompletion so that beforeCompletion can obtain locks for the 
afterCompletion step.
-   * This is that thread
-   */
-  private volatile TXStateSynchronizationRunnable syncRunnable;
+  private volatile boolean beforeCompletionCalled;
 
   // Internal testing hooks
   private Runnable internalAfterReservation;
@@ -859,10 +852,6 @@ public class TXState implements TXStateInterface {
   }
 
   protected void cleanup() {
-    cleanup(false);
-  }
-
-  protected void cleanup(boolean isBeforeCompletion) {
     IllegalArgumentException iae = null;
     try {
       this.closed = true;
@@ -916,9 +905,6 @@ public class TXState implements TXStateInterface {
       synchronized (this.completionGuard) {
         this.completionGuard.notifyAll();
       }
-      if (this.syncRunnable != null && !isBeforeCompletion) {
-        this.syncRunnable.abort();
-      }
       if (iae != null && !this.proxy.getCache().isClosed()) {
         throw iae;
       }
@@ -1018,35 +1004,15 @@ public class TXState implements TXStateInterface {
     if (this.closed) {
       throw new TXManagerCancelledException();
     }
-    TXStateSynchronizationRunnable sync = 
createTxStateSynchronizationRunnable();
-    setSynchronizationRunnable(sync);
-
-    Executor exec = getExecutor();
-    exec.execute(sync);
-    sync.waitForFirstExecution();
-  }
-
-  TXStateSynchronizationRunnable createTxStateSynchronizationRunnable() {
-    Runnable beforeCompletion = new Runnable() {
-      @SuppressWarnings("synthetic-access")
-      public void run() {
-        doBeforeCompletion();
-      }
-    };
-
-    return new TXStateSynchronizationRunnable(getCache().getCancelCriterion(),
-        beforeCompletion);
-  }
-
-  Executor getExecutor() {
-    return getCache().getDistributionManager().getWaitingThreadPool();
-  }
-
-  private void setSynchronizationRunnable(TXStateSynchronizationRunnable 
synchronizationRunnable) {
-    syncRunnable = synchronizationRunnable;
+    if (beforeCompletionCalled) {
+      // do not execute beforeCompletion again
+      return;
+    }
+    beforeCompletionCalled = true;
+    doBeforeCompletion();
   }
 
-  void doBeforeCompletion() {
+  private void doBeforeCompletion() {
     final long opStart = CachePerfStats.getStatTime();
     this.jtaLifeTime = opStart - getBeginTime();
     try {
@@ -1085,13 +1051,12 @@ public class TXState implements TXStateInterface {
         }
       }
     } catch (CommitConflictException commitConflict) {
-      cleanup(true);
+      cleanup();
       proxy.getTxMgr().noteCommitFailure(opStart, this.jtaLifeTime, this);
-      getSynchronizationRunnable()
-          .setBeforeCompletionException(new 
SynchronizationCommitConflictException(
-              
LocalizedStrings.TXState_CONFLICT_DETECTED_IN_GEMFIRE_TRANSACTION_0
-                  .toLocalizedString(getTransactionId()),
-              commitConflict));
+      throw new SynchronizationCommitConflictException(
+          LocalizedStrings.TXState_CONFLICT_DETECTED_IN_GEMFIRE_TRANSACTION_0
+              .toLocalizedString(getTransactionId()),
+          commitConflict);
     }
   }
 
@@ -1103,19 +1068,11 @@ public class TXState implements TXStateInterface {
   @Override
   public void afterCompletion(int status) {
     this.proxy.getTxMgr().setTXState(null);
-
-    Runnable afterCompletion = new Runnable() {
-      @SuppressWarnings("synthetic-access")
-      public void run() {
-        doAfterCompletion(status);
-      }
-    };
     // if there was a beforeCompletion call then there will be a thread
     // sitting in the waiting pool to execute afterCompletion. Otherwise
     // throw FailedSynchronizationException().
-    TXStateSynchronizationRunnable sync = getSynchronizationRunnable();
-    if (sync != null) {
-      sync.runSecondRunnable(afterCompletion);
+    if (wasBeforeCompletionCalled()) {
+      doAfterCompletion(status);
     } else {
       // rollback does not run beforeCompletion.
       if (status != Status.STATUS_ROLLEDBACK) {
@@ -1126,11 +1083,7 @@ public class TXState implements TXStateInterface {
     }
   }
 
-  TXStateSynchronizationRunnable getSynchronizationRunnable() {
-    return this.syncRunnable;
-  }
-
-  void doAfterCompletion(int status) {
+  private void doAfterCompletion(int status) {
     final long opStart = CachePerfStats.getStatTime();
     try {
       switch (status) {
@@ -1157,13 +1110,13 @@ public class TXState implements TXStateInterface {
         default:
           Assert.assertTrue(false, "Unknown JTA Synchronization status " + 
status);
       }
-    } catch (RuntimeException exception) {
-      getSynchronizationRunnable().setAfterCompletionException(exception);
     } catch (InternalGemFireError error) {
-      TransactionException exception = new TransactionException(error);
-      getSynchronizationRunnable().setAfterCompletionException(exception);
+      throw new TransactionException(error);
     }
+  }
 
+  boolean wasBeforeCompletionCalled() {
+    return beforeCompletionCalled;
   }
 
   void saveTXCommitMessageForClientFailover() {
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java
deleted file mode 100644
index e69de29..0000000
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java 
b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
index b176df4..0fee670 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
@@ -17,7 +17,6 @@ package org.apache.geode.internal.cache;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
@@ -26,115 +25,75 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.util.concurrent.Executor;
-
 import javax.transaction.Status;
 
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import org.apache.geode.CancelCriterion;
 import org.apache.geode.cache.CommitConflictException;
 import org.apache.geode.cache.SynchronizationCommitConflictException;
 import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
+import org.apache.geode.cache.TransactionException;
 import org.apache.geode.test.junit.categories.UnitTest;
 
 @Category(UnitTest.class)
 public class TXStateTest {
-  private CancelCriterion cancelCriterion;
-
   private TXStateProxyImpl txStateProxy;
   private CommitConflictException exception;
-  private TXStateSynchronizationRunnable txStateSynch;
-  private SynchronizationCommitConflictException 
synchronizationCommitConflictException;
-  private RuntimeException runtimeException;
   private TransactionDataNodeHasDepartedException 
transactionDataNodeHasDepartedException;
 
   @Before
   public void setup() {
     txStateProxy = mock(TXStateProxyImpl.class);
-
-    cancelCriterion = mock(CancelCriterion.class);
     exception = new CommitConflictException("");
-    txStateSynch = mock(TXStateSynchronizationRunnable.class);
-    synchronizationCommitConflictException = new 
SynchronizationCommitConflictException("");
-    runtimeException = new RuntimeException();
     transactionDataNodeHasDepartedException = new 
TransactionDataNodeHasDepartedException("");
 
     when(txStateProxy.getTxMgr()).thenReturn(mock(TXManagerImpl.class));
   }
 
-  @Test
-  public void 
beforeCompletionThrowsSynchronizationCommitConflictExceptionIfBeforeCompletionExceptionIsSet()
 {
-    TXState txState = spy(new TXState(txStateProxy, true));
-
-    doReturn(mock(Executor.class)).when(txState).getExecutor();
-    
doReturn(txStateSynch).when(txState).createTxStateSynchronizationRunnable();
-    
doThrow(synchronizationCommitConflictException).when(txStateSynch).waitForFirstExecution();
-
-    assertThatThrownBy(() -> txState.beforeCompletion())
-        .isSameAs(synchronizationCommitConflictException);
-  }
 
   @Test
-  public void 
beforeCompletionExceptionIsSetWhenDoBeforeCompletionCouldNotLockKeys() {
+  public void beforeCompletionThrowsIfReserveAndCheckFails() {
     TXState txState = spy(new TXState(txStateProxy, true));
     doThrow(exception).when(txState).reserveAndCheck();
-    doReturn(txStateSynch).when(txState).getSynchronizationRunnable();
 
-    txState.doBeforeCompletion();
-    verify(txStateSynch, times(1)).setBeforeCompletionException(any());
+    assertThatThrownBy(() -> txState.beforeCompletion())
+        .isInstanceOf(SynchronizationCommitConflictException.class);
   }
 
 
   @Test
-  public void afterCompletionThrowsExceptionIfAfterCompletionExceptionIsSet() {
-    TXState txState = spy(new TXState(txStateProxy, true));
-
-    doReturn(txStateSynch).when(txState).getSynchronizationRunnable();
-    doThrow(runtimeException).when(txStateSynch).runSecondRunnable(any());
-
-    assertThatThrownBy(() -> txState.afterCompletion(Status.STATUS_COMMITTED))
-        .isSameAs(runtimeException);
-  }
-
-  @Test
-  public void 
afterCompletionExceptionIsSetWhenCommitFailedWithTransactionDataNodeHasDepartedException()
 {
+  public void afterCompletionThrowsIfCommitFails() {
     TXState txState = spy(new TXState(txStateProxy, true));
     doReturn(mock(InternalCache.class)).when(txState).getCache();
-    // txState.reserveAndCheck();
-    doReturn(txStateSynch).when(txState).getSynchronizationRunnable();
+    doReturn(true).when(txState).wasBeforeCompletionCalled();
+    txState.reserveAndCheck();
     doThrow(transactionDataNodeHasDepartedException).when(txState).commit();
 
-    txState.doAfterCompletion(Status.STATUS_COMMITTED);
-    verify(txStateSynch, times(1)).setAfterCompletionException(any());
+    assertThatThrownBy(() -> txState.afterCompletion(Status.STATUS_COMMITTED))
+        .isSameAs(transactionDataNodeHasDepartedException);
   }
 
   @Test
-  public void 
afterCompletionExceptionIsSetToTransactionExceptionWhenCommitFailedWithCommitConflictException()
 {
+  public void 
afterCompletionThrowsTransactionExceptionIfCommitFailedCommitConflictException()
 {
     TXState txState = spy(new TXState(txStateProxy, true));
     doReturn(mock(InternalCache.class)).when(txState).getCache();
-    doReturn(txStateSynch).when(txState).getSynchronizationRunnable();
+    doReturn(true).when(txState).wasBeforeCompletionCalled();
     doThrow(exception).when(txState).commit();
 
-    txState.doAfterCompletion(Status.STATUS_COMMITTED);
-
-    verify(txStateSynch, times(1)).setAfterCompletionException(any());
-    // TODO check InternalGemFireError
-    // TransactionException transactionException =
-    // (TransactionException) txState.getAfterCompletionException();
-    // 
assertThat(transactionException.getCause()).isInstanceOf(InternalGemFireError.class);
+    assertThatThrownBy(() -> txState.afterCompletion(Status.STATUS_COMMITTED))
+        .isInstanceOf(TransactionException.class);
   }
 
-
   @Test
   public void afterCompletionCanCommitJTA() {
     TXState txState = spy(new TXState(txStateProxy, false));
     doReturn(mock(InternalCache.class)).when(txState).getCache();
     txState.reserveAndCheck();
     txState.closed = true;
-    txState.doAfterCompletion(Status.STATUS_COMMITTED);
+    doReturn(true).when(txState).wasBeforeCompletionCalled();
+    txState.afterCompletion(Status.STATUS_COMMITTED);
 
     assertThat(txState.locks).isNull();
     verify(txState, times(1)).saveTXCommitMessageForClientFailover();

Reply via email to