This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new fdc09b6   GEODE-5376: Remove SynchronizationRunnable (#2122)
fdc09b6 is described below

commit fdc09b6e03d3be789f5d4eae0c48a491a766b387
Author: pivotal-eshu <[email protected]>
AuthorDate: Fri Jul 20 10:05:35 2018 -0700

     GEODE-5376: Remove SynchronizationRunnable (#2122)
    
    
      Remove TXSynchronizationRunnable so that JTA beforeCompletion and 
afterCompletion can be executed on different members after client failover.
---
 .../ClientServerJTAFailoverDistributedTest.java    | 270 +++++++++++++++++++++
 .../cache/DistTXStateProxyImplOnCoordinator.java   |   6 -
 .../internal/cache/PausedTXStateProxyImpl.java     |   8 -
 .../apache/geode/internal/cache/TXManagerImpl.java |   4 +
 .../org/apache/geode/internal/cache/TXState.java   |  99 +++++---
 .../apache/geode/internal/cache/TXStateProxy.java  |   9 -
 .../geode/internal/cache/TXStateProxyImpl.java     |  25 --
 .../internal/cache/TXSynchronizationRunnable.java  | 152 ------------
 .../sockets/command/TXSynchronizationCommand.java  | 235 +++++++-----------
 .../geode/internal/cache/tx/ClientTXStateStub.java |   4 +
 .../apache/geode/internal/cache/TXStateTest.java   | 108 +++++++++
 .../cache/TXSynchronizationRunnableTest.java       |  61 -----
 .../command/TXSynchronizationCommandTest.java      | 116 +++++++++
 13 files changed, 650 insertions(+), 447 deletions(-)

diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java
new file mode 100644
index 0000000..cb27de2
--- /dev/null
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.jta;
+
+import static org.apache.geode.test.dunit.VM.getHostName;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
+import javax.transaction.Status;
+
+import org.awaitility.Awaitility;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.TransactionId;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.client.PoolFactory;
+import org.apache.geode.cache.client.PoolManager;
+import org.apache.geode.cache.client.internal.InternalClientCache;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.internal.ServerLocation;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXStateProxyImpl;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil;
+import org.apache.geode.internal.cache.tx.ClientTXStateStub;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.ClientCacheRule;
+import org.apache.geode.test.dunit.rules.DistributedTestRule;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+public class ClientServerJTAFailoverDistributedTest implements Serializable {
+  private String hostName;
+  private String uniqueName;
+  private String regionName;
+  private VM server1;
+  private VM server2;
+  private VM server3;
+  private VM client1;
+  private int port1;
+  private int port2;
+
+  private final int key = 1;
+  private final String value = "value1";
+  private final String newValue = "value2";
+
+  @Rule
+  public DistributedTestRule distributedTestRule = new DistributedTestRule();
+
+  @Rule
+  public CacheRule cacheRule = new CacheRule();
+
+  @Rule
+  public ClientCacheRule clientCacheRule = new ClientCacheRule();
+
+  @Rule
+  public SerializableTestName testName = new SerializableTestName();
+
+  @Before
+  public void setUp() {
+    server1 = getVM(0);
+    server2 = getVM(1);
+    server3 = getVM(2);
+    client1 = getVM(3);
+
+    hostName = getHostName();
+    uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName();
+    regionName = uniqueName + "_region";
+  }
+
+  @Test
+  public void jtaCanFailoverAfterDoneBeforeCompletion() {
+    server3.invoke(() -> createServerRegion(1, false));
+    server3.invoke(() -> doPut(key, value));
+    port1 = server1.invoke(() -> createServerRegion(1, true));
+    port2 = server2.invoke(() -> createServerRegion(1, true));
+
+    client1.invoke(() -> createClientRegion(port1, port2));
+
+    Object[] beforeCompletionResults = client1.invoke(() -> 
doBeforeCompletion());
+
+    int port = (Integer) beforeCompletionResults[1];
+
+    if (port == port1) {
+      server1.invoke(() -> cacheRule.getCache().close());
+    } else {
+      assert port == port2;
+      server2.invoke(() -> cacheRule.getCache().close());
+    }
+
+    client1.invoke(() -> doAfterCompletion((TransactionId) 
beforeCompletionResults[0], true));
+  }
+
+  private int createServerRegion(int totalNumBuckets, boolean isAccessor) 
throws Exception {
+    PartitionAttributesFactory factory = new PartitionAttributesFactory();
+    factory.setTotalNumBuckets(totalNumBuckets);
+    if (isAccessor) {
+      factory.setLocalMaxMemory(0);
+    }
+    PartitionAttributes partitionAttributes = factory.create();
+    cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.PARTITION)
+        .setPartitionAttributes(partitionAttributes).create(regionName);
+
+    CacheServer server = cacheRule.getCache().addCacheServer();
+    server.setPort(0);
+    server.start();
+    return server.getPort();
+  }
+
+  private void createClientRegion(int... ports) {
+    clientCacheRule.createClientCache();
+
+    CacheServerTestUtil.disableShufflingOfEndpoints();
+    PoolImpl pool;
+    try {
+      pool = getPool(ports);
+    } finally {
+      CacheServerTestUtil.enableShufflingOfEndpoints();
+    }
+
+    ClientRegionFactory crf =
+        
clientCacheRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.LOCAL);
+    crf.setPoolName(pool.getName());
+    crf.create(regionName);
+
+    if (ports.length > 1) {
+      pool.acquireConnection(new ServerLocation(hostName, port1));
+    }
+  }
+
+  private PoolImpl getPool(int... ports) {
+    PoolFactory factory = PoolManager.createFactory();
+    for (int port : ports) {
+      factory.addServer(hostName, port);
+    }
+    return (PoolImpl) factory.setReadTimeout(2000).setSocketBufferSize(1000)
+        .setMinConnections(4).create(uniqueName);
+  }
+
+  private void doPut(int key, String value) {
+    cacheRule.getCache().getRegion(regionName).put(key, value);
+  }
+
+  private Object[] doBeforeCompletion() {
+    Object[] results = new Object[2];
+    InternalClientCache cache = clientCacheRule.getClientCache();
+    Region region = cache.getRegion(regionName);
+    TXManagerImpl txManager = (TXManagerImpl) 
cache.getCacheTransactionManager();
+    txManager.begin();
+    region.put(key, newValue);
+
+    TXStateProxyImpl txStateProxy = (TXStateProxyImpl) txManager.getTXState();
+    ClientTXStateStub clientTXStateStub = (ClientTXStateStub) 
txStateProxy.getRealDeal(null, null);
+    clientTXStateStub.beforeCompletion();
+    TransactionId transactionId = txManager.suspend();
+    int port = clientTXStateStub.getServerAffinityLocation().getPort();
+    results[0] = transactionId;
+    results[1] = port;
+    return results;
+  }
+
+  private void doAfterCompletion(TransactionId transactionId, boolean 
isCommit) {
+    InternalClientCache cache = clientCacheRule.getClientCache();
+    Region region = cache.getRegion(regionName);
+    TXManagerImpl txManager = (TXManagerImpl) 
cache.getCacheTransactionManager();
+    txManager.resume(transactionId);
+
+    TXStateProxyImpl txStateProxy = (TXStateProxyImpl) txManager.getTXState();
+    ClientTXStateStub clientTXStateStub = (ClientTXStateStub) 
txStateProxy.getRealDeal(null, null);
+    try {
+      clientTXStateStub
+          .afterCompletion(isCommit ? Status.STATUS_COMMITTED : 
Status.STATUS_ROLLEDBACK);
+    } catch (Exception exception) {
+      LogService.getLogger().info("exception stack ", exception);
+      throw exception;
+    }
+    if (isCommit) {
+      assertEquals(newValue, region.get(key));
+    } else {
+      assertEquals(value, region.get(key));
+    }
+  }
+
+  @Test
+  public void jtaCanFailoverToJTAHostAfterDoneBeforeCompletion() {
+    port2 = server2.invoke(() -> createServerRegion(1, false));
+    server2.invoke(() -> doPut(key, value));
+    port1 = server1.invoke(() -> createServerRegion(1, true));
+
+    client1.invoke(() -> createClientRegion(port1, port2));
+    Object[] beforeCompletionResults = client1.invoke(() -> 
doBeforeCompletion());
+
+    server1.invoke(() -> cacheRule.getCache().close());
+
+    client1.invoke(() -> doAfterCompletion((TransactionId) 
beforeCompletionResults[0], true));
+  }
+
+  @Test
+  public void jtaCanFailoverWithRollbackAfterDoneBeforeCompletion() {
+    server3.invoke(() -> createServerRegion(1, false));
+    server3.invoke(() -> doPut(key, value));
+    port1 = server1.invoke(() -> createServerRegion(1, true));
+    port2 = server2.invoke(() -> createServerRegion(1, true));
+
+    client1.invoke(() -> createClientRegion(port1, port2));
+
+    Object[] beforeCompletionResults = client1.invoke(() -> 
doBeforeCompletion());
+
+    int port = (Integer) beforeCompletionResults[1];
+
+    if (port == port1) {
+      server1.invoke(() -> cacheRule.getCache().close());
+    } else {
+      assert port == port2;
+      server2.invoke(() -> cacheRule.getCache().close());
+    }
+
+    client1.invoke(() -> doAfterCompletion((TransactionId) 
beforeCompletionResults[0], false));
+
+    createClientRegion(port == port1 ? port2 : port1);
+    doPutTransaction(true);
+  }
+
+  private void doPutTransaction(boolean isClient) {
+    Region region;
+    TXManagerImpl txManager;
+    if (isClient) {
+      InternalClientCache cache = clientCacheRule.getClientCache();
+      region = cache.getRegion(regionName);
+      txManager = (TXManagerImpl) cache.getCacheTransactionManager();
+    } else {
+      InternalCache cache = cacheRule.getCache();
+      region = cache.getRegion(regionName);
+      txManager = (TXManagerImpl) cache.getCacheTransactionManager();
+
+      Awaitility.await().atMost(30, TimeUnit.SECONDS).pollInterval(10, 
TimeUnit.MILLISECONDS)
+          .until(() -> txManager.isHostedTXStatesEmpty());
+    }
+    txManager.begin();
+    region.put(key, newValue);
+    txManager.commit();
+    assertEquals(newValue, region.get(key));
+  }
+
+}
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
index c088c0f..031de9e 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
@@ -117,9 +117,6 @@ public class DistTXStateProxyImplOnCoordinator extends 
DistTXStateProxyImpl {
       }
 
       inProgress = preserveTx;
-      if (this.synchRunnable != null) {
-        this.synchRunnable.abort();
-      }
     }
   }
 
@@ -281,9 +278,6 @@ public class DistTXStateProxyImplOnCoordinator extends 
DistTXStateProxyImpl {
 
     } finally {
       inProgress = false;
-      if (this.synchRunnable != null) {
-        this.synchRunnable.abort();
-      }
     }
 
     /*
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/PausedTXStateProxyImpl.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/PausedTXStateProxyImpl.java
index a796d5c..2e52551 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/PausedTXStateProxyImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/PausedTXStateProxyImpl.java
@@ -390,14 +390,6 @@ public class PausedTXStateProxyImpl implements 
TXStateProxy {
   public void setJCATransaction() {}
 
   @Override
-  public void setSynchronizationRunnable(TXSynchronizationRunnable sync) {}
-
-  @Override
-  public TXSynchronizationRunnable getSynchronizationRunnable() {
-    return null;
-  }
-
-  @Override
   public void suspend() {}
 
   @Override
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
index c5c7653..639656f 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
@@ -1875,4 +1875,8 @@ public class TXManagerImpl implements 
CacheTransactionManager, MembershipListene
     return hostedTXStates;
   }
 
+  public boolean isHostedTXStatesEmpty() {
+    return hostedTXStates.isEmpty();
+  }
+
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index 9768fb8..389d0e7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -31,16 +31,19 @@ import javax.transaction.Status;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelException;
+import org.apache.geode.InternalGemFireError;
 import org.apache.geode.SystemFailure;
 import org.apache.geode.cache.CommitConflictException;
 import org.apache.geode.cache.DiskAccessException;
 import org.apache.geode.cache.EntryNotFoundException;
+import org.apache.geode.cache.FailedSynchronizationException;
 import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.Region.Entry;
 import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.SynchronizationCommitConflictException;
 import org.apache.geode.cache.TransactionDataRebalancedException;
+import org.apache.geode.cache.TransactionException;
 import org.apache.geode.cache.TransactionId;
 import org.apache.geode.cache.TransactionWriter;
 import org.apache.geode.cache.TransactionWriterException;
@@ -95,6 +98,8 @@ public class TXState implements TXStateInterface {
    */
   private int modSerialNum;
   private final List<EntryEventImpl> pendingCallbacks = new 
ArrayList<EntryEventImpl>();
+  // Access to this variable should be in a synchronized block.
+  private boolean beforeCompletionCalled;
 
   // Internal testing hooks
   private Runnable internalAfterReservation;
@@ -996,15 +1001,21 @@ public class TXState implements TXStateInterface {
    * @see org.apache.geode.internal.cache.TXStateInterface#beforeCompletion()
    */
   @Override
-  public void beforeCompletion() throws SynchronizationCommitConflictException 
{
+  public synchronized void beforeCompletion() throws 
SynchronizationCommitConflictException {
     if (this.closed) {
       throw new TXManagerCancelledException();
     }
-    this.proxy.getTxMgr().setTXState(null);
+    if (beforeCompletionCalled) {
+      // beforeCompletion has already run; do not execute it again
+      return;
+    }
+    beforeCompletionCalled = true;
+    doBeforeCompletion();
+  }
+
+  private void doBeforeCompletion() {
     final long opStart = CachePerfStats.getStatTime();
     this.jtaLifeTime = opStart - getBeginTime();
-
-
     try {
       reserveAndCheck();
       /*
@@ -1042,7 +1053,7 @@ public class TXState implements TXStateInterface {
       }
     } catch (CommitConflictException commitConflict) {
       cleanup();
-      this.proxy.getTxMgr().noteCommitFailure(opStart, this.jtaLifeTime, this);
+      proxy.getTxMgr().noteCommitFailure(opStart, this.jtaLifeTime, this);
       throw new SynchronizationCommitConflictException(
           LocalizedStrings.TXState_CONFLICT_DETECTED_IN_GEMFIRE_TRANSACTION_0
               .toLocalizedString(getTransactionId()),
@@ -1056,41 +1067,59 @@ public class TXState implements TXStateInterface {
    * @see org.apache.geode.internal.cache.TXStateInterface#afterCompletion(int)
    */
   @Override
-  public void afterCompletion(int status) {
-    // System.err.println("start afterCompletion");
+  public synchronized void afterCompletion(int status) {
+    this.proxy.getTxMgr().setTXState(null);
+    // For commit, beforeCompletion should be called. Otherwise
+    // throw FailedSynchronizationException().
+    if (wasBeforeCompletionCalled()) {
+      doAfterCompletion(status);
+    } else {
+      // rollback does not run beforeCompletion.
+      if (status != Status.STATUS_ROLLEDBACK) {
+        throw new FailedSynchronizationException(
+            "Could not execute afterCompletion when beforeCompletion was not 
executed");
+      }
+      doAfterCompletion(status);
+    }
+  }
+
+  private void doAfterCompletion(int status) {
     final long opStart = CachePerfStats.getStatTime();
-    switch (status) {
-      case Status.STATUS_COMMITTED:
-        // System.err.println("begin commit in afterCompletion");
-        Assert.assertTrue(this.locks != null,
-            "Gemfire Transaction afterCompletion called with illegal state.");
-        try {
-          proxy.getTxMgr().setTXState(null);
-          commit();
-          saveTXCommitMessageForClientFailover();
-        } catch (CommitConflictException error) {
-          Assert.assertTrue(false, "Gemfire Transaction " + getTransactionId()
-              + " afterCompletion failed.due to CommitConflictException: " + 
error);
-        }
+    try {
+      switch (status) {
+        case Status.STATUS_COMMITTED:
+          Assert.assertTrue(this.locks != null,
+              "Gemfire Transaction afterCompletion called with illegal 
state.");
+          try {
+            commit();
+            saveTXCommitMessageForClientFailover();
+          } catch (CommitConflictException error) {
+            Assert.assertTrue(false, "Gemfire Transaction " + 
getTransactionId()
+                + " afterCompletion failed.due to CommitConflictException: " + 
error);
+          }
 
-        this.proxy.getTxMgr().noteCommitSuccess(opStart, this.jtaLifeTime, 
this);
-        this.locks = null;
-        // System.err.println("end commit in afterCompletion");
-        break;
-      case Status.STATUS_ROLLEDBACK:
-        this.jtaLifeTime = opStart - getBeginTime();
-        this.proxy.getTxMgr().setTXState(null);
-        rollback();
-        saveTXCommitMessageForClientFailover();
-        this.proxy.getTxMgr().noteRollbackSuccess(opStart, this.jtaLifeTime, 
this);
-        break;
-      default:
-        Assert.assertTrue(false, "Unknown JTA Synchronization status " + 
status);
+          this.proxy.getTxMgr().noteCommitSuccess(opStart, this.jtaLifeTime, 
this);
+          this.locks = null;
+          break;
+        case Status.STATUS_ROLLEDBACK:
+          this.jtaLifeTime = opStart - getBeginTime();
+          rollback();
+          saveTXCommitMessageForClientFailover();
+          this.proxy.getTxMgr().noteRollbackSuccess(opStart, this.jtaLifeTime, 
this);
+          break;
+        default:
+          Assert.assertTrue(false, "Unknown JTA Synchronization status " + 
status);
+      }
+    } catch (InternalGemFireError error) {
+      throw new TransactionException(error);
     }
-    // System.err.println("end afterCompletion");
   }
 
-  private void saveTXCommitMessageForClientFailover() {
+  boolean wasBeforeCompletionCalled() {
+    return beforeCompletionCalled;
+  }
+
+  void saveTXCommitMessageForClientFailover() {
     proxy.getTxMgr().saveTXStateForClientFailover(proxy);
   }
 
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxy.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxy.java
index f037a02..7a79914 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxy.java
@@ -54,15 +54,6 @@ public interface TXStateProxy extends TXStateInterface {
   void setJCATransaction();
 
   /**
-   * establishes the synchronization thread used for client/server 
beforeCompletion/afterCompletion
-   * processing
-   *
-   */
-  void setSynchronizationRunnable(TXSynchronizationRunnable sync);
-
-  TXSynchronizationRunnable getSynchronizationRunnable();
-
-  /**
    * Perform additional tasks required by the proxy to suspend a transaction
    */
   void suspend();
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
index f9aa2d4..00f15c3 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
@@ -57,12 +57,6 @@ public class TXStateProxyImpl implements TXStateProxy {
   private boolean commitRequestedByOwner;
   private boolean isJCATransaction;
 
-  /**
-   * for client/server JTA transactions we need to have a single thread handle 
both beforeCompletion
-   * and afterCompletion so that beforeC can obtain locks for the afterC step. 
This is that thread
-   */
-  protected volatile TXSynchronizationRunnable synchRunnable;
-
   private final ReentrantLock lock = new ReentrantLock();
 
   /** number of operations in this transaction */
@@ -99,16 +93,6 @@ public class TXStateProxyImpl implements TXStateProxy {
   }
 
   @Override
-  public void setSynchronizationRunnable(TXSynchronizationRunnable synch) {
-    this.synchRunnable = synch;
-  }
-
-  @Override
-  public TXSynchronizationRunnable getSynchronizationRunnable() {
-    return this.synchRunnable;
-  }
-
-  @Override
   public ReentrantLock getLock() {
     return this.lock;
   }
@@ -230,9 +214,6 @@ public class TXStateProxyImpl implements TXStateProxy {
       throw e;
     } finally {
       inProgress = preserveTx;
-      if (this.synchRunnable != null) {
-        this.synchRunnable.abort();
-      }
     }
   }
 
@@ -410,9 +391,6 @@ public class TXStateProxyImpl implements TXStateProxy {
       getRealDeal(null, null).rollback();
     } finally {
       inProgress = false;
-      if (this.synchRunnable != null) {
-        this.synchRunnable.abort();
-      }
     }
   }
 
@@ -472,9 +450,6 @@ public class TXStateProxyImpl implements TXStateProxy {
       getRealDeal(null, null).afterCompletion(status);
     } finally {
       this.inProgress = false;
-      if (this.synchRunnable != null) {
-        this.synchRunnable.abort();
-      }
     }
   }
 
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXSynchronizationRunnable.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXSynchronizationRunnable.java
deleted file mode 100644
index 4603d93..0000000
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXSynchronizationRunnable.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
- * agreements. See the NOTICE file distributed with this work for additional 
information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the 
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
- * or implied. See the License for the specific language governing permissions 
and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache;
-
-import org.apache.logging.log4j.Logger;
-
-import org.apache.geode.CancelCriterion;
-import org.apache.geode.internal.cache.tier.sockets.CommBufferPool;
-import org.apache.geode.internal.logging.LogService;
-
-/**
- * TXSynchronizationThread manages beforeCompletion and afterCompletion calls 
on behalf of a client
- * cache. The thread should be instantiated with a Runnable that invokes 
beforeCompletion behavior.
- * Then you must invoke runSecondRunnable() with another Runnable that invokes 
afterCompletion
- * behavior.
- *
- * @since GemFire 6.6
- */
-public class TXSynchronizationRunnable implements Runnable {
-  private static final Logger logger = LogService.getLogger();
-
-  private final CancelCriterion cancelCriterion;
-  private final CommBufferPool commBufferPool;
-
-  private Runnable firstRunnable;
-  private final Object firstRunnableSync = new Object();
-  private boolean firstRunnableCompleted;
-
-  private Runnable secondRunnable;
-  private final Object secondRunnableSync = new Object();
-  private boolean secondRunnableCompleted;
-
-  private boolean abort;
-
-  public TXSynchronizationRunnable(final CancelCriterion cancelCriterion,
-      final CommBufferPool commBufferPool, final Runnable beforeCompletion) {
-    this.cancelCriterion = cancelCriterion;
-    this.commBufferPool = commBufferPool;
-    this.firstRunnable = beforeCompletion;
-  }
-
-  @Override
-  public void run() {
-    commBufferPool.setTLCommBuffer();
-    try {
-      doSynchronizationOps();
-    } finally {
-      commBufferPool.releaseTLCommBuffer();
-    }
-  }
-
-  private void doSynchronizationOps() {
-    synchronized (this.firstRunnableSync) {
-      try {
-        this.firstRunnable.run();
-      } finally {
-        if (logger.isTraceEnabled()) {
-          logger.trace("beforeCompletion notification completed");
-        }
-        this.firstRunnableCompleted = true;
-        this.firstRunnable = null;
-        this.firstRunnableSync.notifyAll();
-      }
-    }
-    synchronized (this.secondRunnableSync) {
-      // TODO there should be a transaction timeout that keeps this thread
-      // from sitting around forever if the client goes away
-      final boolean isTraceEnabled = logger.isTraceEnabled();
-      while (this.secondRunnable == null && !this.abort) {
-        try {
-          if (isTraceEnabled) {
-            logger.trace("waiting for afterCompletion notification");
-          }
-          this.secondRunnableSync.wait(1000);
-        } catch (InterruptedException ignore) {
-          // eat the interrupt and check for exit conditions
-        }
-      }
-      if (isTraceEnabled) {
-        logger.trace("executing afterCompletion notification");
-      }
-      try {
-        if (!this.abort) {
-          this.secondRunnable.run();
-        }
-      } finally {
-        if (isTraceEnabled) {
-          logger.trace("afterCompletion notification completed");
-        }
-        this.secondRunnableCompleted = true;
-        this.secondRunnable = null;
-        this.secondRunnableSync.notifyAll();
-      }
-    }
-  }
-
-  /**
-   * wait for the initial beforeCompletion step to finish
-   */
-  public void waitForFirstExecution() {
-    synchronized (this.firstRunnableSync) {
-      while (!this.firstRunnableCompleted) {
-        try {
-          this.firstRunnableSync.wait(1000);
-        } catch (InterruptedException ignore) {
-          // eat the interrupt and check for exit conditions
-        }
-        cancelCriterion.checkCancelInProgress(null);
-      }
-    }
-  }
-
-  /**
-   * run the afterCompletion portion of synchronization. This method schedules 
execution of the
-   * given runnable and then waits for it to finish running
-   */
-  public void runSecondRunnable(Runnable r) {
-    synchronized (this.secondRunnableSync) {
-      this.secondRunnable = r;
-      this.secondRunnableSync.notifyAll();
-      while (!this.secondRunnableCompleted && !this.abort) {
-        try {
-          this.secondRunnableSync.wait(1000);
-        } catch (InterruptedException ignore) {
-          // eat the interrupt and check for exit conditions
-        }
-        cancelCriterion.checkCancelInProgress(null);
-      }
-    }
-  }
-
-  /**
-   * stop waiting for an afterCompletion to arrive and just exit
-   */
-  public void abort() {
-    synchronized (this.secondRunnableSync) {
-      this.abort = true;
-    }
-  }
-}
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
index fd9c17f..037702a 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
@@ -16,19 +16,14 @@
 package org.apache.geode.internal.cache.tier.sockets.command;
 
 import java.io.IOException;
-import java.util.concurrent.Executor;
-
-import javax.transaction.Status;
 
 import 
org.apache.geode.cache.client.internal.TXSynchronizationOp.CompletionType;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.ReplyException;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.TXCommitMessage;
 import org.apache.geode.internal.cache.TXId;
 import org.apache.geode.internal.cache.TXManagerImpl;
 import org.apache.geode.internal.cache.TXStateProxy;
-import org.apache.geode.internal.cache.TXSynchronizationRunnable;
 import org.apache.geode.internal.cache.tier.Command;
 import org.apache.geode.internal.cache.tier.MessageType;
 import org.apache.geode.internal.cache.tier.sockets.BaseCommand;
@@ -50,21 +45,6 @@ public class TXSynchronizationCommand extends BaseCommand {
    * (non-Javadoc)
    *
    * @see
-   * 
org.apache.geode.internal.cache.tier.sockets.BaseCommand#shouldMasqueradeForTx(org.apache.geode
-   * .internal.cache.tier.sockets.Message,
-   * org.apache.geode.internal.cache.tier.sockets.ServerConnection)
-   */
-  @Override
-  protected boolean shouldMasqueradeForTx(Message clientMessage,
-      ServerConnection serverConnection) {
-    // masquerading is done in the waiting thread pool
-    return false;
-  }
-
-  /*
-   * (non-Javadoc)
-   *
-   * @see
    * 
org.apache.geode.internal.cache.tier.sockets.BaseCommand#cmdExecute(org.apache.geode.internal.
    * cache.tier.sockets.Message, 
org.apache.geode.internal.cache.tier.sockets.ServerConnection,
    * long)
@@ -86,20 +66,18 @@ public class TXSynchronizationCommand extends BaseCommand {
       statusPart = null;
     }
 
-    final TXManagerImpl txMgr =
-        (TXManagerImpl) 
serverConnection.getCache().getCacheTransactionManager();
-    final InternalDistributedMember member =
-        (InternalDistributedMember) 
serverConnection.getProxyID().getDistributedMember();
+    final TXManagerImpl txMgr = getTXManager(serverConnection);
+    final InternalDistributedMember member = 
getDistributedMember(serverConnection);
 
-    // get the tx state without associating it with this thread. That's done 
later
-    final TXStateProxy txProxy = txMgr.masqueradeAs(clientMessage, member, 
true);
+    final TXStateProxy txProxy = txMgr.getTXState();
+    assert txProxy != null;
 
     final TXId txId = txProxy.getTxId();
     TXCommitMessage commitMessage = txMgr.getRecentlyCompletedMessage(txId);
     if (commitMessage != null && commitMessage != 
TXCommitMessage.ROLLBACK_MSG) {
       assert type == CompletionType.AFTER_COMPLETION;
       try {
-        CommitCommand.writeCommitResponse(commitMessage, clientMessage, 
serverConnection);
+        writeCommitResponse(clientMessage, serverConnection, commitMessage);
       } catch (IOException e) {
         if (isDebugEnabled) {
           logger.debug("Problem writing reply to client", e);
@@ -117,137 +95,92 @@ public class TXSynchronizationCommand extends BaseCommand 
{
       return;
     }
 
-    // we have to run beforeCompletion and afterCompletion in the same thread
-    // because beforeCompletion obtains locks for the thread and 
afterCompletion
-    // releases them
-    if (txProxy != null) {
-      try {
-        if (type == CompletionType.BEFORE_COMPLETION) {
-          Runnable beforeCompletion = new Runnable() {
-            @SuppressWarnings("synthetic-access")
-            public void run() {
-              TXStateProxy txState = null;
-              Throwable failureException = null;
-              try {
-                txState = txMgr.masqueradeAs(clientMessage, member, false);
-                if (isDebugEnabled) {
-                  logger.debug("Executing beforeCompletion() notification for 
transaction {}",
-                      clientMessage.getTransactionId());
-                }
-                txState.setIsJTA(true);
-                txState.beforeCompletion();
-                try {
-                  writeReply(clientMessage, serverConnection);
-                } catch (IOException e) {
-                  if (isDebugEnabled) {
-                    logger.debug("Problem writing reply to client", e);
-                  }
-                }
-                serverConnection.setAsTrue(RESPONDED);
-              } catch (ReplyException e) {
-                failureException = e.getCause();
-              } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-              } catch (Exception e) {
-                failureException = e;
-              } finally {
-                txMgr.unmasquerade(txState);
-              }
-              if (failureException != null) {
-                try {
-                  writeException(clientMessage, failureException, false, 
serverConnection);
-                } catch (IOException ioe) {
-                  if (isDebugEnabled) {
-                    logger.debug("Problem writing reply to client", ioe);
-                  }
-                }
-                serverConnection.setAsTrue(RESPONDED);
-              }
+    try {
+      if (type == CompletionType.BEFORE_COMPLETION) {
+        if (isDebugEnabled) {
+          logger.debug("Executing beforeCompletion() notification for 
transaction {}",
+              clientMessage.getTransactionId());
+        }
+        Throwable failureException = null;
+        try {
+          txProxy.setIsJTA(true);
+          txProxy.beforeCompletion();
+          try {
+            writeReply(clientMessage, serverConnection);
+          } catch (IOException e) {
+            if (isDebugEnabled) {
+              logger.debug("Problem writing reply to client", e);
+            }
+          }
+          serverConnection.setAsTrue(RESPONDED);
+        } catch (ReplyException e) {
+          failureException = e.getCause();
+        } catch (Exception e) {
+          failureException = e;
+        }
+        if (failureException != null) {
+          try {
+            writeException(clientMessage, failureException, false, 
serverConnection);
+          } catch (IOException ioe) {
+            if (isDebugEnabled) {
+              logger.debug("Problem writing reply to client", ioe);
             }
-          };
-          TXSynchronizationRunnable sync =
-              new 
TXSynchronizationRunnable(serverConnection.getCache().getCancelCriterion(),
-                  serverConnection.getAcceptor(), beforeCompletion);
-          txProxy.setSynchronizationRunnable(sync);
-          Executor exec = 
InternalDistributedSystem.getConnectedInstance().getDistributionManager()
-              .getWaitingThreadPool();
-          exec.execute(sync);
-          sync.waitForFirstExecution();
-        } else {
-          Runnable afterCompletion = new Runnable() {
-            @SuppressWarnings("synthetic-access")
-            public void run() {
-              TXStateProxy txState = null;
-              try {
-                txState = txMgr.masqueradeAs(clientMessage, member, false);
-                int status = statusPart.getInt();
-                if (isDebugEnabled) {
-                  logger.debug("Executing afterCompletion({}) notification for 
transaction {}",
-                      status, clientMessage.getTransactionId());
-                }
-                txState.setIsJTA(true);
-                txState.afterCompletion(status);
-                // GemFire commits during afterCompletion - send the commit 
info back to the client
-                // where it can be applied to the local cache
-                TXCommitMessage cmsg = txState.getCommitMessage();
-                try {
-                  CommitCommand.writeCommitResponse(cmsg, clientMessage, 
serverConnection);
-                  txMgr.removeHostedTXState(txState.getTxId());
-                } catch (IOException e) {
-                  // not much can be done here
-                  if (isDebugEnabled || (e instanceof 
MessageTooLargeException)) {
-                    logger.warn("Problem writing reply to client", e);
-                  }
-                }
-                serverConnection.setAsTrue(RESPONDED);
-              } catch (RuntimeException e) {
-                try {
-                  writeException(clientMessage, e, false, serverConnection);
-                } catch (IOException ioe) {
-                  if (isDebugEnabled) {
-                    logger.debug("Problem writing reply to client", ioe);
-                  }
-                }
-                serverConnection.setAsTrue(RESPONDED);
-              } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-              } finally {
-                txMgr.unmasquerade(txState);
-              }
+          }
+          serverConnection.setAsTrue(RESPONDED);
+        }
+      } else {
+        try {
+          int status = statusPart.getInt();
+          if (isDebugEnabled) {
+            logger.debug("Executing afterCompletion({}) notification for 
transaction {}",
+                status, clientMessage.getTransactionId());
+          }
+          txProxy.setIsJTA(true);
+          txProxy.setCommitOnBehalfOfRemoteStub(true);
+          txProxy.afterCompletion(status);
+          // GemFire commits during afterCompletion - send the commit info 
back to the client
+          // where it can be applied to the local cache
+          TXCommitMessage cmsg = txProxy.getCommitMessage();
+          try {
+            writeCommitResponse(clientMessage, serverConnection, cmsg);
+            txMgr.removeHostedTXState(txProxy.getTxId());
+          } catch (IOException e) {
+            // not much can be done here
+            if (isDebugEnabled || (e instanceof MessageTooLargeException)) {
+              logger.warn("Problem writing reply to client", e);
             }
-          };
-          // if there was a beforeCompletion call then there will be a thread
-          // sitting in the waiting pool to execute afterCompletion. Otherwise
-          // we have failed-over and may need to do beforeCompletion & hope 
that it works
-          TXSynchronizationRunnable sync = 
txProxy.getSynchronizationRunnable();
-          if (sync != null) {
-            sync.runSecondRunnable(afterCompletion);
-          } else {
-            if (statusPart.getInt() == Status.STATUS_COMMITTED) {
-              TXStateProxy txState = txMgr.masqueradeAs(clientMessage, member, 
false);
-              try {
-                if (isDebugEnabled) {
-                  logger.debug(
-                      "Executing beforeCompletion() notification for 
transaction {} after failover",
-                      clientMessage.getTransactionId());
-                }
-                txState.setIsJTA(true);
-                txState.beforeCompletion();
-              } finally {
-                txMgr.unmasquerade(txState);
-              }
+          }
+          serverConnection.setAsTrue(RESPONDED);
+        } catch (RuntimeException e) {
+          try {
+            writeException(clientMessage, e, false, serverConnection);
+          } catch (IOException ioe) {
+            if (isDebugEnabled) {
+              logger.debug("Problem writing reply to client", ioe);
             }
-            afterCompletion.run();
           }
+          serverConnection.setAsTrue(RESPONDED);
         }
-      } catch (Exception e) {
-        writeException(clientMessage, MessageType.EXCEPTION, e, false, 
serverConnection);
-        serverConnection.setAsTrue(RESPONDED);
-      }
-      if (isDebugEnabled) {
-        logger.debug("Sent tx synchronization response");
       }
+    } catch (Exception e) {
+      writeException(clientMessage, MessageType.EXCEPTION, e, false, 
serverConnection);
+      serverConnection.setAsTrue(RESPONDED);
+    }
+    if (isDebugEnabled) {
+      logger.debug("Sent tx synchronization response");
     }
   }
 
+  /**
+   * Forwards to {@code CommitCommand.writeCommitResponse}. Note that the argument order differs
+   * from the delegate, which takes the commit message first. Being a non-static, package-private
+   * method lets TXSynchronizationCommandTest stub and verify this call on a mocked command.
+   *
+   * @param clientMessage the request message, passed through to the delegate
+   * @param serverConnection the connection, passed through to the delegate
+   * @param commitMessage the commit message handed to the delegate
+   * @throws IOException propagated from the delegate
+   */
+  void writeCommitResponse(Message clientMessage, ServerConnection 
serverConnection,
+      TXCommitMessage commitMessage) throws IOException {
+    CommitCommand.writeCommitResponse(commitMessage, clientMessage, 
serverConnection);
+  }
+
+  /**
+   * Returns the distributed member obtained from the connection's proxy id, cast to
+   * {@link InternalDistributedMember}. Package-private so that TXSynchronizationCommandTest can
+   * stub it on a mocked command.
+   *
+   * @param serverConnection the connection whose proxy id is queried
+   * @return the member reported by {@code serverConnection.getProxyID()}
+   */
+  InternalDistributedMember getDistributedMember(ServerConnection 
serverConnection) {
+    return (InternalDistributedMember) 
serverConnection.getProxyID().getDistributedMember();
+  }
+
+  /**
+   * Returns the cache transaction manager of the connection's cache, cast to
+   * {@link TXManagerImpl}. Package-private so that TXSynchronizationCommandTest can stub it on a
+   * mocked command.
+   *
+   * @param serverConnection the connection whose cache is queried
+   * @return the transaction manager reported by {@code serverConnection.getCache()}
+   */
+  TXManagerImpl getTXManager(ServerConnection serverConnection) {
+    return (TXManagerImpl) 
serverConnection.getCache().getCacheTransactionManager();
+  }
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java
index c97d9f0..4469b71 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java
@@ -322,4 +322,8 @@ public class ClientTXStateStub extends TXStateStub {
   public void setAfterLocalLocks(Runnable afterLocalLocks) {
     this.internalAfterLocalLocks = afterLocalLocks;
   }
+
+  /**
+   * Read accessor for this stub's {@code serverAffinityLocation} field.
+   *
+   * @return the current value of {@code serverAffinityLocation}, returned as-is
+   */
+  public ServerLocation getServerAffinityLocation() {
+    return serverAffinityLocation;
+  }
 }
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java 
b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
new file mode 100644
index 0000000..1770782
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import javax.transaction.Status;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.cache.CommitConflictException;
+import org.apache.geode.cache.SynchronizationCommitConflictException;
+import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
+import org.apache.geode.cache.TransactionException;
+
+/**
+ * Unit tests for the JTA synchronization callbacks ({@code beforeCompletion} and
+ * {@code afterCompletion}) of {@link TXState}. Each test wraps a real TXState in a Mockito spy
+ * so that individual methods such as {@code reserveAndCheck()} and {@code commit()} can be
+ * stubbed.
+ */
+public class TXStateTest {
+  // Mocked owning proxy handed to every TXState constructed by the tests.
+  private TXStateProxyImpl txStateProxy;
+  // Conflict failure injected into reserveAndCheck() or commit().
+  private CommitConflictException exception;
+  // Failure injected into commit() to check that it reaches the caller unchanged.
+  private TransactionDataNodeHasDepartedException 
transactionDataNodeHasDepartedException;
+
+  /**
+   * Creates the shared mocks and exceptions. The only call stubbed on the proxy mock is
+   * {@code getTxMgr()}, which returns a mocked {@link TXManagerImpl}.
+   */
+  @Before
+  public void setup() {
+    txStateProxy = mock(TXStateProxyImpl.class);
+    exception = new CommitConflictException("");
+    transactionDataNodeHasDepartedException = new 
TransactionDataNodeHasDepartedException("");
+
+    when(txStateProxy.getTxMgr()).thenReturn(mock(TXManagerImpl.class));
+  }
+
+
+  /**
+   * A {@link CommitConflictException} thrown by {@code reserveAndCheck()} must reach the caller
+   * of {@code beforeCompletion()} as a {@link SynchronizationCommitConflictException}.
+   */
+  @Test
+  public void beforeCompletionThrowsIfReserveAndCheckFails() {
+    TXState txState = spy(new TXState(txStateProxy, true));
+    doThrow(exception).when(txState).reserveAndCheck();
+
+    assertThatThrownBy(() -> txState.beforeCompletion())
+        .isInstanceOf(SynchronizationCommitConflictException.class);
+  }
+
+
+  /**
+   * A {@link TransactionDataNodeHasDepartedException} thrown by {@code commit()} must be thrown
+   * from {@code afterCompletion(STATUS_COMMITTED)} as the very same instance.
+   */
+  @Test
+  public void afterCompletionThrowsIfCommitFails() {
+    TXState txState = spy(new TXState(txStateProxy, true));
+    doReturn(mock(InternalCache.class)).when(txState).getCache();
+    doReturn(true).when(txState).wasBeforeCompletionCalled();
+    // reserveAndCheck() is invoked for real here (it is not stubbed in this test).
+    txState.reserveAndCheck();
+    doThrow(transactionDataNodeHasDepartedException).when(txState).commit();
+
+    assertThatThrownBy(() -> txState.afterCompletion(Status.STATUS_COMMITTED))
+        .isSameAs(transactionDataNodeHasDepartedException);
+  }
+
+  /**
+   * A {@link CommitConflictException} thrown by {@code commit()} must surface from
+   * {@code afterCompletion(STATUS_COMMITTED)} as a {@link TransactionException}.
+   */
+  @Test
+  public void 
afterCompletionThrowsTransactionExceptionIfCommitFailedCommitConflictException()
 {
+    TXState txState = spy(new TXState(txStateProxy, true));
+    doReturn(mock(InternalCache.class)).when(txState).getCache();
+    doReturn(true).when(txState).wasBeforeCompletionCalled();
+    doThrow(exception).when(txState).commit();
+
+    assertThatThrownBy(() -> txState.afterCompletion(Status.STATUS_COMMITTED))
+        .isInstanceOf(TransactionException.class);
+  }
+
+  /**
+   * Successful commit path: after {@code afterCompletion(STATUS_COMMITTED)} the {@code locks}
+   * field is null and {@code saveTXCommitMessageForClientFailover()} has been called once.
+   */
+  @Test
+  public void afterCompletionCanCommitJTA() {
+    TXState txState = spy(new TXState(txStateProxy, false));
+    doReturn(mock(InternalCache.class)).when(txState).getCache();
+    txState.reserveAndCheck();
+    // NOTE(review): commit() is not stubbed in this test; presumably marking the state closed
+    // keeps the real commit path from doing work against the mocked cache - confirm in TXState.
+    txState.closed = true;
+    doReturn(true).when(txState).wasBeforeCompletionCalled();
+    txState.afterCompletion(Status.STATUS_COMMITTED);
+
+    assertThat(txState.locks).isNull();
+    verify(txState, times(1)).saveTXCommitMessageForClientFailover();
+  }
+
+  /**
+   * Rollback path: {@code afterCompletion(STATUS_ROLLEDBACK)} must call {@code rollback()} once
+   * and {@code saveTXCommitMessageForClientFailover()} once.
+   */
+  @Test
+  public void afterCompletionCanRollbackJTA() {
+    TXState txState = spy(new TXState(txStateProxy, true));
+    txState.afterCompletion(Status.STATUS_ROLLEDBACK);
+
+    verify(txState, times(1)).rollback();
+    verify(txState, times(1)).saveTXCommitMessageForClientFailover();
+  }
+
+}
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/TXSynchronizationRunnableTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/TXSynchronizationRunnableTest.java
deleted file mode 100644
index 0210874..0000000
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/TXSynchronizationRunnableTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
- * agreements. See the NOTICE file distributed with this work for additional 
information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the 
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
- * or implied. See the License for the specific language governing permissions 
and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache;
-
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.geode.CancelCriterion;
-import org.apache.geode.cache.CacheClosedException;
-import org.apache.geode.internal.cache.tier.sockets.CommBufferPool;
-
-public class TXSynchronizationRunnableTest {
-
-  private CancelCriterion cancelCriterion;
-  private CommBufferPool commBufferPool;
-  private Runnable beforeCompletion;
-  private CacheClosedException exception;
-
-
-  @Before
-  public void setUp() {
-    exception = new CacheClosedException();
-
-    cancelCriterion = mock(CancelCriterion.class);
-    commBufferPool = mock(CommBufferPool.class);
-    beforeCompletion = mock(Runnable.class);
-
-    doThrow(exception).when(cancelCriterion).checkCancelInProgress(any());
-  }
-
-  @Test
-  public void test() {
-    TXSynchronizationRunnable runnable =
-        new TXSynchronizationRunnable(cancelCriterion, commBufferPool, 
beforeCompletion);
-    assertThatThrownBy(() -> 
runnable.waitForFirstExecution()).isSameAs(exception);
-  }
-
-  @Test
-  public void test1() {
-    TXSynchronizationRunnable runnable =
-        new TXSynchronizationRunnable(cancelCriterion, commBufferPool, 
beforeCompletion);
-    assertThatThrownBy(() -> 
runnable.runSecondRunnable(mock(Runnable.class))).isSameAs(exception);
-  }
-}
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommandTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommandTest.java
new file mode 100644
index 0000000..7bfb557
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommandTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets.command;
+
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import javax.transaction.Status;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.cache.client.internal.TXSynchronizationOp;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.TXCommitMessage;
+import org.apache.geode.internal.cache.TXId;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXStateProxyImpl;
+import org.apache.geode.internal.cache.tier.Command;
+import org.apache.geode.internal.cache.tier.sockets.Message;
+import org.apache.geode.internal.cache.tier.sockets.Part;
+import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
+
+/**
+ * Unit tests for {@code TXSynchronizationCommand.cmdExecute}. The command under test is itself
+ * a Mockito mock: every test enables the real {@code cmdExecute} via {@code doCallRealMethod()},
+ * while {@code getTXManager} and {@code getDistributedMember} are stubbed in {@link #setup()}.
+ */
+public class TXSynchronizationCommandTest {
+  private Message clientMessage;
+  private ServerConnection serverConnection;
+  private TXManagerImpl txManager;
+  private TXStateProxyImpl txStateProxy;
+  private TXId txId;
+  private TXCommitMessage txCommitMessage;
+  private InternalDistributedMember member;
+  // Parts 0..2 of the mocked client message. The tests stub part0 with a CompletionType
+  // ordinal and part2 with a JTA status; part1 has no stubbed behaviour.
+  private Part part0;
+  private Part part1;
+  private Part part2;
+  // NOTE(review): assigned in setup() but not read by any test in this class.
+  private RuntimeException exception;
+  private TXSynchronizationCommand command;
+
+  /**
+   * Builds the mock graph shared by all tests: the message hands out parts 0 to 2, the command's
+   * helper methods return the mocked transaction manager and member, the manager's current
+   * TXState is {@code txStateProxy}, and that proxy reports {@code txId}.
+   */
+  @Before
+  public void setup() {
+    clientMessage = mock(Message.class);
+    serverConnection = mock(ServerConnection.class);
+    txManager = mock(TXManagerImpl.class);
+    member = mock(InternalDistributedMember.class);
+    txStateProxy = mock(TXStateProxyImpl.class);
+    txId = mock(TXId.class);
+    txCommitMessage = mock(TXCommitMessage.class);
+    part0 = mock(Part.class);
+    part1 = mock(Part.class);
+    part2 = mock(Part.class);
+    exception = new RuntimeException();
+    command = mock(TXSynchronizationCommand.class);
+
+    when(clientMessage.getPart(0)).thenReturn(part0);
+    when(clientMessage.getPart(1)).thenReturn(part1);
+    when(clientMessage.getPart(2)).thenReturn(part2);
+    doReturn(txManager).when(command).getTXManager(serverConnection);
+    doReturn(member).when(command).getDistributedMember(serverConnection);
+    when(txManager.getTXState()).thenReturn(txStateProxy);
+    when(txStateProxy.getTxId()).thenReturn(txId);
+  }
+
+  /**
+   * When the transaction manager returns a recently completed commit message for the
+   * transaction id, an AFTER_COMPLETION request must write that message back once and mark the
+   * connection as RESPONDED once.
+   */
+  @Test
+  public void commandCanSendBackCommitMessageIfAlreadyCommitted() throws 
Exception {
+    
when(part0.getInt()).thenReturn(TXSynchronizationOp.CompletionType.AFTER_COMPLETION.ordinal());
+    
when(txManager.getRecentlyCompletedMessage(txId)).thenReturn(txCommitMessage);
+    doNothing().when(command).writeCommitResponse(clientMessage, 
serverConnection, txCommitMessage);
+
+    // Only cmdExecute runs real code; every other method on the mocked command stays stubbed.
+    doCallRealMethod().when(command).cmdExecute(clientMessage, 
serverConnection, null, 1);
+    command.cmdExecute(clientMessage, serverConnection, null, 1);
+
+    verify(command, times(1)).writeCommitResponse(clientMessage, 
serverConnection, txCommitMessage);
+    verify(serverConnection, times(1)).setAsTrue(Command.RESPONDED);
+  }
+
+  /**
+   * A BEFORE_COMPLETION request must call {@code beforeCompletion()} on the current TXState
+   * proxy once and mark the connection as RESPONDED once.
+   */
+  @Test
+  public void commandCanInvokeBeforeCompletion() throws Exception {
+    
when(part0.getInt()).thenReturn(TXSynchronizationOp.CompletionType.BEFORE_COMPLETION.ordinal());
+
+    doCallRealMethod().when(command).cmdExecute(clientMessage, 
serverConnection, null, 1);
+    command.cmdExecute(clientMessage, serverConnection, null, 1);
+
+    verify(txStateProxy, times(1)).beforeCompletion();
+    verify(serverConnection, times(1)).setAsTrue(Command.RESPONDED);
+  }
+
+  /**
+   * An AFTER_COMPLETION request carrying STATUS_COMMITTED must call
+   * {@code afterCompletion(STATUS_COMMITTED)} on the proxy, write the proxy's commit message
+   * back once, and mark the connection as RESPONDED once.
+   */
+  @Test
+  public void commandCanSendBackCommitMessageAfterInvokeAfterCompletion() 
throws Exception {
+    
when(part0.getInt()).thenReturn(TXSynchronizationOp.CompletionType.AFTER_COMPLETION.ordinal());
+    when(part2.getInt()).thenReturn(Status.STATUS_COMMITTED);
+    when(txStateProxy.getCommitMessage()).thenReturn(txCommitMessage);
+
+    doCallRealMethod().when(command).cmdExecute(clientMessage, 
serverConnection, null, 1);
+    command.cmdExecute(clientMessage, serverConnection, null, 1);
+
+    verify(txStateProxy, times(1)).afterCompletion(Status.STATUS_COMMITTED);
+    verify(command, times(1)).writeCommitResponse(clientMessage, 
serverConnection, txCommitMessage);
+    verify(serverConnection, times(1)).setAsTrue(Command.RESPONDED);
+  }
+}

Reply via email to