This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch feature/GEODE-5376
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 4919e3bb706e041ddbafb1bf6d880e9c6a8f3d16
Author: eshu <[email protected]>
AuthorDate: Tue Jul 17 15:19:32 2018 -0700

    Remove the syncRunnable (TXStateSynchronizationRunnable) from TXState.
---
 .../org/apache/geode/internal/cache/TXState.java   |  87 +++--------
 .../cache/TXStateSynchronizationRunnable.java      | 171 ---------------------
 .../cache/TXStateSynchronizationRunnableTest.java  |  90 -----------
 .../apache/geode/internal/cache/TXStateTest.java   |  71 ++-------
 4 files changed, 35 insertions(+), 384 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index d1f586a..f85b98d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -24,12 +24,10 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Executor;
 import java.util.concurrent.locks.ReentrantLock;
 
 import javax.transaction.Status;
 
-import edu.umd.cs.findbugs.annotations.SuppressWarnings;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelException;
@@ -100,12 +98,7 @@ public class TXState implements TXStateInterface {
    */
   private int modSerialNum;
   private final List<EntryEventImpl> pendingCallbacks = new 
ArrayList<EntryEventImpl>();
-  /**
-   * for client/server JTA transactions we need to have a single thread handle 
both beforeCompletion
-   * and afterCompletion so that beforeCompletion can obtain locks for the 
afterCompletion step.
-   * This is that thread
-   */
-  private volatile TXStateSynchronizationRunnable syncRunnable;
+  private volatile boolean beforeCompletionCalled;
 
   // Internal testing hooks
   private Runnable internalAfterReservation;
@@ -859,10 +852,6 @@ public class TXState implements TXStateInterface {
   }
 
   protected void cleanup() {
-    cleanup(false);
-  }
-
-  protected void cleanup(boolean isBeforeCompletion) {
     IllegalArgumentException iae = null;
     try {
       this.closed = true;
@@ -916,9 +905,6 @@ public class TXState implements TXStateInterface {
       synchronized (this.completionGuard) {
         this.completionGuard.notifyAll();
       }
-      if (this.syncRunnable != null && !isBeforeCompletion) {
-        this.syncRunnable.abort();
-      }
       if (iae != null && !this.proxy.getCache().isClosed()) {
         throw iae;
       }
@@ -1018,35 +1004,15 @@ public class TXState implements TXStateInterface {
     if (this.closed) {
       throw new TXManagerCancelledException();
     }
-    TXStateSynchronizationRunnable sync = 
createTxStateSynchronizationRunnable();
-    setSynchronizationRunnable(sync);
-
-    Executor exec = getExecutor();
-    exec.execute(sync);
-    sync.waitForFirstExecution();
-  }
-
-  TXStateSynchronizationRunnable createTxStateSynchronizationRunnable() {
-    Runnable beforeCompletion = new Runnable() {
-      @SuppressWarnings("synthetic-access")
-      public void run() {
-        doBeforeCompletion();
-      }
-    };
-
-    return new TXStateSynchronizationRunnable(getCache().getCancelCriterion(),
-        beforeCompletion);
-  }
-
-  Executor getExecutor() {
-    return getCache().getDistributionManager().getWaitingThreadPool();
-  }
-
-  private void setSynchronizationRunnable(TXStateSynchronizationRunnable 
synchronizationRunnable) {
-    syncRunnable = synchronizationRunnable;
+    if (beforeCompletionCalled) {
+      // do not re-execute beforeCompletion again
+      return;
+    }
+    beforeCompletionCalled = true;
+    doBeforeCompletion();
   }
 
-  void doBeforeCompletion() {
+  private void doBeforeCompletion() {
     final long opStart = CachePerfStats.getStatTime();
     this.jtaLifeTime = opStart - getBeginTime();
     try {
@@ -1085,13 +1051,12 @@ public class TXState implements TXStateInterface {
         }
       }
     } catch (CommitConflictException commitConflict) {
-      cleanup(true);
+      cleanup();
       proxy.getTxMgr().noteCommitFailure(opStart, this.jtaLifeTime, this);
-      getSynchronizationRunnable()
-          .setBeforeCompletionException(new 
SynchronizationCommitConflictException(
-              
LocalizedStrings.TXState_CONFLICT_DETECTED_IN_GEMFIRE_TRANSACTION_0
-                  .toLocalizedString(getTransactionId()),
-              commitConflict));
+      throw new SynchronizationCommitConflictException(
+          LocalizedStrings.TXState_CONFLICT_DETECTED_IN_GEMFIRE_TRANSACTION_0
+              .toLocalizedString(getTransactionId()),
+          commitConflict);
     }
   }
 
@@ -1103,19 +1068,11 @@ public class TXState implements TXStateInterface {
   @Override
   public void afterCompletion(int status) {
     this.proxy.getTxMgr().setTXState(null);
-
-    Runnable afterCompletion = new Runnable() {
-      @SuppressWarnings("synthetic-access")
-      public void run() {
-        doAfterCompletion(status);
-      }
-    };
     // if there was a beforeCompletion call then there will be a thread
     // sitting in the waiting pool to execute afterCompletion. Otherwise
     // throw FailedSynchronizationException().
-    TXStateSynchronizationRunnable sync = getSynchronizationRunnable();
-    if (sync != null) {
-      sync.runSecondRunnable(afterCompletion);
+    if (wasBeforeCompletionCalled()) {
+      doAfterCompletion(status);
     } else {
       // rollback does not run beforeCompletion.
       if (status != Status.STATUS_ROLLEDBACK) {
@@ -1126,11 +1083,7 @@ public class TXState implements TXStateInterface {
     }
   }
 
-  TXStateSynchronizationRunnable getSynchronizationRunnable() {
-    return this.syncRunnable;
-  }
-
-  void doAfterCompletion(int status) {
+  private void doAfterCompletion(int status) {
     final long opStart = CachePerfStats.getStatTime();
     try {
       switch (status) {
@@ -1157,13 +1110,13 @@ public class TXState implements TXStateInterface {
         default:
           Assert.assertTrue(false, "Unknown JTA Synchronization status " + 
status);
       }
-    } catch (RuntimeException exception) {
-      getSynchronizationRunnable().setAfterCompletionException(exception);
     } catch (InternalGemFireError error) {
-      TransactionException exception = new TransactionException(error);
-      getSynchronizationRunnable().setAfterCompletionException(exception);
+      throw new TransactionException(error);
     }
+  }
 
+  boolean wasBeforeCompletionCalled() {
+    return beforeCompletionCalled;
   }
 
   void saveTXCommitMessageForClientFailover() {
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java
deleted file mode 100644
index d0670b1..0000000
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
- * agreements. See the NOTICE file distributed with this work for additional 
information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the 
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
- * or implied. See the License for the specific language governing permissions 
and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache;
-
-import org.apache.logging.log4j.Logger;
-
-import org.apache.geode.CancelCriterion;
-import org.apache.geode.cache.SynchronizationCommitConflictException;
-import org.apache.geode.internal.logging.LogService;
-
-/**
- * TXSynchronizationThread manages beforeCompletion and afterCompletion calls 
in TXState.
- * The thread should be instantiated with a Runnable that invokes 
beforeCompletion behavior.
- * Then you must invoke runSecondRunnable() with another Runnable that invokes 
afterCompletion
- * behavior.
- *
- * @since Geode 1.8.0
- */
-public class TXStateSynchronizationRunnable implements Runnable {
-  private static final Logger logger = LogService.getLogger();
-
-  private final CancelCriterion cancelCriterion;
-
-  private Runnable firstRunnable;
-  private final Object firstRunnableSync = new Object();
-  private boolean firstRunnableCompleted;
-
-  private Runnable secondRunnable;
-  private final Object secondRunnableSync = new Object();
-  private boolean secondRunnableCompleted;
-
-  private SynchronizationCommitConflictException beforeCompletionException;
-  private RuntimeException afterCompletionException;
-  private boolean abort;
-
-  public TXStateSynchronizationRunnable(final CancelCriterion cancelCriterion,
-      final Runnable beforeCompletion) {
-    this.cancelCriterion = cancelCriterion;
-    this.firstRunnable = beforeCompletion;
-  }
-
-  @Override
-  public void run() {
-    doSynchronizationOps();
-  }
-
-  private void doSynchronizationOps() {
-    synchronized (this.firstRunnableSync) {
-      try {
-        this.firstRunnable.run();
-      } finally {
-        if (logger.isTraceEnabled()) {
-          logger.trace("beforeCompletion notification completed");
-        }
-        this.firstRunnableCompleted = true;
-        this.firstRunnable = null;
-        this.firstRunnableSync.notifyAll();
-      }
-    }
-    synchronized (this.secondRunnableSync) {
-      // TODO there should be a transaction timeout that keeps this thread
-      // from sitting around forever if the client goes away
-      final boolean isTraceEnabled = logger.isTraceEnabled();
-      while (this.secondRunnable == null && !this.abort) {
-        try {
-          if (isTraceEnabled) {
-            logger.trace("waiting for afterCompletion notification");
-          }
-          this.secondRunnableSync.wait(1000);
-        } catch (InterruptedException ignore) {
-          // eat the interrupt and check for exit conditions
-        }
-      }
-      if (isTraceEnabled) {
-        logger.trace("executing afterCompletion notification");
-      }
-      try {
-        if (!this.abort) {
-          this.secondRunnable.run();
-        }
-      } finally {
-        if (isTraceEnabled) {
-          logger.trace("afterCompletion notification completed");
-        }
-        this.secondRunnableCompleted = true;
-        this.secondRunnable = null;
-        this.secondRunnableSync.notifyAll();
-      }
-    }
-  }
-
-  /**
-   * wait for the initial beforeCompletion step to finish
-   */
-  public void waitForFirstExecution() {
-    synchronized (this.firstRunnableSync) {
-      while (!this.firstRunnableCompleted) {
-        try {
-          this.firstRunnableSync.wait(1000);
-        } catch (InterruptedException ignore) {
-          // eat the interrupt and check for exit conditions
-        }
-        cancelCriterion.checkCancelInProgress(null);
-      }
-    }
-    if (getBeforeCompletionException() != null) {
-      throw getBeforeCompletionException();
-    }
-  }
-
-  /**
-   * run the afterCompletion portion of synchronization. This method schedules 
execution of the
-   * given runnable and then waits for it to finish running
-   */
-  public void runSecondRunnable(Runnable r) {
-    synchronized (this.secondRunnableSync) {
-      this.secondRunnable = r;
-      this.secondRunnableSync.notifyAll();
-      while (!this.secondRunnableCompleted && !this.abort) {
-        try {
-          this.secondRunnableSync.wait(1000);
-        } catch (InterruptedException ignore) {
-          // eat the interrupt and check for exit conditions
-        }
-        cancelCriterion.checkCancelInProgress(null);
-      }
-    }
-    if (getAfterCompletionException() != null) {
-      throw getAfterCompletionException();
-    }
-  }
-
-  /**
-   * stop waiting for an afterCompletion to arrive and just exit
-   */
-  public void abort() {
-    synchronized (this.secondRunnableSync) {
-      this.abort = true;
-    }
-  }
-
-  public SynchronizationCommitConflictException getBeforeCompletionException() 
{
-    return beforeCompletionException;
-  }
-
-  public void setBeforeCompletionException(
-      SynchronizationCommitConflictException beforeCompletionException) {
-    this.beforeCompletionException = beforeCompletionException;
-  }
-
-
-  public RuntimeException getAfterCompletionException() {
-    return afterCompletionException;
-  }
-
-  public void setAfterCompletionException(RuntimeException 
afterCompletionException) {
-    this.afterCompletionException = afterCompletionException;
-  }
-}
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnableTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnableTest.java
deleted file mode 100644
index 05b811b..0000000
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnableTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
- * agreements. See the NOTICE file distributed with this work for additional 
information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the 
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
- * or implied. See the License for the specific language governing permissions 
and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache;
-
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.CancelCriterion;
-import org.apache.geode.cache.CacheClosedException;
-import org.apache.geode.test.junit.categories.UnitTest;
-
-@Category(UnitTest.class)
-public class TXStateSynchronizationRunnableTest {
-  private CancelCriterion cancelCriterion;
-  private Runnable beforeCompletion;
-  private Runnable afterCompletion;
-  private CacheClosedException exception;
-
-  @Before
-  public void setUp() {
-    exception = new CacheClosedException();
-
-    cancelCriterion = mock(CancelCriterion.class);
-    beforeCompletion = mock(Runnable.class);
-    afterCompletion = mock(Runnable.class);
-  }
-
-  @Test
-  public void waitForFirstExecutionThrowsExceptionIfCacheClosed() {
-    doThrow(exception).when(cancelCriterion).checkCancelInProgress(any());
-    TXStateSynchronizationRunnable runnable =
-        new TXStateSynchronizationRunnable(cancelCriterion, beforeCompletion);
-    assertThatThrownBy(() -> 
runnable.waitForFirstExecution()).isSameAs(exception);
-  }
-
-  @Test
-  public void runSecondRunnableThrowsExceptionIfCacheClosed() {
-    doThrow(exception).when(cancelCriterion).checkCancelInProgress(any());
-    TXStateSynchronizationRunnable runnable =
-        new TXStateSynchronizationRunnable(cancelCriterion, beforeCompletion);
-    assertThatThrownBy(() -> 
runnable.runSecondRunnable(afterCompletion)).isSameAs(exception);
-  }
-
-  @Test
-  public void doSynchronizationOpsWaitsUntilRunSecondRunnable() {
-    TXStateSynchronizationRunnable runnable =
-        new TXStateSynchronizationRunnable(cancelCriterion, beforeCompletion);
-    new Thread(() -> {
-      runnable.runSecondRunnable(afterCompletion);
-    }).start();
-    runnable.run();
-    verify(beforeCompletion, times(1)).run();
-    verify(afterCompletion, times(1)).run();
-  }
-
-  @Test
-  public void doSynchronizationOpsDoesNotRunSecondRunnableIfAborted() {
-    TXStateSynchronizationRunnable runnable =
-        new TXStateSynchronizationRunnable(cancelCriterion, beforeCompletion);
-    runnable.abort();
-    new Thread(() -> {
-      runnable.runSecondRunnable(afterCompletion);
-    }).start();
-    runnable.run();
-    verify(beforeCompletion, times(1)).run();
-    verify(afterCompletion, never()).run();
-  }
-
-}
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java 
b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
index b176df4..0fee670 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
@@ -17,7 +17,6 @@ package org.apache.geode.internal.cache;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
@@ -26,115 +25,75 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.util.concurrent.Executor;
-
 import javax.transaction.Status;
 
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import org.apache.geode.CancelCriterion;
 import org.apache.geode.cache.CommitConflictException;
 import org.apache.geode.cache.SynchronizationCommitConflictException;
 import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
+import org.apache.geode.cache.TransactionException;
 import org.apache.geode.test.junit.categories.UnitTest;
 
 @Category(UnitTest.class)
 public class TXStateTest {
-  private CancelCriterion cancelCriterion;
-
   private TXStateProxyImpl txStateProxy;
   private CommitConflictException exception;
-  private TXStateSynchronizationRunnable txStateSynch;
-  private SynchronizationCommitConflictException 
synchronizationCommitConflictException;
-  private RuntimeException runtimeException;
   private TransactionDataNodeHasDepartedException 
transactionDataNodeHasDepartedException;
 
   @Before
   public void setup() {
     txStateProxy = mock(TXStateProxyImpl.class);
-
-    cancelCriterion = mock(CancelCriterion.class);
     exception = new CommitConflictException("");
-    txStateSynch = mock(TXStateSynchronizationRunnable.class);
-    synchronizationCommitConflictException = new 
SynchronizationCommitConflictException("");
-    runtimeException = new RuntimeException();
     transactionDataNodeHasDepartedException = new 
TransactionDataNodeHasDepartedException("");
 
     when(txStateProxy.getTxMgr()).thenReturn(mock(TXManagerImpl.class));
   }
 
-  @Test
-  public void 
beforeCompletionThrowsSynchronizationCommitConflictExceptionIfBeforeCompletionExceptionIsSet()
 {
-    TXState txState = spy(new TXState(txStateProxy, true));
-
-    doReturn(mock(Executor.class)).when(txState).getExecutor();
-    
doReturn(txStateSynch).when(txState).createTxStateSynchronizationRunnable();
-    
doThrow(synchronizationCommitConflictException).when(txStateSynch).waitForFirstExecution();
-
-    assertThatThrownBy(() -> txState.beforeCompletion())
-        .isSameAs(synchronizationCommitConflictException);
-  }
 
   @Test
-  public void 
beforeCompletionExceptionIsSetWhenDoBeforeCompletionCouldNotLockKeys() {
+  public void beforeCompletionThrowsIfReserveAndCheckFails() {
     TXState txState = spy(new TXState(txStateProxy, true));
     doThrow(exception).when(txState).reserveAndCheck();
-    doReturn(txStateSynch).when(txState).getSynchronizationRunnable();
 
-    txState.doBeforeCompletion();
-    verify(txStateSynch, times(1)).setBeforeCompletionException(any());
+    assertThatThrownBy(() -> txState.beforeCompletion())
+        .isInstanceOf(SynchronizationCommitConflictException.class);
   }
 
 
   @Test
-  public void afterCompletionThrowsExceptionIfAfterCompletionExceptionIsSet() {
-    TXState txState = spy(new TXState(txStateProxy, true));
-
-    doReturn(txStateSynch).when(txState).getSynchronizationRunnable();
-    doThrow(runtimeException).when(txStateSynch).runSecondRunnable(any());
-
-    assertThatThrownBy(() -> txState.afterCompletion(Status.STATUS_COMMITTED))
-        .isSameAs(runtimeException);
-  }
-
-  @Test
-  public void 
afterCompletionExceptionIsSetWhenCommitFailedWithTransactionDataNodeHasDepartedException()
 {
+  public void afterCompletionThrowsIfCommitFails() {
     TXState txState = spy(new TXState(txStateProxy, true));
     doReturn(mock(InternalCache.class)).when(txState).getCache();
-    // txState.reserveAndCheck();
-    doReturn(txStateSynch).when(txState).getSynchronizationRunnable();
+    doReturn(true).when(txState).wasBeforeCompletionCalled();
+    txState.reserveAndCheck();
     doThrow(transactionDataNodeHasDepartedException).when(txState).commit();
 
-    txState.doAfterCompletion(Status.STATUS_COMMITTED);
-    verify(txStateSynch, times(1)).setAfterCompletionException(any());
+    assertThatThrownBy(() -> txState.afterCompletion(Status.STATUS_COMMITTED))
+        .isSameAs(transactionDataNodeHasDepartedException);
   }
 
   @Test
-  public void 
afterCompletionExceptionIsSetToTransactionExceptionWhenCommitFailedWithCommitConflictException()
 {
+  public void 
afterCompletionThrowsTransactionExceptionIfCommitFailedCommitConflictException()
 {
     TXState txState = spy(new TXState(txStateProxy, true));
     doReturn(mock(InternalCache.class)).when(txState).getCache();
-    doReturn(txStateSynch).when(txState).getSynchronizationRunnable();
+    doReturn(true).when(txState).wasBeforeCompletionCalled();
     doThrow(exception).when(txState).commit();
 
-    txState.doAfterCompletion(Status.STATUS_COMMITTED);
-
-    verify(txStateSynch, times(1)).setAfterCompletionException(any());
-    // TODO check InternalGemFireError
-    // TransactionException transactionException =
-    // (TransactionException) txState.getAfterCompletionException();
-    // 
assertThat(transactionException.getCause()).isInstanceOf(InternalGemFireError.class);
+    assertThatThrownBy(() -> txState.afterCompletion(Status.STATUS_COMMITTED))
+        .isInstanceOf(TransactionException.class);
   }
 
-
   @Test
   public void afterCompletionCanCommitJTA() {
     TXState txState = spy(new TXState(txStateProxy, false));
     doReturn(mock(InternalCache.class)).when(txState).getCache();
     txState.reserveAndCheck();
     txState.closed = true;
-    txState.doAfterCompletion(Status.STATUS_COMMITTED);
+    doReturn(true).when(txState).wasBeforeCompletionCalled();
+    txState.afterCompletion(Status.STATUS_COMMITTED);
 
     assertThat(txState.locks).isNull();
     verify(txState, times(1)).saveTXCommitMessageForClientFailover();

Reply via email to