Repository: incubator-geode Updated Branches: refs/heads/develop 02d962c20 -> 89c522ad6
GEODE-2077: Throw appropriate exceptions when get op in a transaction failed instead of return null. Also added a dunit test which fails without the fix. Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/89c522ad Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/89c522ad Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/89c522ad Branch: refs/heads/develop Commit: 89c522ad6ff75766c6ee80d0c45e995d571a57f2 Parents: 02d962c Author: eshu <[email protected]> Authored: Mon Nov 7 14:45:50 2016 -0800 Committer: eshu <[email protected]> Committed: Mon Nov 7 14:45:50 2016 -0800 ---------------------------------------------------------------------- .../geode/internal/cache/PartitionedRegion.java | 24 ++- .../apache/geode/disttx/PRDistTXDUnitTest.java | 4 + .../disttx/PRDistTXWithVersionsDUnitTest.java | 4 + .../cache/execute/PRColocationDUnitTest.java | 6 +- .../cache/execute/PRTransactionDUnitTest.java | 171 ++++++++++++++++++- 5 files changed, 199 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/89c522ad/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java index 8f67e25..96c58d5 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java @@ -3965,7 +3965,16 @@ public class PartitionedRegion extends LocalRegion if (allowRetry) { retryNode = getNodeForBucketReadOrLoad(bucketId); } else { - return null; + // Only transactions set allowRetry to false, + // fail the transaction here as region is destroyed. + Throwable cause = pde.getCause(); + if (cause != null && cause instanceof RegionDestroyedException) { + throw (RegionDestroyedException) cause; + } else { + // Should not see it currently, all current constructors of PRLocallyDestroyedException + // set the cause to RegionDestroyedException. + throw new RegionDestroyedException(toString(), getFullPath()); + } } } catch (ForceReattemptException prce) { prce.checkKey(key); @@ -3990,6 +3999,7 @@ public class PartitionedRegion extends LocalRegion retryTime.waitToRetryNode(); } } else { + // with transaction if (prce instanceof BucketNotFoundException) { TransactionException ex = new TransactionDataNotColocatedException( LocalizedStrings.PartitionedRegion_KEY_0_NOT_COLOCATED_WITH_TRANSACTION @@ -4002,8 +4012,18 @@ public class PartitionedRegion extends LocalRegion throw (PrimaryBucketException) cause; } else if (cause instanceof TransactionDataRebalancedException) { throw (TransactionDataRebalancedException) cause; + } else if (cause instanceof RegionDestroyedException) { + TransactionException ex = new TransactionDataRebalancedException( + LocalizedStrings.PartitionedRegion_TRANSACTIONAL_DATA_MOVED_DUE_TO_REBALANCING + .toLocalizedString(key)); + ex.initCause(cause); + throw ex; } else { - return null; + // Make transaction fail so client could retry + // instead of returning null if ForceReattemptException is thrown. + // Should not see it currently, added to be protected against future changes. + TransactionException ex = new TransactionException("Failed to get key: " + key, prce); + throw ex; } } } catch (PrimaryBucketException notPrimary) { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/89c522ad/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java index 024776a..150fe33 100644 --- a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java @@ -39,6 +39,10 @@ public class PRDistTXDUnitTest extends PRTransactionDUnitTest { @Test public void testTxWithNonColocatedGet() {} + @Ignore("[DISTTX] TODO test overridden and intentionally left blank as it does not apply to disttx.") + @Test + public void testTxWithGetOnMovedBucket() {} + @Ignore("[DISTTX] TODO test overridden and intentionally left blank as they fail.") @Test public void testBasicPRTransactionRedundancy0() {} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/89c522ad/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java index aefb581..8ff1d94 100644 --- a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java @@ -39,6 +39,10 @@ public class PRDistTXWithVersionsDUnitTest extends PRTransactionWithVersionsDUni @Test public void testTxWithNonColocatedGet() {} + @Ignore("[DISTTX] TODO test overridden and intentionally left blank as it does not apply to disttx.") + @Test + public void testTxWithGetOnMovedBucket() {} + @Ignore("[DISTTX] TODO test overridden and intentionally left blank as they fail.") @Override @Test http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/89c522ad/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRColocationDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRColocationDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRColocationDUnitTest.java index bedb1d4..b018bfd 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRColocationDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRColocationDUnitTest.java @@ -2371,10 +2371,14 @@ public class PRColocationDUnitTest extends JUnit4CacheTestCase { } public static void putOrderPartitionedRegion(String partitionedRegionName) { + putOrderPartitionedRegion(partitionedRegionName, 10); + } + + public static void putOrderPartitionedRegion(String partitionedRegionName, int numOfCust) { assertNotNull(basicGetCache()); Region partitionedregion = basicGetCache().getRegion(Region.SEPARATOR + partitionedRegionName); assertNotNull(partitionedregion); - for (int i = 1; i <= 10; i++) { + for (int i = 1; i <= numOfCust; i++) { CustId custid = new CustId(i); for (int j = 1; j <= 10; j++) { int oid = (i * 10) + j; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/89c522ad/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java index 37ea4e5..2122960 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java @@ -39,13 +39,17 @@ import org.apache.geode.cache.Region; import org.apache.geode.cache.Region.Entry; import org.apache.geode.cache.TransactionDataNotColocatedException; import org.apache.geode.cache.TransactionDataRebalancedException; +import org.apache.geode.cache.TransactionId; import org.apache.geode.cache.execute.Execution; import org.apache.geode.cache.execute.Function; import org.apache.geode.cache.execute.FunctionException; import org.apache.geode.cache.execute.FunctionService; +import org.apache.geode.cache.partition.PartitionRegionHelper; import org.apache.geode.cache.util.CacheListenerAdapter; +import org.apache.geode.distributed.DistributedMember; import org.apache.geode.internal.NanoTimer; import org.apache.geode.internal.cache.ForceReattemptException; +import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.PartitionedRegion; import org.apache.geode.internal.cache.TXManagerImpl; import org.apache.geode.internal.cache.execute.data.CustId; @@ -341,8 +345,7 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest { setAttributes(CustomerPartitionedRegionName, null); - dataStore1.invoke(PRColocationDUnitTest.class, "createPR", this.attributeObjects); - dataStore2.invoke(PRColocationDUnitTest.class, "createPR", this.attributeObjects); + createPRInTwoNodes(); // Put the customer 1-2 in CustomerPartitionedRegion dataStore1.invoke( @@ -404,17 +407,14 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest { } } - @SuppressWarnings("unchecked") + private void performGetTx() { PartitionedRegion pr = (PartitionedRegion) basicGetCache() .getRegion(Region.SEPARATOR + CustomerPartitionedRegionName); CacheTransactionManager mgr = pr.getCache().getCacheTransactionManager(); CustId cust1 = new CustId(1); CustId cust2 = new CustId(2); - int bucketId1 = pr.getKeyInfo(cust1).getBucketId(); - List<Integer> localPrimaryBucketList = pr.getLocalPrimaryBucketsListTestOnly(); - assertTrue(localPrimaryBucketList.size() == 1); - boolean isCust1Local = (Integer) localPrimaryBucketList.get(0) == bucketId1; + boolean isCust1Local = isCust1Local(pr, cust1); // touch first get on remote node -- using TXStateStub Assertions.assertThatThrownBy(() -> getTx(!isCust1Local, mgr, pr, cust1, cust2)) @@ -425,6 +425,14 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest { .isInstanceOf(TransactionDataNotColocatedException.class); } + @SuppressWarnings("unchecked") + private boolean isCust1Local(PartitionedRegion pr, CustId cust1) { + int bucketId1 = pr.getKeyInfo(cust1).getBucketId(); + List<Integer> localPrimaryBucketList = pr.getLocalPrimaryBucketsListTestOnly(); + assertTrue(localPrimaryBucketList.size() == 1); + return (Integer) localPrimaryBucketList.get(0) == bucketId1; + } + private void getTx(boolean doCust1First, CacheTransactionManager mgr, PartitionedRegion pr, CustId cust1, CustId cust2) { CustId first = doCust1First ? cust1 : cust2; @@ -445,12 +453,161 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest { } } + /** + * This method executes a transaction with get on a key in a moved bucket, and expects transaction + * to fail with TransactionDataRebalancedException. + * + * @param bucketRedundancy redundancy for the colocated PRs + */ + @SuppressWarnings("unchecked") + protected void baiscPRTXWithGetOnMovedBucket(int bucketRedundancy) { + dataStore1.invoke(runGetCache); + dataStore2.invoke(runGetCache); + redundancy = new Integer(bucketRedundancy); + localMaxmemory = new Integer(50); + totalNumBuckets = new Integer(1); + + setAttributes(CustomerPartitionedRegionName, null); + + createPRInTwoNodes(); + + setAttributes(OrderPartitionedRegionName, CustomerPartitionedRegionName); + + createPRInTwoNodes(); + + // Put the customer 1 in CustomerPartitionedRegion + dataStore1.invoke( + () -> PRColocationDUnitTest.putCustomerPartitionedRegion(CustomerPartitionedRegionName, 1)); + + // Put the associated order in colocated OrderPartitionedRegion + dataStore1.invoke( + () -> PRColocationDUnitTest.putOrderPartitionedRegion(OrderPartitionedRegionName, 1)); + + DistributedMember dm1 = (DistributedMember) dataStore1.invoke(getDM()); + DistributedMember dm2 = (DistributedMember) dataStore2.invoke(getDM()); + + // First get transaction. + TransactionId txId = (TransactionId) dataStore1.invoke(beginTx()); + dataStore1.invoke(resumeTx(txId, dm1, dm2)); + + // Second one. Will go through different path (using TXState or TXStateStub) + txId = (TransactionId) dataStore1.invoke(beginTx()); + dataStore1.invoke(resumeTx(txId, dm1, dm2)); + } + + @SuppressWarnings({"rawtypes", "serial"}) + private SerializableCallable getDM() { + return new SerializableCallable("getDM") { + @Override + public Object call() { + return ((GemFireCacheImpl) basicGetCache()).getMyId(); + } + }; + } + + @SuppressWarnings({"rawtypes", "serial"}) + private SerializableCallable beginTx() { + return new SerializableCallable("begin tx") { + @Override + public Object call() { + PartitionedRegion pr = (PartitionedRegion) basicGetCache() + .getRegion(Region.SEPARATOR + CustomerPartitionedRegionName); + CacheTransactionManager mgr = basicGetCache().getCacheTransactionManager(); + CustId cust1 = new CustId(1); + mgr.begin(); + Object value = pr.get(cust1); + assertNotNull(value); + return mgr.suspend(); + } + }; + } + + @SuppressWarnings("serial") + private SerializableRunnable resumeTx(TransactionId txId, DistributedMember dm1, + DistributedMember dm2) { + return new SerializableRunnable("resume tx") { + @Override + public void run() { + PartitionedRegion pr = (PartitionedRegion) basicGetCache() + .getRegion(Region.SEPARATOR + OrderPartitionedRegionName); + CacheTransactionManager mgr = basicGetCache().getCacheTransactionManager(); + + moveBucket(dm1, dm2); + + Assertions.assertThatThrownBy(() -> _resumeTx(txId, pr, mgr)) + .isInstanceOf(TransactionDataRebalancedException.class); + } + + private void _resumeTx(TransactionId txId, PartitionedRegion pr, + CacheTransactionManager mgr) { + CustId cust1 = new CustId(1); + OrderId order1 = new OrderId(11, cust1); + mgr.resume(txId); + try { + pr.get(order1); + } finally { + mgr.rollback(); + } + } + + @SuppressWarnings("unchecked") + private void moveBucket(DistributedMember dm1, DistributedMember dm2) { + PartitionedRegion pr = (PartitionedRegion) basicGetCache() + .getRegion(Region.SEPARATOR + CustomerPartitionedRegionName); + CustId cust1 = new CustId(1); + OrderId order1 = new OrderId(11, cust1); + boolean isCust1Local = isCust1LocalSingleBucket(pr, cust1); + DistributedMember source = isCust1Local ? dm1 : dm2; + DistributedMember destination = isCust1Local ? dm2 : dm1; + PartitionedRegion prOrder = (PartitionedRegion) basicGetCache() + .getRegion(Region.SEPARATOR + OrderPartitionedRegionName); + + LogService.getLogger().info("source ={}, destination ={}", source, destination); + if (isCust1Local) { + // Use TXState + setBucketReadHook(order1, source, destination, prOrder); + } else { + // Use TXStateStub -- transaction data on remote node + PartitionRegionHelper.moveBucketByKey(prOrder, source, destination, order1); + } + } + + private void setBucketReadHook(OrderId order1, DistributedMember source, + DistributedMember destination, PartitionedRegion prOrder) { + prOrder.getDataStore().setBucketReadHook(new Runnable() { + @SuppressWarnings("unchecked") + public void run() { + LogService.getLogger().info("In bucketReadHook"); + PartitionRegionHelper.moveBucketByKey(prOrder, source, destination, order1); + } + }); + } + }; + } + + @SuppressWarnings("unchecked") + private boolean isCust1LocalSingleBucket(PartitionedRegion pr, CustId cust1) { + List<Integer> localPrimaryBucketList = pr.getLocalPrimaryBucketsListTestOnly(); + return (Integer) localPrimaryBucketList.size() == 1; + } + + + private void createPRInTwoNodes() { + dataStore1.invoke(PRColocationDUnitTest.class, "createPR", this.attributeObjects); + dataStore2.invoke(PRColocationDUnitTest.class, "createPR", this.attributeObjects); + } + @Test public void testTxWithNonColocatedGet() { baiscPRTXWithNonColocatedGet(0); } @Test + public void testTxWithGetOnMovedBucket() { + baiscPRTXWithGetOnMovedBucket(0); + } + + @Test public void testPRTXInCacheListenerRedundancy0() { basicPRTXInCacheListener(0); }
