Repository: geode
Updated Branches:
  refs/heads/develop 60fc526d3 -> 41296c98d


GEODE-2429: Do not execute JTA afterCompletion if transaction is rolled back in 
beforeCompletion

JTA afterCompletion should not be executed if
the transaction is already rolled back.
Add a dunit test case showing transaction is
cleaned up on server if beforeCompletion failed
on client cache.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/41296c98
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/41296c98
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/41296c98

Branch: refs/heads/develop
Commit: 41296c98dd69ff5e06c541d3e3b9c3cfa3e137b3
Parents: 60fc526
Author: eshu <[email protected]>
Authored: Thu Feb 9 15:42:33 2017 -0800
Committer: eshu <[email protected]>
Committed: Thu Feb 9 15:42:33 2017 -0800

----------------------------------------------------------------------
 .../internal/cache/tx/ClientTXStateStub.java    |   6 +
 .../geode/internal/jta/TransactionImpl.java     |   4 +
 .../cache/ClientServerTransactionDUnitTest.java | 132 +++++++++++++++++++
 3 files changed, 142 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/41296c98/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java
index e79324d..ad3f103 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java
@@ -79,6 +79,8 @@ public class ClientTXStateStub extends TXStateStub {
     return !DISABLE_CONFLICT_CHECK_ON_CLIENT || 
recordedTransactionalOperations != null;
   }
 
+  private boolean txRolledback = false;
+
   /**
    * test hook
    * 
@@ -199,6 +201,7 @@ public class ClientTXStateStub extends TXStateStub {
       this.internalAfterSendRollback.run();
     }
     try {
+      txRolledback = true;
       this.firstProxy.rollback(proxy.getTxId().getUniqId());
     } finally {
       this.firstProxy.getPool().releaseServerAffinity();
@@ -208,6 +211,9 @@ public class ClientTXStateStub extends TXStateStub {
   @Override
   public void afterCompletion(int status) {
     try {
+      if (txRolledback) {
+        return;
+      }
       TXCommitMessage txcm = this.firstProxy.afterCompletion(status, 
proxy.getTxId().getUniqId());
       if (status == Status.STATUS_COMMITTED) {
         if (txcm == null) {

http://git-wip-us.apache.org/repos/asf/geode/blob/41296c98/geode-core/src/main/java/org/apache/geode/internal/jta/TransactionImpl.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/jta/TransactionImpl.java 
b/geode-core/src/main/java/org/apache/geode/internal/jta/TransactionImpl.java
index f36079a..a5e80b6 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/jta/TransactionImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/jta/TransactionImpl.java
@@ -271,4 +271,8 @@ public class TransactionImpl implements Transaction {
   List getSyncList() {
     return syncList;
   }
+
+  public boolean notifyBeforeCompletionForTest() {
+    return notifyBeforeCompletion();
+  }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/41296c98/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java
index 270fbce..3771fa8 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java
@@ -35,6 +35,8 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import javax.naming.Context;
+import javax.transaction.RollbackException;
+import javax.transaction.Synchronization;
 import javax.transaction.UserTransaction;
 
 import org.awaitility.Awaitility;
@@ -59,6 +61,10 @@ import org.apache.geode.internal.cache.execute.data.Customer;
 import org.apache.geode.internal.cache.execute.data.Order;
 import org.apache.geode.internal.cache.execute.data.OrderId;
 import org.apache.geode.internal.cache.tx.ClientTXStateStub;
+import org.apache.geode.internal.jta.SyncImpl;
+import org.apache.geode.internal.jta.TransactionImpl;
+import org.apache.geode.internal.jta.TransactionManagerImpl;
+import org.apache.geode.internal.jta.UserTransactionImpl;
 import org.apache.geode.test.dunit.*;
 
 import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
@@ -3924,4 +3930,130 @@ public class ClientServerTransactionDUnitTest extends 
RemoteTransactionDUnitTest
       getGemfireCache().getDistributedSystem().disconnect();
     }
   }
+
+  String key1 = "KEY-1";
+  String v1 = "VALUE-1";
+
+  @Test
+  public void testTXStateCleanedUpIfJTABeforeCompletionFailedOnClient() {
+    int copies = 1;
+    int totalBuckets = 1;
+
+    Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM server2 = host.getVM(1);
+
+    VM client1 = host.getVM(2);
+
+    String regionName = "aRegion";
+    final int port1 = createRegionsAndStartServer(server1, false);
+    // Create PR
+    server1.invoke(() -> {
+      createSubscriptionRegion(false, regionName, copies, totalBuckets);
+      Region r = getCache().getRegion(regionName);
+      r.put(key1, v1);
+    });
+
+    createRegionsAndStartServer(server2, false);
+    server2.invoke(() -> createSubscriptionRegion(false, regionName, copies, 
totalBuckets));
+
+    // Create client 1
+    client1.invoke(() -> createClient(port1, regionName));
+
+    client1.invoke(() -> verifyClientCacheData(regionName));
+
+    client1.invoke(new SerializableCallable() {
+      public Object call() throws Exception {
+        final CountDownLatch latch1 = new CountDownLatch(1);
+        final CountDownLatch latch2 = new CountDownLatch(1);
+        Thread t1 = new Thread(new Runnable() {
+          public void run() {
+            doJTATx1(regionName, latch1, latch2);
+          }
+        });
+        t1.start();
+        doJTATx2(regionName, latch1, latch2);
+        t1.join();
+        Region r = getClientRegion(regionName);
+        assertTrue("region data has been changed", r.get(key1).equals(v1));
+        return null;
+      }
+    });
+
+    final DistributedMember clientId = (DistributedMember) 
client1.invoke(getClientDM());
+
+    server1.invoke(() -> verifyTXStateEmpty(clientId));
+    server2.invoke(() -> verifyTXStateEmpty(clientId));
+  }
+
+  private void verifyTXStateEmpty(DistributedMember clientId) {
+    TXManagerImpl txmgr = getGemfireCache().getTxManager();
+    Set<TXId> states = 
txmgr.getTransactionsForClient((InternalDistributedMember) clientId);
+    assertEquals(0, states.size()); // both transactions should be rolled back.
+  }
+
+  private SerializableCallable getClientDM() {
+    return new SerializableCallable("getClientDM") {
+      @Override
+      public Object call() {
+        return getClientDMID();
+      }
+    };
+  }
+
+  private DistributedMember getClientDMID() {
+    ClientCache cCache = getClientCache(null);
+    return cCache.getDistributedSystem().getDistributedMember();
+  }
+
+  private Region getClientRegion(String regionName) {
+    ClientCache cCache = getClientCache(null);
+    return cCache.getRegion(regionName);
+  }
+
+  private void verifyClientCacheData(String regionName) {
+    Region r = getClientRegion(regionName);
+    assertTrue("region size is not 1", r.size() == 1);
+  }
+
+  private void doJTATx1(String regionName, CountDownLatch latch1, 
CountDownLatch latch2) {
+    TransactionManagerImpl tm = TransactionManagerImpl.getTransactionManager();
+    Region r = getClientRegion(regionName);
+    try {
+      UserTransaction utx = new UserTransactionImpl();
+      utx.begin();
+      latch1.await();
+      r.put(key1, "value2");
+      utx.commit();
+      fail("Do not get expected RollbackException");
+    } catch (Exception e) {
+      if (e instanceof RollbackException) {
+        // expected exception.
+      } else {
+        Assert.fail("Unexpected exception while doing JTA Transaction1 ", e);
+      }
+    } finally {
+      latch2.countDown();
+    }
+  }
+
+  private void doJTATx2(String regionName, CountDownLatch latch1, 
CountDownLatch latch2) {
+    try {
+      TransactionManagerImpl tm = 
TransactionManagerImpl.getTransactionManager();
+      UserTransaction utx = new UserTransactionImpl();
+      Region r = getClientRegion(regionName);
+      utx.begin();
+      r.put(key1, "value3");
+      TransactionImpl txn = (TransactionImpl) tm.getTransaction();
+      Synchronization sync = new SyncImpl();
+      txn.registerSynchronization(sync);
+      txn.notifyBeforeCompletionForTest();
+      latch1.countDown();
+      latch2.await();
+      utx.rollback();
+    } catch (Exception e) {
+      latch1.countDown();
+      Assert.fail("Unexpected exception while doing JTA Transaction2 ", e);
+    }
+  }
 }

Reply via email to