Repository: incubator-geode Updated Branches: refs/heads/develop bd229d768 -> e01dbe6f5
GEODE-2091: Do not return false when containsValueForKey call failed in a transaction Correctly handle exception to fail the transaction instead of returning null. Add check for colocated buckets so that correct TrasactionException can be thrown. Fix containsKey method call as well. Add test cases in dunit test. Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/e01dbe6f Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/e01dbe6f Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/e01dbe6f Branch: refs/heads/develop Commit: e01dbe6f557a11aa99cd6ec4da349a38a97d678e Parents: bd229d7 Author: eshu <e...@pivotal.io> Authored: Thu Nov 10 14:57:48 2016 -0800 Committer: eshu <e...@pivotal.io> Committed: Thu Nov 10 15:02:58 2016 -0800 ---------------------------------------------------------------------- .../geode/internal/cache/TXStateStub.java | 4 + .../cache/tx/PartitionedTXRegionStub.java | 42 +++- .../apache/geode/disttx/PRDistTXDUnitTest.java | 8 + .../disttx/PRDistTXWithVersionsDUnitTest.java | 8 + .../cache/execute/PRTransactionDUnitTest.java | 200 ++++++++++++------- 5 files changed, 181 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e01dbe6f/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateStub.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateStub.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateStub.java index a6c78f2..5dd624b 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateStub.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateStub.java @@ -135,6 +135,10 @@ public abstract class TXStateStub implements TXStateInterface { return stub; } + public Map<Region<?, ?>, TXRegionStub> getRegionStubs() { + return this.regionStubs; + } + @Override public String toString() { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e01dbe6f/geode-core/src/main/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStub.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStub.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStub.java index 0e9c128..10ae7a5 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStub.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStub.java @@ -17,6 +17,7 @@ package org.apache.geode.internal.cache.tx; import org.apache.geode.CancelException; import org.apache.geode.cache.CacheClosedException; import org.apache.geode.cache.EntryNotFoundException; +import org.apache.geode.cache.Region; import org.apache.geode.cache.Region.Entry; import org.apache.geode.cache.TransactionDataNodeHasDepartedException; import org.apache.geode.cache.TransactionDataNotColocatedException; @@ -24,6 +25,7 @@ import org.apache.geode.cache.TransactionDataRebalancedException; import org.apache.geode.cache.TransactionException; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.cache.BucketNotFoundException; +import org.apache.geode.internal.cache.ColocationHelper; import org.apache.geode.internal.cache.DataLocationException; import org.apache.geode.internal.cache.DistributedPutAllOperation; import org.apache.geode.internal.cache.DistributedRemoveAllOperation; @@ -46,6 +48,7 @@ import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList; import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.offheap.annotations.Released; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -64,6 +67,9 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub { super(txstate, r); } + public Map<Integer, Boolean> getBuckets() { + return buckets; + } public void destroyExistingEntry(EntryEventImpl event, boolean cacheWrite, Object expectedOldValue) { @@ -107,16 +113,15 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub { } ex = ex.getCause(); } - if (keyInfo != null && !buckets.isEmpty() && !buckets.containsKey(keyInfo.getBucketId())) { - // for parent region if previous ops were successful and for child colocated regions - // where the bucketId was not previously encountered + + if (isKeyInNonColocatedBucket(keyInfo)) { return new TransactionDataNotColocatedException( LocalizedStrings.PartitionedRegion_KEY_0_NOT_COLOCATED_WITH_TRANSACTION .toLocalizedString(keyInfo.getKey())); } ex = cause; while (ex != null) { - if (ex instanceof PrimaryBucketException) { + if (ex instanceof PrimaryBucketException || ex instanceof BucketNotFoundException) { return new TransactionDataRebalancedException( LocalizedStrings.PartitionedRegion_TRANSACTIONAL_DATA_MOVED_DUE_TO_REBALANCING .toLocalizedString()); @@ -126,6 +131,23 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub { return new TransactionDataNodeHasDepartedException(cause.getLocalizedMessage()); } + // is this key in a different bucket from all the existing buckets + // of the underlying PR or its colocated PRs touched by the transaction. + private boolean isKeyInNonColocatedBucket(KeyInfo keyInfo) { + Map<Region<?, ?>, TXRegionStub> regionStubs = this.state.getRegionStubs(); + Collection<PartitionedRegion> colcatedRegions = (Collection<PartitionedRegion>) ColocationHelper + .getAllColocationRegions((PartitionedRegion) this.region).values(); + // get all colocated region buckets touched in the transaction + for (PartitionedRegion colcatedRegion : colcatedRegions) { + PartitionedTXRegionStub regionStub = + (PartitionedTXRegionStub) regionStubs.get(colcatedRegion); + if (regionStub != null) { + buckets.putAll(regionStub.getBuckets()); + } + } + return keyInfo != null && !buckets.isEmpty() && !buckets.containsKey(keyInfo.getBucketId()); + } + /** * wait to retry after getting a ForceReattemptException @@ -232,7 +254,9 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub { throw re; } catch (ForceReattemptException e) { if (isBucketNotFoundException(e)) { - return false; + RuntimeException re = getTransactionException(keyInfo, e); + re.initCause(e); + throw re; } waitToRetry(); RuntimeException re = new TransactionDataNodeHasDepartedException( @@ -274,7 +298,9 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub { throw re; } catch (ForceReattemptException e) { if (isBucketNotFoundException(e)) { - return false; + RuntimeException re = getTransactionException(keyInfo, e); + re.initCause(e); + throw re; } waitToRetry(); RuntimeException re = new TransactionDataNodeHasDepartedException( @@ -306,7 +332,9 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub { throw re; } catch (ForceReattemptException e) { if (isBucketNotFoundException(e)) { - return null; + RuntimeException re = getTransactionException(keyInfo, e); + re.initCause(e); + throw re; } waitToRetry(); RuntimeException re = getTransactionException(keyInfo, e); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e01dbe6f/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java index 150fe33..ed8d3c6 100644 --- a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java @@ -43,6 +43,14 @@ public class PRDistTXDUnitTest extends PRTransactionDUnitTest { @Test public void testTxWithGetOnMovedBucket() {} + @Ignore("[DISTTX] TODO test overridden and intentionally left blank as it does not apply to disttx.") + @Test + public void testTxWithContainsValueForKeyOnMovedBucket() {} + + @Ignore("[DISTTX] TODO test overridden and intentionally left blank as it does not apply to disttx.") + @Test + public void testTxWithContainsKeyOnMovedBucket() {} + @Ignore("[DISTTX] TODO test overridden and intentionally left blank as they fail.") @Test public void testBasicPRTransactionRedundancy0() {} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e01dbe6f/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java index 8ff1d94..4e6f846 100644 --- a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java @@ -43,6 +43,14 @@ public class PRDistTXWithVersionsDUnitTest extends PRTransactionWithVersionsDUni @Test public void testTxWithGetOnMovedBucket() {} + @Ignore("[DISTTX] TODO test overridden and intentionally left blank as it does not apply to disttx.") + @Test + public void testTxWithContainsValueForKeyOnMovedBucket() {} + + @Ignore("[DISTTX] TODO test overridden and intentionally left blank as it does not apply to disttx.") + @Test + public void testTxWithContainsKeyOnMovedBucket() {} + @Ignore("[DISTTX] TODO test overridden and intentionally left blank as they fail.") @Override @Test http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e01dbe6f/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java index 2122960..e2ba2b3 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java @@ -336,7 +336,7 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest { * * @param bucketRedundancy redundancy for the colocated PRs */ - protected void baiscPRTXWithNonColocatedGet(int bucketRedundancy) { + protected void basicPRTXWithNonColocatedGet(int bucketRedundancy) { dataStore1.invoke(runGetCache); dataStore2.invoke(runGetCache); redundancy = new Integer(bucketRedundancy); @@ -454,45 +454,26 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest { } /** - * This method executes a transaction with get on a key in a moved bucket, and expects transaction - * to fail with TransactionDataRebalancedException. + * This method executes a transaction with operation on a key in a moved bucket, and expects + * transaction to fail with TransactionDataRebalancedException. * + * @param op which entry op to be executed * @param bucketRedundancy redundancy for the colocated PRs */ @SuppressWarnings("unchecked") - protected void baiscPRTXWithGetOnMovedBucket(int bucketRedundancy) { - dataStore1.invoke(runGetCache); - dataStore2.invoke(runGetCache); - redundancy = new Integer(bucketRedundancy); - localMaxmemory = new Integer(50); - totalNumBuckets = new Integer(1); - - setAttributes(CustomerPartitionedRegionName, null); - - createPRInTwoNodes(); - - setAttributes(OrderPartitionedRegionName, CustomerPartitionedRegionName); - - createPRInTwoNodes(); - - // Put the customer 1 in CustomerPartitionedRegion - dataStore1.invoke( - () -> PRColocationDUnitTest.putCustomerPartitionedRegion(CustomerPartitionedRegionName, 1)); - - // Put the associated order in colocated OrderPartitionedRegion - dataStore1.invoke( - () -> PRColocationDUnitTest.putOrderPartitionedRegion(OrderPartitionedRegionName, 1)); + protected void basicPRTXWithOpOnMovedBucket(Op op, int bucketRedundancy) { + setupMoveBucket(bucketRedundancy); DistributedMember dm1 = (DistributedMember) dataStore1.invoke(getDM()); DistributedMember dm2 = (DistributedMember) dataStore2.invoke(getDM()); // First get transaction. TransactionId txId = (TransactionId) dataStore1.invoke(beginTx()); - dataStore1.invoke(resumeTx(txId, dm1, dm2)); + dataStore1.invoke(resumeTx(op, txId, dm1, dm2)); // Second one. Will go through different path (using TXState or TXStateStub) txId = (TransactionId) dataStore1.invoke(beginTx()); - dataStore1.invoke(resumeTx(txId, dm1, dm2)); + dataStore1.invoke(resumeTx(op, txId, dm1, dm2)); } @SuppressWarnings({"rawtypes", "serial"}) @@ -523,7 +504,7 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest { } @SuppressWarnings("serial") - private SerializableRunnable resumeTx(TransactionId txId, DistributedMember dm1, + private SerializableRunnable resumeTx(Op op, TransactionId txId, DistributedMember dm1, DistributedMember dm2) { return new SerializableRunnable("resume tx") { @Override @@ -532,57 +513,96 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest { .getRegion(Region.SEPARATOR + OrderPartitionedRegionName); CacheTransactionManager mgr = basicGetCache().getCacheTransactionManager(); - moveBucket(dm1, dm2); + moveBucket(op, dm1, dm2); - Assertions.assertThatThrownBy(() -> _resumeTx(txId, pr, mgr)) + Assertions.assertThatThrownBy(() -> _resumeTx(op, txId, pr, mgr)) .isInstanceOf(TransactionDataRebalancedException.class); } + }; + } - private void _resumeTx(TransactionId txId, PartitionedRegion pr, - CacheTransactionManager mgr) { - CustId cust1 = new CustId(1); - OrderId order1 = new OrderId(11, cust1); - mgr.resume(txId); - try { + enum Op { + GET, CONTAINSVALUEFORKEY, CONTAINSKEY; + } + + private void _resumeTx(Op op, TransactionId txId, PartitionedRegion pr, + CacheTransactionManager mgr) { + CustId cust1 = new CustId(1); + OrderId order1 = new OrderId(11, cust1); + mgr.resume(txId); + try { + switch (op) { + case GET: pr.get(order1); - } finally { - mgr.rollback(); - } + break; + case CONTAINSVALUEFORKEY: + pr.containsValueForKey(order1); + break; + case CONTAINSKEY: + pr.containsKey(order1); + break; + default: + throw new AssertionError("Unknown operations " + op); } - @SuppressWarnings("unchecked") - private void moveBucket(DistributedMember dm1, DistributedMember dm2) { - PartitionedRegion pr = (PartitionedRegion) basicGetCache() - .getRegion(Region.SEPARATOR + CustomerPartitionedRegionName); - CustId cust1 = new CustId(1); - OrderId order1 = new OrderId(11, cust1); - boolean isCust1Local = isCust1LocalSingleBucket(pr, cust1); - DistributedMember source = isCust1Local ? dm1 : dm2; - DistributedMember destination = isCust1Local ? dm2 : dm1; - PartitionedRegion prOrder = (PartitionedRegion) basicGetCache() - .getRegion(Region.SEPARATOR + OrderPartitionedRegionName); + } finally { + mgr.rollback(); + } + } - LogService.getLogger().info("source ={}, destination ={}", source, destination); - if (isCust1Local) { - // Use TXState - setBucketReadHook(order1, source, destination, prOrder); - } else { - // Use TXStateStub -- transaction data on remote node - PartitionRegionHelper.moveBucketByKey(prOrder, source, destination, order1); - } - } + @SuppressWarnings("unchecked") + private void moveBucket(Op op, DistributedMember dm1, DistributedMember dm2) { + PartitionedRegion pr = (PartitionedRegion) basicGetCache() + .getRegion(Region.SEPARATOR + CustomerPartitionedRegionName); + CustId cust1 = new CustId(1); + OrderId order1 = new OrderId(11, cust1); + boolean isCust1Local = isCust1LocalSingleBucket(pr, cust1); + DistributedMember source = isCust1Local ? dm1 : dm2; + DistributedMember destination = isCust1Local ? dm2 : dm1; + PartitionedRegion prOrder = (PartitionedRegion) basicGetCache() + .getRegion(Region.SEPARATOR + OrderPartitionedRegionName); + + LogService.getLogger().info("source ={}, destination ={}", source, destination); + + switch (op) { + case GET: + moveBucketForGet(order1, isCust1Local, source, destination, prOrder); + break; + case CONTAINSVALUEFORKEY: + case CONTAINSKEY: + PartitionRegionHelper.moveBucketByKey(prOrder, source, destination, order1); + break; + default: + throw new AssertionError("Unknown operations " + op); + } + } - private void setBucketReadHook(OrderId order1, DistributedMember source, - DistributedMember destination, PartitionedRegion prOrder) { - prOrder.getDataStore().setBucketReadHook(new Runnable() { - @SuppressWarnings("unchecked") - public void run() { - LogService.getLogger().info("In bucketReadHook"); - PartitionRegionHelper.moveBucketByKey(prOrder, source, destination, order1); - } - }); + @SuppressWarnings("unchecked") + private void moveBucketForGet(OrderId order1, boolean isCust1Local, DistributedMember source, + DistributedMember destination, PartitionedRegion prOrder) { + if (isCust1Local) { + // Use TXState + setBucketReadHook(order1, source, destination, prOrder); + } else { + // Use TXStateStub -- transaction data on remote node + PartitionRegionHelper.moveBucketByKey(prOrder, source, destination, order1); + } + } + + private void setBucketReadHook(OrderId order1, DistributedMember source, + DistributedMember destination, PartitionedRegion prOrder) { + prOrder.getDataStore().setBucketReadHook(new Runnable() { + @SuppressWarnings("unchecked") + public void run() { + LogService.getLogger().info("In bucketReadHook"); + PartitionRegionHelper.moveBucketByKey(prOrder, source, destination, order1); } - }; + }); + } + + private void createPRInTwoNodes() { + dataStore1.invoke(PRColocationDUnitTest.class, "createPR", this.attributeObjects); + dataStore2.invoke(PRColocationDUnitTest.class, "createPR", this.attributeObjects); } @SuppressWarnings("unchecked") @@ -591,20 +611,52 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest { return (Integer) localPrimaryBucketList.size() == 1; } + @SuppressWarnings("unchecked") + private void setupMoveBucket(int bucketRedundancy) { + dataStore1.invoke(runGetCache); + dataStore2.invoke(runGetCache); + redundancy = new Integer(bucketRedundancy); + localMaxmemory = new Integer(50); + totalNumBuckets = new Integer(1); - private void createPRInTwoNodes() { - dataStore1.invoke(PRColocationDUnitTest.class, "createPR", this.attributeObjects); - dataStore2.invoke(PRColocationDUnitTest.class, "createPR", this.attributeObjects); + setAttributes(CustomerPartitionedRegionName, null); + + createPRInTwoNodes(); + + setAttributes(OrderPartitionedRegionName, CustomerPartitionedRegionName); + + createPRInTwoNodes(); + + // Put the customer 1 in CustomerPartitionedRegion + dataStore1.invoke( + () -> PRColocationDUnitTest.putCustomerPartitionedRegion(CustomerPartitionedRegionName, 1)); + + // Put the associated order in colocated OrderPartitionedRegion + dataStore1.invoke( + () -> PRColocationDUnitTest.putOrderPartitionedRegion(OrderPartitionedRegionName, 1)); } @Test public void testTxWithNonColocatedGet() { - baiscPRTXWithNonColocatedGet(0); + basicPRTXWithNonColocatedGet(0); } @Test public void testTxWithGetOnMovedBucket() { - baiscPRTXWithGetOnMovedBucket(0); + Op op = Op.GET; + basicPRTXWithOpOnMovedBucket(op, 0); + } + + @Test + public void testTxWithContainsValueForKeyOnMovedBucket() { + Op op = Op.CONTAINSVALUEFORKEY; + basicPRTXWithOpOnMovedBucket(op, 0); + } + + @Test + public void testTxWithContainsKeyOnMovedBucket() { + Op op = Op.CONTAINSKEY; + basicPRTXWithOpOnMovedBucket(op, 0); } @Test