Repository: geode Updated Branches: refs/heads/develop 60fc526d3 -> 41296c98d
GEODE-2429: Do not execute JTA afterCompletion if the transaction is rolled back in beforeCompletion. JTA afterCompletion should not be executed if the transaction has already been rolled back. Add a dunit test case showing that the transaction is cleaned up on the server if beforeCompletion failed on the client cache. Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/41296c98 Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/41296c98 Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/41296c98 Branch: refs/heads/develop Commit: 41296c98dd69ff5e06c541d3e3b9c3cfa3e137b3 Parents: 60fc526 Author: eshu <[email protected]> Authored: Thu Feb 9 15:42:33 2017 -0800 Committer: eshu <[email protected]> Committed: Thu Feb 9 15:42:33 2017 -0800 ---------------------------------------------------------------------- .../internal/cache/tx/ClientTXStateStub.java | 6 + .../geode/internal/jta/TransactionImpl.java | 4 + .../cache/ClientServerTransactionDUnitTest.java | 132 +++++++++++++++++++ 3 files changed, 142 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/41296c98/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java index e79324d..ad3f103 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java @@ -79,6 +79,8 @@ public class ClientTXStateStub extends TXStateStub { return !DISABLE_CONFLICT_CHECK_ON_CLIENT || recordedTransactionalOperations != null; } + private boolean txRolledback = false; + /** * test hook * @@ -199,6 +201,7 @@ public class 
ClientTXStateStub extends TXStateStub { this.internalAfterSendRollback.run(); } try { + txRolledback = true; this.firstProxy.rollback(proxy.getTxId().getUniqId()); } finally { this.firstProxy.getPool().releaseServerAffinity(); @@ -208,6 +211,9 @@ public class ClientTXStateStub extends TXStateStub { @Override public void afterCompletion(int status) { try { + if (txRolledback) { + return; + } TXCommitMessage txcm = this.firstProxy.afterCompletion(status, proxy.getTxId().getUniqId()); if (status == Status.STATUS_COMMITTED) { if (txcm == null) { http://git-wip-us.apache.org/repos/asf/geode/blob/41296c98/geode-core/src/main/java/org/apache/geode/internal/jta/TransactionImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/jta/TransactionImpl.java b/geode-core/src/main/java/org/apache/geode/internal/jta/TransactionImpl.java index f36079a..a5e80b6 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/jta/TransactionImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/jta/TransactionImpl.java @@ -271,4 +271,8 @@ public class TransactionImpl implements Transaction { List getSyncList() { return syncList; } + + public boolean notifyBeforeCompletionForTest() { + return notifyBeforeCompletion(); + } } http://git-wip-us.apache.org/repos/asf/geode/blob/41296c98/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java index 270fbce..3771fa8 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java @@ -35,6 +35,8 
@@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import javax.naming.Context; +import javax.transaction.RollbackException; +import javax.transaction.Synchronization; import javax.transaction.UserTransaction; import org.awaitility.Awaitility; @@ -59,6 +61,10 @@ import org.apache.geode.internal.cache.execute.data.Customer; import org.apache.geode.internal.cache.execute.data.Order; import org.apache.geode.internal.cache.execute.data.OrderId; import org.apache.geode.internal.cache.tx.ClientTXStateStub; +import org.apache.geode.internal.jta.SyncImpl; +import org.apache.geode.internal.jta.TransactionImpl; +import org.apache.geode.internal.jta.TransactionManagerImpl; +import org.apache.geode.internal.jta.UserTransactionImpl; import org.apache.geode.test.dunit.*; import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; @@ -3924,4 +3930,130 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest getGemfireCache().getDistributedSystem().disconnect(); } } + + String key1 = "KEY-1"; + String v1 = "VALUE-1"; + + @Test + public void testTXStateCleanedUpIfJTABeforeCompletionFailedOnClient() { + int copies = 1; + int totalBuckets = 1; + + Host host = Host.getHost(0); + VM server1 = host.getVM(0); + VM server2 = host.getVM(1); + + VM client1 = host.getVM(2); + + String regionName = "aRegion"; + final int port1 = createRegionsAndStartServer(server1, false); + // Create PR + server1.invoke(() -> { + createSubscriptionRegion(false, regionName, copies, totalBuckets); + Region r = getCache().getRegion(regionName); + r.put(key1, v1); + }); + + createRegionsAndStartServer(server2, false); + server2.invoke(() -> createSubscriptionRegion(false, regionName, copies, totalBuckets)); + + // Create client 1 + client1.invoke(() -> createClient(port1, regionName)); + + client1.invoke(() -> verifyClientCacheData(regionName)); + + client1.invoke(new SerializableCallable() { + public Object call() throws Exception { + 
final CountDownLatch latch1 = new CountDownLatch(1); + final CountDownLatch latch2 = new CountDownLatch(1); + Thread t1 = new Thread(new Runnable() { + public void run() { + doJTATx1(regionName, latch1, latch2); + } + }); + t1.start(); + doJTATx2(regionName, latch1, latch2); + t1.join(); + Region r = getClientRegion(regionName); + assertTrue("region data has been changed", r.get(key1).equals(v1)); + return null; + } + }); + + final DistributedMember clientId = (DistributedMember) client1.invoke(getClientDM()); + + server1.invoke(() -> verifyTXStateEmpty(clientId)); + server2.invoke(() -> verifyTXStateEmpty(clientId)); + } + + private void verifyTXStateEmpty(DistributedMember clientId) { + TXManagerImpl txmgr = getGemfireCache().getTxManager(); + Set<TXId> states = txmgr.getTransactionsForClient((InternalDistributedMember) clientId); + assertEquals(0, states.size()); // both transactions should be rolled back. + } + + private SerializableCallable getClientDM() { + return new SerializableCallable("getClientDM") { + @Override + public Object call() { + return getClientDMID(); + } + }; + } + + private DistributedMember getClientDMID() { + ClientCache cCache = getClientCache(null); + return cCache.getDistributedSystem().getDistributedMember(); + } + + private Region getClientRegion(String regionName) { + ClientCache cCache = getClientCache(null); + return cCache.getRegion(regionName); + } + + private void verifyClientCacheData(String regionName) { + Region r = getClientRegion(regionName); + assertTrue("region size is not 1", r.size() == 1); + } + + private void doJTATx1(String regionName, CountDownLatch latch1, CountDownLatch latch2) { + TransactionManagerImpl tm = TransactionManagerImpl.getTransactionManager(); + Region r = getClientRegion(regionName); + try { + UserTransaction utx = new UserTransactionImpl(); + utx.begin(); + latch1.await(); + r.put(key1, "value2"); + utx.commit(); + fail("Do not get expected RollbackException"); + } catch (Exception e) { + if (e 
instanceof RollbackException) { + // expected exception. + } else { + Assert.fail("Unexpected exception while doing JTA Transaction1 ", e); + } + } finally { + latch2.countDown(); + } + } + + private void doJTATx2(String regionName, CountDownLatch latch1, CountDownLatch latch2) { + try { + TransactionManagerImpl tm = TransactionManagerImpl.getTransactionManager(); + UserTransaction utx = new UserTransactionImpl(); + Region r = getClientRegion(regionName); + utx.begin(); + r.put(key1, "value3"); + TransactionImpl txn = (TransactionImpl) tm.getTransaction(); + Synchronization sync = new SyncImpl(); + txn.registerSynchronization(sync); + txn.notifyBeforeCompletionForTest(); + latch1.countDown(); + latch2.await(); + utx.rollback(); + } catch (Exception e) { + latch1.countDown(); + Assert.fail("Unexpected exception while doing JTA Transaction2 ", e); + } + } }
