Repository: incubator-geode
Updated Branches:
  refs/heads/develop bd229d768 -> e01dbe6f5


GEODE-2091: Do not return false when containsValueForKey call failed in a 
transaction

Correctly handle exception to fail the transaction instead of returning null.
Add check for colocated buckets so that the correct TransactionException can be 
thrown.
Fix containsKey method call as well.
Add test cases in dunit test.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/e01dbe6f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/e01dbe6f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/e01dbe6f

Branch: refs/heads/develop
Commit: e01dbe6f557a11aa99cd6ec4da349a38a97d678e
Parents: bd229d7
Author: eshu <e...@pivotal.io>
Authored: Thu Nov 10 14:57:48 2016 -0800
Committer: eshu <e...@pivotal.io>
Committed: Thu Nov 10 15:02:58 2016 -0800

----------------------------------------------------------------------
 .../geode/internal/cache/TXStateStub.java       |   4 +
 .../cache/tx/PartitionedTXRegionStub.java       |  42 +++-
 .../apache/geode/disttx/PRDistTXDUnitTest.java  |   8 +
 .../disttx/PRDistTXWithVersionsDUnitTest.java   |   8 +
 .../cache/execute/PRTransactionDUnitTest.java   | 200 ++++++++++++-------
 5 files changed, 181 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e01dbe6f/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateStub.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateStub.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateStub.java
index a6c78f2..5dd624b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateStub.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateStub.java
@@ -135,6 +135,10 @@ public abstract class TXStateStub implements 
TXStateInterface {
     return stub;
   }
 
+  public Map<Region<?, ?>, TXRegionStub> getRegionStubs() {
+    return this.regionStubs;
+  }
+
 
   @Override
   public String toString() {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e01dbe6f/geode-core/src/main/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStub.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStub.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStub.java
index 0e9c128..10ae7a5 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStub.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStub.java
@@ -17,6 +17,7 @@ package org.apache.geode.internal.cache.tx;
 import org.apache.geode.CancelException;
 import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.cache.EntryNotFoundException;
+import org.apache.geode.cache.Region;
 import org.apache.geode.cache.Region.Entry;
 import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
 import org.apache.geode.cache.TransactionDataNotColocatedException;
@@ -24,6 +25,7 @@ import 
org.apache.geode.cache.TransactionDataRebalancedException;
 import org.apache.geode.cache.TransactionException;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.BucketNotFoundException;
+import org.apache.geode.internal.cache.ColocationHelper;
 import org.apache.geode.internal.cache.DataLocationException;
 import org.apache.geode.internal.cache.DistributedPutAllOperation;
 import org.apache.geode.internal.cache.DistributedRemoveAllOperation;
@@ -46,6 +48,7 @@ import 
org.apache.geode.internal.cache.tier.sockets.VersionedObjectList;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.offheap.annotations.Released;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -64,6 +67,9 @@ public class PartitionedTXRegionStub extends 
AbstractPeerTXRegionStub {
     super(txstate, r);
   }
 
+  public Map<Integer, Boolean> getBuckets() {
+    return buckets;
+  }
 
   public void destroyExistingEntry(EntryEventImpl event, boolean cacheWrite,
       Object expectedOldValue) {
@@ -107,16 +113,15 @@ public class PartitionedTXRegionStub extends 
AbstractPeerTXRegionStub {
       }
       ex = ex.getCause();
     }
-    if (keyInfo != null && !buckets.isEmpty() && 
!buckets.containsKey(keyInfo.getBucketId())) {
-      // for parent region if previous ops were successful and for child 
colocated regions
-      // where the bucketId was not previously encountered
+
+    if (isKeyInNonColocatedBucket(keyInfo)) {
       return new TransactionDataNotColocatedException(
           
LocalizedStrings.PartitionedRegion_KEY_0_NOT_COLOCATED_WITH_TRANSACTION
               .toLocalizedString(keyInfo.getKey()));
     }
     ex = cause;
     while (ex != null) {
-      if (ex instanceof PrimaryBucketException) {
+      if (ex instanceof PrimaryBucketException || ex instanceof 
BucketNotFoundException) {
         return new TransactionDataRebalancedException(
             
LocalizedStrings.PartitionedRegion_TRANSACTIONAL_DATA_MOVED_DUE_TO_REBALANCING
                 .toLocalizedString());
@@ -126,6 +131,23 @@ public class PartitionedTXRegionStub extends 
AbstractPeerTXRegionStub {
     return new 
TransactionDataNodeHasDepartedException(cause.getLocalizedMessage());
   }
 
+  // is this key in a different bucket from all the existing buckets
+  // of the underlying PR or its colocated PRs touched by the transaction.
+  private boolean isKeyInNonColocatedBucket(KeyInfo keyInfo) {
+    Map<Region<?, ?>, TXRegionStub> regionStubs = this.state.getRegionStubs();
+    Collection<PartitionedRegion> colcatedRegions = 
(Collection<PartitionedRegion>) ColocationHelper
+        .getAllColocationRegions((PartitionedRegion) this.region).values();
+    // get all colocated region buckets touched in the transaction
+    for (PartitionedRegion colcatedRegion : colcatedRegions) {
+      PartitionedTXRegionStub regionStub =
+          (PartitionedTXRegionStub) regionStubs.get(colcatedRegion);
+      if (regionStub != null) {
+        buckets.putAll(regionStub.getBuckets());
+      }
+    }
+    return keyInfo != null && !buckets.isEmpty() && 
!buckets.containsKey(keyInfo.getBucketId());
+  }
+
 
   /**
    * wait to retry after getting a ForceReattemptException
@@ -232,7 +254,9 @@ public class PartitionedTXRegionStub extends 
AbstractPeerTXRegionStub {
       throw re;
     } catch (ForceReattemptException e) {
       if (isBucketNotFoundException(e)) {
-        return false;
+        RuntimeException re = getTransactionException(keyInfo, e);
+        re.initCause(e);
+        throw re;
       }
       waitToRetry();
       RuntimeException re = new TransactionDataNodeHasDepartedException(
@@ -274,7 +298,9 @@ public class PartitionedTXRegionStub extends 
AbstractPeerTXRegionStub {
       throw re;
     } catch (ForceReattemptException e) {
       if (isBucketNotFoundException(e)) {
-        return false;
+        RuntimeException re = getTransactionException(keyInfo, e);
+        re.initCause(e);
+        throw re;
       }
       waitToRetry();
       RuntimeException re = new TransactionDataNodeHasDepartedException(
@@ -306,7 +332,9 @@ public class PartitionedTXRegionStub extends 
AbstractPeerTXRegionStub {
       throw re;
     } catch (ForceReattemptException e) {
       if (isBucketNotFoundException(e)) {
-        return null;
+        RuntimeException re = getTransactionException(keyInfo, e);
+        re.initCause(e);
+        throw re;
       }
       waitToRetry();
       RuntimeException re = getTransactionException(keyInfo, e);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e01dbe6f/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java 
b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java
index 150fe33..ed8d3c6 100644
--- a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java
@@ -43,6 +43,14 @@ public class PRDistTXDUnitTest extends 
PRTransactionDUnitTest {
   @Test
   public void testTxWithGetOnMovedBucket() {}
 
+  @Ignore("[DISTTX] TODO test overridden and intentionally left blank as it 
does not apply to disttx.")
+  @Test
+  public void testTxWithContainsValueForKeyOnMovedBucket() {}
+
+  @Ignore("[DISTTX] TODO test overridden and intentionally left blank as it 
does not apply to disttx.")
+  @Test
+  public void testTxWithContainsKeyOnMovedBucket() {}
+
   @Ignore("[DISTTX] TODO test overridden and intentionally left blank as they 
fail.")
   @Test
   public void testBasicPRTransactionRedundancy0() {}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e01dbe6f/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java
index 8ff1d94..4e6f846 100644
--- 
a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java
@@ -43,6 +43,14 @@ public class PRDistTXWithVersionsDUnitTest extends 
PRTransactionWithVersionsDUni
   @Test
   public void testTxWithGetOnMovedBucket() {}
 
+  @Ignore("[DISTTX] TODO test overridden and intentionally left blank as it 
does not apply to disttx.")
+  @Test
+  public void testTxWithContainsValueForKeyOnMovedBucket() {}
+
+  @Ignore("[DISTTX] TODO test overridden and intentionally left blank as it 
does not apply to disttx.")
+  @Test
+  public void testTxWithContainsKeyOnMovedBucket() {}
+
   @Ignore("[DISTTX] TODO test overridden and intentionally left blank as they 
fail.")
   @Override
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e01dbe6f/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java
index 2122960..e2ba2b3 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java
@@ -336,7 +336,7 @@ public class PRTransactionDUnitTest extends 
PRColocationDUnitTest {
    * 
    * @param bucketRedundancy redundancy for the colocated PRs
    */
-  protected void baiscPRTXWithNonColocatedGet(int bucketRedundancy) {
+  protected void basicPRTXWithNonColocatedGet(int bucketRedundancy) {
     dataStore1.invoke(runGetCache);
     dataStore2.invoke(runGetCache);
     redundancy = new Integer(bucketRedundancy);
@@ -454,45 +454,26 @@ public class PRTransactionDUnitTest extends 
PRColocationDUnitTest {
   }
 
   /**
-   * This method executes a transaction with get on a key in a moved bucket, 
and expects transaction
-   * to fail with TransactionDataRebalancedException.
+   * This method executes a transaction with operation on a key in a moved 
bucket, and expects
+   * transaction to fail with TransactionDataRebalancedException.
    * 
+   * @param op which entry op to be executed
    * @param bucketRedundancy redundancy for the colocated PRs
    */
   @SuppressWarnings("unchecked")
-  protected void baiscPRTXWithGetOnMovedBucket(int bucketRedundancy) {
-    dataStore1.invoke(runGetCache);
-    dataStore2.invoke(runGetCache);
-    redundancy = new Integer(bucketRedundancy);
-    localMaxmemory = new Integer(50);
-    totalNumBuckets = new Integer(1);
-
-    setAttributes(CustomerPartitionedRegionName, null);
-
-    createPRInTwoNodes();
-
-    setAttributes(OrderPartitionedRegionName, CustomerPartitionedRegionName);
-
-    createPRInTwoNodes();
-
-    // Put the customer 1 in CustomerPartitionedRegion
-    dataStore1.invoke(
-        () -> 
PRColocationDUnitTest.putCustomerPartitionedRegion(CustomerPartitionedRegionName,
 1));
-
-    // Put the associated order in colocated OrderPartitionedRegion
-    dataStore1.invoke(
-        () -> 
PRColocationDUnitTest.putOrderPartitionedRegion(OrderPartitionedRegionName, 1));
+  protected void basicPRTXWithOpOnMovedBucket(Op op, int bucketRedundancy) {
+    setupMoveBucket(bucketRedundancy);
 
     DistributedMember dm1 = (DistributedMember) dataStore1.invoke(getDM());
     DistributedMember dm2 = (DistributedMember) dataStore2.invoke(getDM());
 
     // First get transaction.
     TransactionId txId = (TransactionId) dataStore1.invoke(beginTx());
-    dataStore1.invoke(resumeTx(txId, dm1, dm2));
+    dataStore1.invoke(resumeTx(op, txId, dm1, dm2));
 
     // Second one. Will go through different path (using TXState or 
TXStateStub)
     txId = (TransactionId) dataStore1.invoke(beginTx());
-    dataStore1.invoke(resumeTx(txId, dm1, dm2));
+    dataStore1.invoke(resumeTx(op, txId, dm1, dm2));
   }
 
   @SuppressWarnings({"rawtypes", "serial"})
@@ -523,7 +504,7 @@ public class PRTransactionDUnitTest extends 
PRColocationDUnitTest {
   }
 
   @SuppressWarnings("serial")
-  private SerializableRunnable resumeTx(TransactionId txId, DistributedMember 
dm1,
+  private SerializableRunnable resumeTx(Op op, TransactionId txId, 
DistributedMember dm1,
       DistributedMember dm2) {
     return new SerializableRunnable("resume tx") {
       @Override
@@ -532,57 +513,96 @@ public class PRTransactionDUnitTest extends 
PRColocationDUnitTest {
             .getRegion(Region.SEPARATOR + OrderPartitionedRegionName);
         CacheTransactionManager mgr = 
basicGetCache().getCacheTransactionManager();
 
-        moveBucket(dm1, dm2);
+        moveBucket(op, dm1, dm2);
 
-        Assertions.assertThatThrownBy(() -> _resumeTx(txId, pr, mgr))
+        Assertions.assertThatThrownBy(() -> _resumeTx(op, txId, pr, mgr))
             .isInstanceOf(TransactionDataRebalancedException.class);
       }
+    };
+  }
 
-      private void _resumeTx(TransactionId txId, PartitionedRegion pr,
-          CacheTransactionManager mgr) {
-        CustId cust1 = new CustId(1);
-        OrderId order1 = new OrderId(11, cust1);
-        mgr.resume(txId);
-        try {
+  enum Op {
+    GET, CONTAINSVALUEFORKEY, CONTAINSKEY;
+  }
+
+  private void _resumeTx(Op op, TransactionId txId, PartitionedRegion pr,
+      CacheTransactionManager mgr) {
+    CustId cust1 = new CustId(1);
+    OrderId order1 = new OrderId(11, cust1);
+    mgr.resume(txId);
+    try {
+      switch (op) {
+        case GET:
           pr.get(order1);
-        } finally {
-          mgr.rollback();
-        }
+          break;
+        case CONTAINSVALUEFORKEY:
+          pr.containsValueForKey(order1);
+          break;
+        case CONTAINSKEY:
+          pr.containsKey(order1);
+          break;
+        default:
+          throw new AssertionError("Unknown operations " + op);
       }
 
-      @SuppressWarnings("unchecked")
-      private void moveBucket(DistributedMember dm1, DistributedMember dm2) {
-        PartitionedRegion pr = (PartitionedRegion) basicGetCache()
-            .getRegion(Region.SEPARATOR + CustomerPartitionedRegionName);
-        CustId cust1 = new CustId(1);
-        OrderId order1 = new OrderId(11, cust1);
-        boolean isCust1Local = isCust1LocalSingleBucket(pr, cust1);
-        DistributedMember source = isCust1Local ? dm1 : dm2;
-        DistributedMember destination = isCust1Local ? dm2 : dm1;
-        PartitionedRegion prOrder = (PartitionedRegion) basicGetCache()
-            .getRegion(Region.SEPARATOR + OrderPartitionedRegionName);
+    } finally {
+      mgr.rollback();
+    }
+  }
 
-        LogService.getLogger().info("source ={}, destination ={}", source, 
destination);
-        if (isCust1Local) {
-          // Use TXState
-          setBucketReadHook(order1, source, destination, prOrder);
-        } else {
-          // Use TXStateStub -- transaction data on remote node
-          PartitionRegionHelper.moveBucketByKey(prOrder, source, destination, 
order1);
-        }
-      }
+  @SuppressWarnings("unchecked")
+  private void moveBucket(Op op, DistributedMember dm1, DistributedMember dm2) 
{
+    PartitionedRegion pr = (PartitionedRegion) basicGetCache()
+        .getRegion(Region.SEPARATOR + CustomerPartitionedRegionName);
+    CustId cust1 = new CustId(1);
+    OrderId order1 = new OrderId(11, cust1);
+    boolean isCust1Local = isCust1LocalSingleBucket(pr, cust1);
+    DistributedMember source = isCust1Local ? dm1 : dm2;
+    DistributedMember destination = isCust1Local ? dm2 : dm1;
+    PartitionedRegion prOrder = (PartitionedRegion) basicGetCache()
+        .getRegion(Region.SEPARATOR + OrderPartitionedRegionName);
+
+    LogService.getLogger().info("source ={}, destination ={}", source, 
destination);
+
+    switch (op) {
+      case GET:
+        moveBucketForGet(order1, isCust1Local, source, destination, prOrder);
+        break;
+      case CONTAINSVALUEFORKEY:
+      case CONTAINSKEY:
+        PartitionRegionHelper.moveBucketByKey(prOrder, source, destination, 
order1);
+        break;
+      default:
+        throw new AssertionError("Unknown operations " + op);
+    }
+  }
 
-      private void setBucketReadHook(OrderId order1, DistributedMember source,
-          DistributedMember destination, PartitionedRegion prOrder) {
-        prOrder.getDataStore().setBucketReadHook(new Runnable() {
-          @SuppressWarnings("unchecked")
-          public void run() {
-            LogService.getLogger().info("In bucketReadHook");
-            PartitionRegionHelper.moveBucketByKey(prOrder, source, 
destination, order1);
-          }
-        });
+  @SuppressWarnings("unchecked")
+  private void moveBucketForGet(OrderId order1, boolean isCust1Local, 
DistributedMember source,
+      DistributedMember destination, PartitionedRegion prOrder) {
+    if (isCust1Local) {
+      // Use TXState
+      setBucketReadHook(order1, source, destination, prOrder);
+    } else {
+      // Use TXStateStub -- transaction data on remote node
+      PartitionRegionHelper.moveBucketByKey(prOrder, source, destination, 
order1);
+    }
+  }
+
+  private void setBucketReadHook(OrderId order1, DistributedMember source,
+      DistributedMember destination, PartitionedRegion prOrder) {
+    prOrder.getDataStore().setBucketReadHook(new Runnable() {
+      @SuppressWarnings("unchecked")
+      public void run() {
+        LogService.getLogger().info("In bucketReadHook");
+        PartitionRegionHelper.moveBucketByKey(prOrder, source, destination, 
order1);
       }
-    };
+    });
+  }
+
+  private void createPRInTwoNodes() {
+    dataStore1.invoke(PRColocationDUnitTest.class, "createPR", 
this.attributeObjects);
+    dataStore2.invoke(PRColocationDUnitTest.class, "createPR", 
this.attributeObjects);
   }
 
   @SuppressWarnings("unchecked")
@@ -591,20 +611,52 @@ public class PRTransactionDUnitTest extends 
PRColocationDUnitTest {
     return (Integer) localPrimaryBucketList.size() == 1;
   }
 
+  @SuppressWarnings("unchecked")
+  private void setupMoveBucket(int bucketRedundancy) {
+    dataStore1.invoke(runGetCache);
+    dataStore2.invoke(runGetCache);
+    redundancy = new Integer(bucketRedundancy);
+    localMaxmemory = new Integer(50);
+    totalNumBuckets = new Integer(1);
 
-  private void createPRInTwoNodes() {
-    dataStore1.invoke(PRColocationDUnitTest.class, "createPR", 
this.attributeObjects);
-    dataStore2.invoke(PRColocationDUnitTest.class, "createPR", 
this.attributeObjects);
+    setAttributes(CustomerPartitionedRegionName, null);
+
+    createPRInTwoNodes();
+
+    setAttributes(OrderPartitionedRegionName, CustomerPartitionedRegionName);
+
+    createPRInTwoNodes();
+
+    // Put the customer 1 in CustomerPartitionedRegion
+    dataStore1.invoke(
+        () -> 
PRColocationDUnitTest.putCustomerPartitionedRegion(CustomerPartitionedRegionName,
 1));
+
+    // Put the associated order in colocated OrderPartitionedRegion
+    dataStore1.invoke(
+        () -> 
PRColocationDUnitTest.putOrderPartitionedRegion(OrderPartitionedRegionName, 1));
   }
 
   @Test
   public void testTxWithNonColocatedGet() {
-    baiscPRTXWithNonColocatedGet(0);
+    basicPRTXWithNonColocatedGet(0);
   }
 
   @Test
   public void testTxWithGetOnMovedBucket() {
-    baiscPRTXWithGetOnMovedBucket(0);
+    Op op = Op.GET;
+    basicPRTXWithOpOnMovedBucket(op, 0);
+  }
+
+  @Test
+  public void testTxWithContainsValueForKeyOnMovedBucket() {
+    Op op = Op.CONTAINSVALUEFORKEY;
+    basicPRTXWithOpOnMovedBucket(op, 0);
+  }
+
+  @Test
+  public void testTxWithContainsKeyOnMovedBucket() {
+    Op op = Op.CONTAINSKEY;
+    basicPRTXWithOpOnMovedBucket(op, 0);
   }
 
   @Test

Reply via email to