This is an automated email from the ASF dual-hosted git repository.
eshu11 pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new fdc09b6 GEODE-5376: Remove SynchronizationRunnable (#2122)
fdc09b6 is described below
commit fdc09b6e03d3be789f5d4eae0c48a491a766b387
Author: pivotal-eshu <[email protected]>
AuthorDate: Fri Jul 20 10:05:35 2018 -0700
GEODE-5376: Remove SynchronizationRunnable (#2122)
Remove TXSynchronizationRunnable to handle JTA beforeCompletion and
afterCompletion can be executed on different member after client failover.
---
.../ClientServerJTAFailoverDistributedTest.java | 270 +++++++++++++++++++++
.../cache/DistTXStateProxyImplOnCoordinator.java | 6 -
.../internal/cache/PausedTXStateProxyImpl.java | 8 -
.../apache/geode/internal/cache/TXManagerImpl.java | 4 +
.../org/apache/geode/internal/cache/TXState.java | 99 +++++---
.../apache/geode/internal/cache/TXStateProxy.java | 9 -
.../geode/internal/cache/TXStateProxyImpl.java | 25 --
.../internal/cache/TXSynchronizationRunnable.java | 152 ------------
.../sockets/command/TXSynchronizationCommand.java | 235 +++++++-----------
.../geode/internal/cache/tx/ClientTXStateStub.java | 4 +
.../apache/geode/internal/cache/TXStateTest.java | 108 +++++++++
.../cache/TXSynchronizationRunnableTest.java | 61 -----
.../command/TXSynchronizationCommandTest.java | 116 +++++++++
13 files changed, 650 insertions(+), 447 deletions(-)
diff --git
a/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java
b/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java
new file mode 100644
index 0000000..cb27de2
--- /dev/null
+++
b/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.jta;
+
+import static org.apache.geode.test.dunit.VM.getHostName;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
+import javax.transaction.Status;
+
+import org.awaitility.Awaitility;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.TransactionId;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.client.PoolFactory;
+import org.apache.geode.cache.client.PoolManager;
+import org.apache.geode.cache.client.internal.InternalClientCache;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.internal.ServerLocation;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXStateProxyImpl;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil;
+import org.apache.geode.internal.cache.tx.ClientTXStateStub;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.ClientCacheRule;
+import org.apache.geode.test.dunit.rules.DistributedTestRule;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+public class ClientServerJTAFailoverDistributedTest implements Serializable {
+ private String hostName;
+ private String uniqueName;
+ private String regionName;
+ private VM server1;
+ private VM server2;
+ private VM server3;
+ private VM client1;
+ private int port1;
+ private int port2;
+
+ private final int key = 1;
+ private final String value = "value1";
+ private final String newValue = "value2";
+
+ @Rule
+ public DistributedTestRule distributedTestRule = new DistributedTestRule();
+
+ @Rule
+ public CacheRule cacheRule = new CacheRule();
+
+ @Rule
+ public ClientCacheRule clientCacheRule = new ClientCacheRule();
+
+ @Rule
+ public SerializableTestName testName = new SerializableTestName();
+
+ @Before
+ public void setUp() {
+ server1 = getVM(0);
+ server2 = getVM(1);
+ server3 = getVM(2);
+ client1 = getVM(3);
+
+ hostName = getHostName();
+ uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName();
+ regionName = uniqueName + "_region";
+ }
+
+ @Test
+ public void jtaCanFailoverAfterDoneBeforeCompletion() {
+ server3.invoke(() -> createServerRegion(1, false));
+ server3.invoke(() -> doPut(key, value));
+ port1 = server1.invoke(() -> createServerRegion(1, true));
+ port2 = server2.invoke(() -> createServerRegion(1, true));
+
+ client1.invoke(() -> createClientRegion(port1, port2));
+
+ Object[] beforeCompletionResults = client1.invoke(() ->
doBeforeCompletion());
+
+ int port = (Integer) beforeCompletionResults[1];
+
+ if (port == port1) {
+ server1.invoke(() -> cacheRule.getCache().close());
+ } else {
+ assert port == port2;
+ server2.invoke(() -> cacheRule.getCache().close());
+ }
+
+ client1.invoke(() -> doAfterCompletion((TransactionId)
beforeCompletionResults[0], true));
+ }
+
+ private int createServerRegion(int totalNumBuckets, boolean isAccessor)
throws Exception {
+ PartitionAttributesFactory factory = new PartitionAttributesFactory();
+ factory.setTotalNumBuckets(totalNumBuckets);
+ if (isAccessor) {
+ factory.setLocalMaxMemory(0);
+ }
+ PartitionAttributes partitionAttributes = factory.create();
+ cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.PARTITION)
+ .setPartitionAttributes(partitionAttributes).create(regionName);
+
+ CacheServer server = cacheRule.getCache().addCacheServer();
+ server.setPort(0);
+ server.start();
+ return server.getPort();
+ }
+
+ private void createClientRegion(int... ports) {
+ clientCacheRule.createClientCache();
+
+ CacheServerTestUtil.disableShufflingOfEndpoints();
+ PoolImpl pool;
+ try {
+ pool = getPool(ports);
+ } finally {
+ CacheServerTestUtil.enableShufflingOfEndpoints();
+ }
+
+ ClientRegionFactory crf =
+
clientCacheRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.LOCAL);
+ crf.setPoolName(pool.getName());
+ crf.create(regionName);
+
+ if (ports.length > 1) {
+ pool.acquireConnection(new ServerLocation(hostName, port1));
+ }
+ }
+
+ private PoolImpl getPool(int... ports) {
+ PoolFactory factory = PoolManager.createFactory();
+ for (int port : ports) {
+ factory.addServer(hostName, port);
+ }
+ return (PoolImpl) factory.setReadTimeout(2000).setSocketBufferSize(1000)
+ .setMinConnections(4).create(uniqueName);
+ }
+
+ private void doPut(int key, String value) {
+ cacheRule.getCache().getRegion(regionName).put(key, value);
+ }
+
+ private Object[] doBeforeCompletion() {
+ Object[] results = new Object[2];
+ InternalClientCache cache = clientCacheRule.getClientCache();
+ Region region = cache.getRegion(regionName);
+ TXManagerImpl txManager = (TXManagerImpl)
cache.getCacheTransactionManager();
+ txManager.begin();
+ region.put(key, newValue);
+
+ TXStateProxyImpl txStateProxy = (TXStateProxyImpl) txManager.getTXState();
+ ClientTXStateStub clientTXStateStub = (ClientTXStateStub)
txStateProxy.getRealDeal(null, null);
+ clientTXStateStub.beforeCompletion();
+ TransactionId transactionId = txManager.suspend();
+ int port = clientTXStateStub.getServerAffinityLocation().getPort();
+ results[0] = transactionId;
+ results[1] = port;
+ return results;
+ }
+
+ private void doAfterCompletion(TransactionId transactionId, boolean
isCommit) {
+ InternalClientCache cache = clientCacheRule.getClientCache();
+ Region region = cache.getRegion(regionName);
+ TXManagerImpl txManager = (TXManagerImpl)
cache.getCacheTransactionManager();
+ txManager.resume(transactionId);
+
+ TXStateProxyImpl txStateProxy = (TXStateProxyImpl) txManager.getTXState();
+ ClientTXStateStub clientTXStateStub = (ClientTXStateStub)
txStateProxy.getRealDeal(null, null);
+ try {
+ clientTXStateStub
+ .afterCompletion(isCommit ? Status.STATUS_COMMITTED :
Status.STATUS_ROLLEDBACK);
+ } catch (Exception exception) {
+ LogService.getLogger().info("exception stack ", exception);
+ throw exception;
+ }
+ if (isCommit) {
+ assertEquals(newValue, region.get(key));
+ } else {
+ assertEquals(value, region.get(key));
+ }
+ }
+
+ @Test
+ public void jtaCanFailoverToJTAHostAfterDoneBeforeCompletion() {
+ port2 = server2.invoke(() -> createServerRegion(1, false));
+ server2.invoke(() -> doPut(key, value));
+ port1 = server1.invoke(() -> createServerRegion(1, true));
+
+ client1.invoke(() -> createClientRegion(port1, port2));
+ Object[] beforeCompletionResults = client1.invoke(() ->
doBeforeCompletion());
+
+ server1.invoke(() -> cacheRule.getCache().close());
+
+ client1.invoke(() -> doAfterCompletion((TransactionId)
beforeCompletionResults[0], true));
+ }
+
+ @Test
+ public void jtaCanFailoverWithRollbackAfterDoneBeforeCompletion() {
+ server3.invoke(() -> createServerRegion(1, false));
+ server3.invoke(() -> doPut(key, value));
+ port1 = server1.invoke(() -> createServerRegion(1, true));
+ port2 = server2.invoke(() -> createServerRegion(1, true));
+
+ client1.invoke(() -> createClientRegion(port1, port2));
+
+ Object[] beforeCompletionResults = client1.invoke(() ->
doBeforeCompletion());
+
+ int port = (Integer) beforeCompletionResults[1];
+
+ if (port == port1) {
+ server1.invoke(() -> cacheRule.getCache().close());
+ } else {
+ assert port == port2;
+ server2.invoke(() -> cacheRule.getCache().close());
+ }
+
+ client1.invoke(() -> doAfterCompletion((TransactionId)
beforeCompletionResults[0], false));
+
+ createClientRegion(port == port1 ? port2 : port1);
+ doPutTransaction(true);
+ }
+
+ private void doPutTransaction(boolean isClient) {
+ Region region;
+ TXManagerImpl txManager;
+ if (isClient) {
+ InternalClientCache cache = clientCacheRule.getClientCache();
+ region = cache.getRegion(regionName);
+ txManager = (TXManagerImpl) cache.getCacheTransactionManager();
+ } else {
+ InternalCache cache = cacheRule.getCache();
+ region = cache.getRegion(regionName);
+ txManager = (TXManagerImpl) cache.getCacheTransactionManager();
+
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).pollInterval(10,
TimeUnit.MILLISECONDS)
+ .until(() -> txManager.isHostedTXStatesEmpty());
+ }
+ txManager.begin();
+ region.put(key, newValue);
+ txManager.commit();
+ assertEquals(newValue, region.get(key));
+ }
+
+}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
index c088c0f..031de9e 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
@@ -117,9 +117,6 @@ public class DistTXStateProxyImplOnCoordinator extends
DistTXStateProxyImpl {
}
inProgress = preserveTx;
- if (this.synchRunnable != null) {
- this.synchRunnable.abort();
- }
}
}
@@ -281,9 +278,6 @@ public class DistTXStateProxyImplOnCoordinator extends
DistTXStateProxyImpl {
} finally {
inProgress = false;
- if (this.synchRunnable != null) {
- this.synchRunnable.abort();
- }
}
/*
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/PausedTXStateProxyImpl.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/PausedTXStateProxyImpl.java
index a796d5c..2e52551 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/PausedTXStateProxyImpl.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/PausedTXStateProxyImpl.java
@@ -390,14 +390,6 @@ public class PausedTXStateProxyImpl implements
TXStateProxy {
public void setJCATransaction() {}
@Override
- public void setSynchronizationRunnable(TXSynchronizationRunnable sync) {}
-
- @Override
- public TXSynchronizationRunnable getSynchronizationRunnable() {
- return null;
- }
-
- @Override
public void suspend() {}
@Override
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
index c5c7653..639656f 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
@@ -1875,4 +1875,8 @@ public class TXManagerImpl implements
CacheTransactionManager, MembershipListene
return hostedTXStates;
}
+ public boolean isHostedTXStatesEmpty() {
+ return hostedTXStates.isEmpty();
+ }
+
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index 9768fb8..389d0e7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -31,16 +31,19 @@ import javax.transaction.Status;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
+import org.apache.geode.InternalGemFireError;
import org.apache.geode.SystemFailure;
import org.apache.geode.cache.CommitConflictException;
import org.apache.geode.cache.DiskAccessException;
import org.apache.geode.cache.EntryNotFoundException;
+import org.apache.geode.cache.FailedSynchronizationException;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.Region.Entry;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.SynchronizationCommitConflictException;
import org.apache.geode.cache.TransactionDataRebalancedException;
+import org.apache.geode.cache.TransactionException;
import org.apache.geode.cache.TransactionId;
import org.apache.geode.cache.TransactionWriter;
import org.apache.geode.cache.TransactionWriterException;
@@ -95,6 +98,8 @@ public class TXState implements TXStateInterface {
*/
private int modSerialNum;
private final List<EntryEventImpl> pendingCallbacks = new
ArrayList<EntryEventImpl>();
+ // Access this variable should be in synchronized block.
+ private boolean beforeCompletionCalled;
// Internal testing hooks
private Runnable internalAfterReservation;
@@ -996,15 +1001,21 @@ public class TXState implements TXStateInterface {
* @see org.apache.geode.internal.cache.TXStateInterface#beforeCompletion()
*/
@Override
- public void beforeCompletion() throws SynchronizationCommitConflictException
{
+ public synchronized void beforeCompletion() throws
SynchronizationCommitConflictException {
if (this.closed) {
throw new TXManagerCancelledException();
}
- this.proxy.getTxMgr().setTXState(null);
+ if (beforeCompletionCalled) {
+ // do not re-execute beforeCompletion again
+ return;
+ }
+ beforeCompletionCalled = true;
+ doBeforeCompletion();
+ }
+
+ private void doBeforeCompletion() {
final long opStart = CachePerfStats.getStatTime();
this.jtaLifeTime = opStart - getBeginTime();
-
-
try {
reserveAndCheck();
/*
@@ -1042,7 +1053,7 @@ public class TXState implements TXStateInterface {
}
} catch (CommitConflictException commitConflict) {
cleanup();
- this.proxy.getTxMgr().noteCommitFailure(opStart, this.jtaLifeTime, this);
+ proxy.getTxMgr().noteCommitFailure(opStart, this.jtaLifeTime, this);
throw new SynchronizationCommitConflictException(
LocalizedStrings.TXState_CONFLICT_DETECTED_IN_GEMFIRE_TRANSACTION_0
.toLocalizedString(getTransactionId()),
@@ -1056,41 +1067,59 @@ public class TXState implements TXStateInterface {
* @see org.apache.geode.internal.cache.TXStateInterface#afterCompletion(int)
*/
@Override
- public void afterCompletion(int status) {
- // System.err.println("start afterCompletion");
+ public synchronized void afterCompletion(int status) {
+ this.proxy.getTxMgr().setTXState(null);
+ // For commit, beforeCompletion should be called. Otherwise
+ // throw FailedSynchronizationException().
+ if (wasBeforeCompletionCalled()) {
+ doAfterCompletion(status);
+ } else {
+ // rollback does not run beforeCompletion.
+ if (status != Status.STATUS_ROLLEDBACK) {
+ throw new FailedSynchronizationException(
+ "Could not execute afterCompletion when beforeCompletion was not
executed");
+ }
+ doAfterCompletion(status);
+ }
+ }
+
+ private void doAfterCompletion(int status) {
final long opStart = CachePerfStats.getStatTime();
- switch (status) {
- case Status.STATUS_COMMITTED:
- // System.err.println("begin commit in afterCompletion");
- Assert.assertTrue(this.locks != null,
- "Gemfire Transaction afterCompletion called with illegal state.");
- try {
- proxy.getTxMgr().setTXState(null);
- commit();
- saveTXCommitMessageForClientFailover();
- } catch (CommitConflictException error) {
- Assert.assertTrue(false, "Gemfire Transaction " + getTransactionId()
- + " afterCompletion failed.due to CommitConflictException: " +
error);
- }
+ try {
+ switch (status) {
+ case Status.STATUS_COMMITTED:
+ Assert.assertTrue(this.locks != null,
+ "Gemfire Transaction afterCompletion called with illegal
state.");
+ try {
+ commit();
+ saveTXCommitMessageForClientFailover();
+ } catch (CommitConflictException error) {
+ Assert.assertTrue(false, "Gemfire Transaction " +
getTransactionId()
+ + " afterCompletion failed.due to CommitConflictException: " +
error);
+ }
- this.proxy.getTxMgr().noteCommitSuccess(opStart, this.jtaLifeTime,
this);
- this.locks = null;
- // System.err.println("end commit in afterCompletion");
- break;
- case Status.STATUS_ROLLEDBACK:
- this.jtaLifeTime = opStart - getBeginTime();
- this.proxy.getTxMgr().setTXState(null);
- rollback();
- saveTXCommitMessageForClientFailover();
- this.proxy.getTxMgr().noteRollbackSuccess(opStart, this.jtaLifeTime,
this);
- break;
- default:
- Assert.assertTrue(false, "Unknown JTA Synchronization status " +
status);
+ this.proxy.getTxMgr().noteCommitSuccess(opStart, this.jtaLifeTime,
this);
+ this.locks = null;
+ break;
+ case Status.STATUS_ROLLEDBACK:
+ this.jtaLifeTime = opStart - getBeginTime();
+ rollback();
+ saveTXCommitMessageForClientFailover();
+ this.proxy.getTxMgr().noteRollbackSuccess(opStart, this.jtaLifeTime,
this);
+ break;
+ default:
+ Assert.assertTrue(false, "Unknown JTA Synchronization status " +
status);
+ }
+ } catch (InternalGemFireError error) {
+ throw new TransactionException(error);
}
- // System.err.println("end afterCompletion");
}
- private void saveTXCommitMessageForClientFailover() {
+ boolean wasBeforeCompletionCalled() {
+ return beforeCompletionCalled;
+ }
+
+ void saveTXCommitMessageForClientFailover() {
proxy.getTxMgr().saveTXStateForClientFailover(proxy);
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxy.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxy.java
index f037a02..7a79914 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxy.java
@@ -54,15 +54,6 @@ public interface TXStateProxy extends TXStateInterface {
void setJCATransaction();
/**
- * establishes the synchronization thread used for client/server
beforeCompletion/afterCompletion
- * processing
- *
- */
- void setSynchronizationRunnable(TXSynchronizationRunnable sync);
-
- TXSynchronizationRunnable getSynchronizationRunnable();
-
- /**
* Perform additional tasks required by the proxy to suspend a transaction
*/
void suspend();
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
index f9aa2d4..00f15c3 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
@@ -57,12 +57,6 @@ public class TXStateProxyImpl implements TXStateProxy {
private boolean commitRequestedByOwner;
private boolean isJCATransaction;
- /**
- * for client/server JTA transactions we need to have a single thread handle
both beforeCompletion
- * and afterCompletion so that beforeC can obtain locks for the afterC step.
This is that thread
- */
- protected volatile TXSynchronizationRunnable synchRunnable;
-
private final ReentrantLock lock = new ReentrantLock();
/** number of operations in this transaction */
@@ -99,16 +93,6 @@ public class TXStateProxyImpl implements TXStateProxy {
}
@Override
- public void setSynchronizationRunnable(TXSynchronizationRunnable synch) {
- this.synchRunnable = synch;
- }
-
- @Override
- public TXSynchronizationRunnable getSynchronizationRunnable() {
- return this.synchRunnable;
- }
-
- @Override
public ReentrantLock getLock() {
return this.lock;
}
@@ -230,9 +214,6 @@ public class TXStateProxyImpl implements TXStateProxy {
throw e;
} finally {
inProgress = preserveTx;
- if (this.synchRunnable != null) {
- this.synchRunnable.abort();
- }
}
}
@@ -410,9 +391,6 @@ public class TXStateProxyImpl implements TXStateProxy {
getRealDeal(null, null).rollback();
} finally {
inProgress = false;
- if (this.synchRunnable != null) {
- this.synchRunnable.abort();
- }
}
}
@@ -472,9 +450,6 @@ public class TXStateProxyImpl implements TXStateProxy {
getRealDeal(null, null).afterCompletion(status);
} finally {
this.inProgress = false;
- if (this.synchRunnable != null) {
- this.synchRunnable.abort();
- }
}
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXSynchronizationRunnable.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXSynchronizationRunnable.java
deleted file mode 100644
index 4603d93..0000000
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXSynchronizationRunnable.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
- * agreements. See the NOTICE file distributed with this work for additional
information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
- * or implied. See the License for the specific language governing permissions
and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache;
-
-import org.apache.logging.log4j.Logger;
-
-import org.apache.geode.CancelCriterion;
-import org.apache.geode.internal.cache.tier.sockets.CommBufferPool;
-import org.apache.geode.internal.logging.LogService;
-
-/**
- * TXSynchronizationThread manages beforeCompletion and afterCompletion calls
on behalf of a client
- * cache. The thread should be instantiated with a Runnable that invokes
beforeCompletion behavior.
- * Then you must invoke runSecondRunnable() with another Runnable that invokes
afterCompletion
- * behavior.
- *
- * @since GemFire 6.6
- */
-public class TXSynchronizationRunnable implements Runnable {
- private static final Logger logger = LogService.getLogger();
-
- private final CancelCriterion cancelCriterion;
- private final CommBufferPool commBufferPool;
-
- private Runnable firstRunnable;
- private final Object firstRunnableSync = new Object();
- private boolean firstRunnableCompleted;
-
- private Runnable secondRunnable;
- private final Object secondRunnableSync = new Object();
- private boolean secondRunnableCompleted;
-
- private boolean abort;
-
- public TXSynchronizationRunnable(final CancelCriterion cancelCriterion,
- final CommBufferPool commBufferPool, final Runnable beforeCompletion) {
- this.cancelCriterion = cancelCriterion;
- this.commBufferPool = commBufferPool;
- this.firstRunnable = beforeCompletion;
- }
-
- @Override
- public void run() {
- commBufferPool.setTLCommBuffer();
- try {
- doSynchronizationOps();
- } finally {
- commBufferPool.releaseTLCommBuffer();
- }
- }
-
- private void doSynchronizationOps() {
- synchronized (this.firstRunnableSync) {
- try {
- this.firstRunnable.run();
- } finally {
- if (logger.isTraceEnabled()) {
- logger.trace("beforeCompletion notification completed");
- }
- this.firstRunnableCompleted = true;
- this.firstRunnable = null;
- this.firstRunnableSync.notifyAll();
- }
- }
- synchronized (this.secondRunnableSync) {
- // TODO there should be a transaction timeout that keeps this thread
- // from sitting around forever if the client goes away
- final boolean isTraceEnabled = logger.isTraceEnabled();
- while (this.secondRunnable == null && !this.abort) {
- try {
- if (isTraceEnabled) {
- logger.trace("waiting for afterCompletion notification");
- }
- this.secondRunnableSync.wait(1000);
- } catch (InterruptedException ignore) {
- // eat the interrupt and check for exit conditions
- }
- }
- if (isTraceEnabled) {
- logger.trace("executing afterCompletion notification");
- }
- try {
- if (!this.abort) {
- this.secondRunnable.run();
- }
- } finally {
- if (isTraceEnabled) {
- logger.trace("afterCompletion notification completed");
- }
- this.secondRunnableCompleted = true;
- this.secondRunnable = null;
- this.secondRunnableSync.notifyAll();
- }
- }
- }
-
- /**
- * wait for the initial beforeCompletion step to finish
- */
- public void waitForFirstExecution() {
- synchronized (this.firstRunnableSync) {
- while (!this.firstRunnableCompleted) {
- try {
- this.firstRunnableSync.wait(1000);
- } catch (InterruptedException ignore) {
- // eat the interrupt and check for exit conditions
- }
- cancelCriterion.checkCancelInProgress(null);
- }
- }
- }
-
- /**
- * run the afterCompletion portion of synchronization. This method schedules
execution of the
- * given runnable and then waits for it to finish running
- */
- public void runSecondRunnable(Runnable r) {
- synchronized (this.secondRunnableSync) {
- this.secondRunnable = r;
- this.secondRunnableSync.notifyAll();
- while (!this.secondRunnableCompleted && !this.abort) {
- try {
- this.secondRunnableSync.wait(1000);
- } catch (InterruptedException ignore) {
- // eat the interrupt and check for exit conditions
- }
- cancelCriterion.checkCancelInProgress(null);
- }
- }
- }
-
- /**
- * stop waiting for an afterCompletion to arrive and just exit
- */
- public void abort() {
- synchronized (this.secondRunnableSync) {
- this.abort = true;
- }
- }
-}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
index fd9c17f..037702a 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
@@ -16,19 +16,14 @@
package org.apache.geode.internal.cache.tier.sockets.command;
import java.io.IOException;
-import java.util.concurrent.Executor;
-
-import javax.transaction.Status;
import
org.apache.geode.cache.client.internal.TXSynchronizationOp.CompletionType;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ReplyException;
import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.TXCommitMessage;
import org.apache.geode.internal.cache.TXId;
import org.apache.geode.internal.cache.TXManagerImpl;
import org.apache.geode.internal.cache.TXStateProxy;
-import org.apache.geode.internal.cache.TXSynchronizationRunnable;
import org.apache.geode.internal.cache.tier.Command;
import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.internal.cache.tier.sockets.BaseCommand;
@@ -50,21 +45,6 @@ public class TXSynchronizationCommand extends BaseCommand {
* (non-Javadoc)
*
* @see
- *
org.apache.geode.internal.cache.tier.sockets.BaseCommand#shouldMasqueradeForTx(org.apache.geode
- * .internal.cache.tier.sockets.Message,
- * org.apache.geode.internal.cache.tier.sockets.ServerConnection)
- */
- @Override
- protected boolean shouldMasqueradeForTx(Message clientMessage,
- ServerConnection serverConnection) {
- // masquerading is done in the waiting thread pool
- return false;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
*
org.apache.geode.internal.cache.tier.sockets.BaseCommand#cmdExecute(org.apache.geode.internal.
* cache.tier.sockets.Message,
org.apache.geode.internal.cache.tier.sockets.ServerConnection,
* long)
@@ -86,20 +66,18 @@ public class TXSynchronizationCommand extends BaseCommand {
statusPart = null;
}
- final TXManagerImpl txMgr =
- (TXManagerImpl)
serverConnection.getCache().getCacheTransactionManager();
- final InternalDistributedMember member =
- (InternalDistributedMember)
serverConnection.getProxyID().getDistributedMember();
+ final TXManagerImpl txMgr = getTXManager(serverConnection);
+ final InternalDistributedMember member =
getDistributedMember(serverConnection);
- // get the tx state without associating it with this thread. That's done
later
- final TXStateProxy txProxy = txMgr.masqueradeAs(clientMessage, member,
true);
+ final TXStateProxy txProxy = txMgr.getTXState();
+ assert txProxy != null;
final TXId txId = txProxy.getTxId();
TXCommitMessage commitMessage = txMgr.getRecentlyCompletedMessage(txId);
if (commitMessage != null && commitMessage !=
TXCommitMessage.ROLLBACK_MSG) {
assert type == CompletionType.AFTER_COMPLETION;
try {
- CommitCommand.writeCommitResponse(commitMessage, clientMessage,
serverConnection);
+ writeCommitResponse(clientMessage, serverConnection, commitMessage);
} catch (IOException e) {
if (isDebugEnabled) {
logger.debug("Problem writing reply to client", e);
@@ -117,137 +95,92 @@ public class TXSynchronizationCommand extends BaseCommand
{
return;
}
- // we have to run beforeCompletion and afterCompletion in the same thread
- // because beforeCompletion obtains locks for the thread and
afterCompletion
- // releases them
- if (txProxy != null) {
- try {
- if (type == CompletionType.BEFORE_COMPLETION) {
- Runnable beforeCompletion = new Runnable() {
- @SuppressWarnings("synthetic-access")
- public void run() {
- TXStateProxy txState = null;
- Throwable failureException = null;
- try {
- txState = txMgr.masqueradeAs(clientMessage, member, false);
- if (isDebugEnabled) {
- logger.debug("Executing beforeCompletion() notification for
transaction {}",
- clientMessage.getTransactionId());
- }
- txState.setIsJTA(true);
- txState.beforeCompletion();
- try {
- writeReply(clientMessage, serverConnection);
- } catch (IOException e) {
- if (isDebugEnabled) {
- logger.debug("Problem writing reply to client", e);
- }
- }
- serverConnection.setAsTrue(RESPONDED);
- } catch (ReplyException e) {
- failureException = e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } catch (Exception e) {
- failureException = e;
- } finally {
- txMgr.unmasquerade(txState);
- }
- if (failureException != null) {
- try {
- writeException(clientMessage, failureException, false,
serverConnection);
- } catch (IOException ioe) {
- if (isDebugEnabled) {
- logger.debug("Problem writing reply to client", ioe);
- }
- }
- serverConnection.setAsTrue(RESPONDED);
- }
+ try {
+ if (type == CompletionType.BEFORE_COMPLETION) {
+ if (isDebugEnabled) {
+ logger.debug("Executing beforeCompletion() notification for
transaction {}",
+ clientMessage.getTransactionId());
+ }
+ Throwable failureException = null;
+ try {
+ txProxy.setIsJTA(true);
+ txProxy.beforeCompletion();
+ try {
+ writeReply(clientMessage, serverConnection);
+ } catch (IOException e) {
+ if (isDebugEnabled) {
+ logger.debug("Problem writing reply to client", e);
+ }
+ }
+ serverConnection.setAsTrue(RESPONDED);
+ } catch (ReplyException e) {
+ failureException = e.getCause();
+ } catch (Exception e) {
+ failureException = e;
+ }
+ if (failureException != null) {
+ try {
+ writeException(clientMessage, failureException, false,
serverConnection);
+ } catch (IOException ioe) {
+ if (isDebugEnabled) {
+ logger.debug("Problem writing reply to client", ioe);
}
- };
- TXSynchronizationRunnable sync =
- new
TXSynchronizationRunnable(serverConnection.getCache().getCancelCriterion(),
- serverConnection.getAcceptor(), beforeCompletion);
- txProxy.setSynchronizationRunnable(sync);
- Executor exec =
InternalDistributedSystem.getConnectedInstance().getDistributionManager()
- .getWaitingThreadPool();
- exec.execute(sync);
- sync.waitForFirstExecution();
- } else {
- Runnable afterCompletion = new Runnable() {
- @SuppressWarnings("synthetic-access")
- public void run() {
- TXStateProxy txState = null;
- try {
- txState = txMgr.masqueradeAs(clientMessage, member, false);
- int status = statusPart.getInt();
- if (isDebugEnabled) {
- logger.debug("Executing afterCompletion({}) notification for
transaction {}",
- status, clientMessage.getTransactionId());
- }
- txState.setIsJTA(true);
- txState.afterCompletion(status);
- // GemFire commits during afterCompletion - send the commit
info back to the client
- // where it can be applied to the local cache
- TXCommitMessage cmsg = txState.getCommitMessage();
- try {
- CommitCommand.writeCommitResponse(cmsg, clientMessage,
serverConnection);
- txMgr.removeHostedTXState(txState.getTxId());
- } catch (IOException e) {
- // not much can be done here
- if (isDebugEnabled || (e instanceof
MessageTooLargeException)) {
- logger.warn("Problem writing reply to client", e);
- }
- }
- serverConnection.setAsTrue(RESPONDED);
- } catch (RuntimeException e) {
- try {
- writeException(clientMessage, e, false, serverConnection);
- } catch (IOException ioe) {
- if (isDebugEnabled) {
- logger.debug("Problem writing reply to client", ioe);
- }
- }
- serverConnection.setAsTrue(RESPONDED);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } finally {
- txMgr.unmasquerade(txState);
- }
+ }
+ serverConnection.setAsTrue(RESPONDED);
+ }
+ } else {
+ try {
+ int status = statusPart.getInt();
+ if (isDebugEnabled) {
+ logger.debug("Executing afterCompletion({}) notification for
transaction {}",
+ status, clientMessage.getTransactionId());
+ }
+ txProxy.setIsJTA(true);
+ txProxy.setCommitOnBehalfOfRemoteStub(true);
+ txProxy.afterCompletion(status);
+ // GemFire commits during afterCompletion - send the commit info
back to the client
+ // where it can be applied to the local cache
+ TXCommitMessage cmsg = txProxy.getCommitMessage();
+ try {
+ writeCommitResponse(clientMessage, serverConnection, cmsg);
+ txMgr.removeHostedTXState(txProxy.getTxId());
+ } catch (IOException e) {
+ // not much can be done here
+ if (isDebugEnabled || (e instanceof MessageTooLargeException)) {
+ logger.warn("Problem writing reply to client", e);
}
- };
- // if there was a beforeCompletion call then there will be a thread
- // sitting in the waiting pool to execute afterCompletion. Otherwise
- // we have failed-over and may need to do beforeCompletion & hope
that it works
- TXSynchronizationRunnable sync =
txProxy.getSynchronizationRunnable();
- if (sync != null) {
- sync.runSecondRunnable(afterCompletion);
- } else {
- if (statusPart.getInt() == Status.STATUS_COMMITTED) {
- TXStateProxy txState = txMgr.masqueradeAs(clientMessage, member,
false);
- try {
- if (isDebugEnabled) {
- logger.debug(
- "Executing beforeCompletion() notification for
transaction {} after failover",
- clientMessage.getTransactionId());
- }
- txState.setIsJTA(true);
- txState.beforeCompletion();
- } finally {
- txMgr.unmasquerade(txState);
- }
+ }
+ serverConnection.setAsTrue(RESPONDED);
+ } catch (RuntimeException e) {
+ try {
+ writeException(clientMessage, e, false, serverConnection);
+ } catch (IOException ioe) {
+ if (isDebugEnabled) {
+ logger.debug("Problem writing reply to client", ioe);
}
- afterCompletion.run();
}
+ serverConnection.setAsTrue(RESPONDED);
}
- } catch (Exception e) {
- writeException(clientMessage, MessageType.EXCEPTION, e, false,
serverConnection);
- serverConnection.setAsTrue(RESPONDED);
- }
- if (isDebugEnabled) {
- logger.debug("Sent tx synchronization response");
}
+ } catch (Exception e) {
+ writeException(clientMessage, MessageType.EXCEPTION, e, false,
serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
+ }
+ if (isDebugEnabled) {
+ logger.debug("Sent tx synchronization response");
}
}
+ void writeCommitResponse(Message clientMessage, ServerConnection
serverConnection,
+ TXCommitMessage commitMessage) throws IOException {
+ CommitCommand.writeCommitResponse(commitMessage, clientMessage,
serverConnection);
+ }
+
+ InternalDistributedMember getDistributedMember(ServerConnection
serverConnection) {
+ return (InternalDistributedMember)
serverConnection.getProxyID().getDistributedMember();
+ }
+
+ TXManagerImpl getTXManager(ServerConnection serverConnection) {
+ return (TXManagerImpl)
serverConnection.getCache().getCacheTransactionManager();
+ }
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java
index c97d9f0..4469b71 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java
@@ -322,4 +322,8 @@ public class ClientTXStateStub extends TXStateStub {
public void setAfterLocalLocks(Runnable afterLocalLocks) {
this.internalAfterLocalLocks = afterLocalLocks;
}
+
+ public ServerLocation getServerAffinityLocation() {
+ return serverAffinityLocation;
+ }
}
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
new file mode 100644
index 0000000..1770782
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import javax.transaction.Status;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.cache.CommitConflictException;
+import org.apache.geode.cache.SynchronizationCommitConflictException;
+import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
+import org.apache.geode.cache.TransactionException;
+
+public class TXStateTest {
+ private TXStateProxyImpl txStateProxy;
+ private CommitConflictException exception;
+ private TransactionDataNodeHasDepartedException
transactionDataNodeHasDepartedException;
+
+ @Before
+ public void setup() {
+ txStateProxy = mock(TXStateProxyImpl.class);
+ exception = new CommitConflictException("");
+ transactionDataNodeHasDepartedException = new
TransactionDataNodeHasDepartedException("");
+
+ when(txStateProxy.getTxMgr()).thenReturn(mock(TXManagerImpl.class));
+ }
+
+
+ @Test
+ public void beforeCompletionThrowsIfReserveAndCheckFails() {
+ TXState txState = spy(new TXState(txStateProxy, true));
+ doThrow(exception).when(txState).reserveAndCheck();
+
+ assertThatThrownBy(() -> txState.beforeCompletion())
+ .isInstanceOf(SynchronizationCommitConflictException.class);
+ }
+
+
+ @Test
+ public void afterCompletionThrowsIfCommitFails() {
+ TXState txState = spy(new TXState(txStateProxy, true));
+ doReturn(mock(InternalCache.class)).when(txState).getCache();
+ doReturn(true).when(txState).wasBeforeCompletionCalled();
+ txState.reserveAndCheck();
+ doThrow(transactionDataNodeHasDepartedException).when(txState).commit();
+
+ assertThatThrownBy(() -> txState.afterCompletion(Status.STATUS_COMMITTED))
+ .isSameAs(transactionDataNodeHasDepartedException);
+ }
+
+ @Test
+ public void
afterCompletionThrowsTransactionExceptionIfCommitFailedCommitConflictException()
{
+ TXState txState = spy(new TXState(txStateProxy, true));
+ doReturn(mock(InternalCache.class)).when(txState).getCache();
+ doReturn(true).when(txState).wasBeforeCompletionCalled();
+ doThrow(exception).when(txState).commit();
+
+ assertThatThrownBy(() -> txState.afterCompletion(Status.STATUS_COMMITTED))
+ .isInstanceOf(TransactionException.class);
+ }
+
+ @Test
+ public void afterCompletionCanCommitJTA() {
+ TXState txState = spy(new TXState(txStateProxy, false));
+ doReturn(mock(InternalCache.class)).when(txState).getCache();
+ txState.reserveAndCheck();
+ txState.closed = true;
+ doReturn(true).when(txState).wasBeforeCompletionCalled();
+ txState.afterCompletion(Status.STATUS_COMMITTED);
+
+ assertThat(txState.locks).isNull();
+ verify(txState, times(1)).saveTXCommitMessageForClientFailover();
+ }
+
+ @Test
+ public void afterCompletionCanRollbackJTA() {
+ TXState txState = spy(new TXState(txStateProxy, true));
+ txState.afterCompletion(Status.STATUS_ROLLEDBACK);
+
+ verify(txState, times(1)).rollback();
+ verify(txState, times(1)).saveTXCommitMessageForClientFailover();
+ }
+
+}
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/TXSynchronizationRunnableTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/TXSynchronizationRunnableTest.java
deleted file mode 100644
index 0210874..0000000
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/TXSynchronizationRunnableTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
- * agreements. See the NOTICE file distributed with this work for additional
information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
- * or implied. See the License for the specific language governing permissions
and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache;
-
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.geode.CancelCriterion;
-import org.apache.geode.cache.CacheClosedException;
-import org.apache.geode.internal.cache.tier.sockets.CommBufferPool;
-
-public class TXSynchronizationRunnableTest {
-
- private CancelCriterion cancelCriterion;
- private CommBufferPool commBufferPool;
- private Runnable beforeCompletion;
- private CacheClosedException exception;
-
-
- @Before
- public void setUp() {
- exception = new CacheClosedException();
-
- cancelCriterion = mock(CancelCriterion.class);
- commBufferPool = mock(CommBufferPool.class);
- beforeCompletion = mock(Runnable.class);
-
- doThrow(exception).when(cancelCriterion).checkCancelInProgress(any());
- }
-
- @Test
- public void test() {
- TXSynchronizationRunnable runnable =
- new TXSynchronizationRunnable(cancelCriterion, commBufferPool,
beforeCompletion);
- assertThatThrownBy(() ->
runnable.waitForFirstExecution()).isSameAs(exception);
- }
-
- @Test
- public void test1() {
- TXSynchronizationRunnable runnable =
- new TXSynchronizationRunnable(cancelCriterion, commBufferPool,
beforeCompletion);
- assertThatThrownBy(() ->
runnable.runSecondRunnable(mock(Runnable.class))).isSameAs(exception);
- }
-}
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommandTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommandTest.java
new file mode 100644
index 0000000..7bfb557
--- /dev/null
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommandTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets.command;
+
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import javax.transaction.Status;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.cache.client.internal.TXSynchronizationOp;
+import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.TXCommitMessage;
+import org.apache.geode.internal.cache.TXId;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXStateProxyImpl;
+import org.apache.geode.internal.cache.tier.Command;
+import org.apache.geode.internal.cache.tier.sockets.Message;
+import org.apache.geode.internal.cache.tier.sockets.Part;
+import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
+
+public class TXSynchronizationCommandTest {
+ private Message clientMessage;
+ private ServerConnection serverConnection;
+ private TXManagerImpl txManager;
+ private TXStateProxyImpl txStateProxy;
+ private TXId txId;
+ private TXCommitMessage txCommitMessage;
+ private InternalDistributedMember member;
+ private Part part0;
+ private Part part1;
+ private Part part2;
+ private RuntimeException exception;
+ private TXSynchronizationCommand command;
+
+ @Before
+ public void setup() {
+ clientMessage = mock(Message.class);
+ serverConnection = mock(ServerConnection.class);
+ txManager = mock(TXManagerImpl.class);
+ member = mock(InternalDistributedMember.class);
+ txStateProxy = mock(TXStateProxyImpl.class);
+ txId = mock(TXId.class);
+ txCommitMessage = mock(TXCommitMessage.class);
+ part0 = mock(Part.class);
+ part1 = mock(Part.class);
+ part2 = mock(Part.class);
+ exception = new RuntimeException();
+ command = mock(TXSynchronizationCommand.class);
+
+ when(clientMessage.getPart(0)).thenReturn(part0);
+ when(clientMessage.getPart(1)).thenReturn(part1);
+ when(clientMessage.getPart(2)).thenReturn(part2);
+ doReturn(txManager).when(command).getTXManager(serverConnection);
+ doReturn(member).when(command).getDistributedMember(serverConnection);
+ when(txManager.getTXState()).thenReturn(txStateProxy);
+ when(txStateProxy.getTxId()).thenReturn(txId);
+ }
+
+ @Test
+ public void commandCanSendBackCommitMessageIfAlreadyCommitted() throws
Exception {
+
when(part0.getInt()).thenReturn(TXSynchronizationOp.CompletionType.AFTER_COMPLETION.ordinal());
+
when(txManager.getRecentlyCompletedMessage(txId)).thenReturn(txCommitMessage);
+ doNothing().when(command).writeCommitResponse(clientMessage,
serverConnection, txCommitMessage);
+
+ doCallRealMethod().when(command).cmdExecute(clientMessage,
serverConnection, null, 1);
+ command.cmdExecute(clientMessage, serverConnection, null, 1);
+
+ verify(command, times(1)).writeCommitResponse(clientMessage,
serverConnection, txCommitMessage);
+ verify(serverConnection, times(1)).setAsTrue(Command.RESPONDED);
+ }
+
+ @Test
+ public void commandCanInvokeBeforeCompletion() throws Exception {
+
when(part0.getInt()).thenReturn(TXSynchronizationOp.CompletionType.BEFORE_COMPLETION.ordinal());
+
+ doCallRealMethod().when(command).cmdExecute(clientMessage,
serverConnection, null, 1);
+ command.cmdExecute(clientMessage, serverConnection, null, 1);
+
+ verify(txStateProxy, times(1)).beforeCompletion();
+ verify(serverConnection, times(1)).setAsTrue(Command.RESPONDED);
+ }
+
+ @Test
+ public void commandCanSendBackCommitMessageAfterInvokeAfterCompletion()
throws Exception {
+
when(part0.getInt()).thenReturn(TXSynchronizationOp.CompletionType.AFTER_COMPLETION.ordinal());
+ when(part2.getInt()).thenReturn(Status.STATUS_COMMITTED);
+ when(txStateProxy.getCommitMessage()).thenReturn(txCommitMessage);
+
+ doCallRealMethod().when(command).cmdExecute(clientMessage,
serverConnection, null, 1);
+ command.cmdExecute(clientMessage, serverConnection, null, 1);
+
+ verify(txStateProxy, times(1)).afterCompletion(Status.STATUS_COMMITTED);
+ verify(command, times(1)).writeCommitResponse(clientMessage,
serverConnection, txCommitMessage);
+ verify(serverConnection, times(1)).setAsTrue(Command.RESPONDED);
+ }
+}