GEODE-2021: Non colocated gets in a transaction should get TransactionDataNotColocatedException
Throw TransactionDataNotColocatedException when get locally failed with BucketNotFoundException Added a dunit test with two transactions with gets that will use TXStateStub or TXState based on data location. Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/56917a26 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/56917a26 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/56917a26 Branch: refs/heads/feature/GEODE-288 Commit: 56917a26a8916b83f0cec6e85285b5040ff66ee6 Parents: fadd92b Author: eshu <[email protected]> Authored: Fri Oct 21 11:43:36 2016 -0700 Committer: eshu <[email protected]> Committed: Fri Oct 21 11:43:36 2016 -0700 ---------------------------------------------------------------------- .../geode/internal/cache/PartitionedRegion.java | 6 + .../apache/geode/disttx/PRDistTXDUnitTest.java | 5 + .../disttx/PRDistTXWithVersionsDUnitTest.java | 5 + .../cache/execute/PRColocationDUnitTest.java | 6 +- .../cache/execute/PRTransactionDUnitTest.java | 131 ++++++++++++++++++- 5 files changed, 151 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/56917a26/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java index f7ecdaf..df52764 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java @@ -4105,6 +4105,12 @@ public class PartitionedRegion extends LocalRegion implements retryTime.waitToRetryNode(); } } else { + if (prce instanceof BucketNotFoundException) { + TransactionException ex = new TransactionDataNotColocatedException(LocalizedStrings. + PartitionedRegion_KEY_0_NOT_COLOCATED_WITH_TRANSACTION.toLocalizedString(key)); + ex.initCause(prce); + throw ex; + } Throwable cause = prce.getCause(); if (cause instanceof PrimaryBucketException) { throw (PrimaryBucketException)cause; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/56917a26/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java index f36085b..68a83f1 100644 --- a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java @@ -37,6 +37,11 @@ public class PRDistTXDUnitTest extends PRTransactionDUnitTest { return props; } + @Ignore("[DISTTX] TODO test overridden and intentionally left blank as it does not apply to disttx.") + @Test + public void testTxWithNonColocatedGet() { + } + @Ignore("[DISTTX] TODO test overridden and intentionally left blank as they fail.") @Test public void testBasicPRTransactionRedundancy0() { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/56917a26/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java index 268c2ed..d692468 100644 --- a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java @@ -37,6 +37,11 @@ public class PRDistTXWithVersionsDUnitTest extends PRTransactionWithVersionsDUni return props; } + @Ignore("[DISTTX] TODO test overridden and intentionally left blank as it does not apply to disttx.") + @Test + public void testTxWithNonColocatedGet() { + } + @Ignore("[DISTTX] TODO test overridden and intentionally left blank as they fail.") @Override @Test http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/56917a26/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRColocationDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRColocationDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRColocationDUnitTest.java index 1b8d2d1..f6ee565 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRColocationDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRColocationDUnitTest.java @@ -2388,11 +2388,15 @@ public class PRColocationDUnitTest extends JUnit4CacheTestCase { assertTrue("Region should have failed to close. regionName = " + partitionedRegionName , exceptionThrown); } public static void putCustomerPartitionedRegion(String partitionedRegionName) { + putCustomerPartitionedRegion(partitionedRegionName, 10); + } + + public static void putCustomerPartitionedRegion(String partitionedRegionName, int numOfRecord) { assertNotNull(basicGetCache()); Region partitionedregion = basicGetCache().getRegion(Region.SEPARATOR + partitionedRegionName); assertNotNull(partitionedregion); - for (int i = 1; i <= 10; i++) { + for (int i = 1; i <= numOfRecord; i++) { CustId custid = new CustId(i); Customer customer = new Customer("name" + i, "Address" + i); try { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/56917a26/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java index 516c240..332ec01 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java @@ -25,10 +25,12 @@ import static org.junit.Assert.*; import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase; import org.apache.geode.test.junit.categories.DistributedTest; +import org.assertj.core.api.Assertions; import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Set; import util.TestException; @@ -45,6 +47,7 @@ import org.apache.geode.cache.execute.FunctionException; import org.apache.geode.cache.execute.FunctionService; import org.apache.geode.cache.util.CacheListenerAdapter; import org.apache.geode.internal.NanoTimer; +import org.apache.geode.internal.cache.ForceReattemptException; import org.apache.geode.internal.cache.PartitionedRegion; import org.apache.geode.internal.cache.TXManagerImpl; import org.apache.geode.internal.cache.execute.data.CustId; @@ -53,10 +56,13 @@ import org.apache.geode.internal.cache.execute.data.Order; import org.apache.geode.internal.cache.execute.data.OrderId; import org.apache.geode.internal.cache.execute.data.Shipment; import org.apache.geode.internal.cache.execute.data.ShipmentId; +import org.apache.geode.internal.cache.partitioned.PRLocallyDestroyedException; +import org.apache.geode.internal.logging.LogService; import org.apache.geode.test.dunit.Assert; import org.apache.geode.test.dunit.Invoke; import org.apache.geode.test.dunit.LogWriterUtils; import org.apache.geode.test.dunit.SerializableCallable; +import org.apache.geode.test.dunit.SerializableRunnable; /** * Test for co-located PR transactions. @@ -316,18 +322,141 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest { } protected void createPRWithCoLocation(String prName, String coLocatedWith) { + setAttributes(prName, coLocatedWith); + createPartitionedRegion(attributeObjects); + } + + protected void setAttributes(String prName, String coLocatedWith) { this.regionName = prName; this.colocatedWith = coLocatedWith; this.isPartitionResolver = new Boolean(true); this.attributeObjects = new Object[] { regionName, redundancy, localMaxmemory, totalNumBuckets, colocatedWith, isPartitionResolver, getEnableConcurrency() }; - createPartitionedRegion(attributeObjects); } protected boolean getEnableConcurrency() { return false; } + /** + * This method executes a transaction with get on non colocated entries and + * expects the transaction to fail with TransactionDataNotColocatedException. + * @param bucketRedundancy redundancy for the colocated PRs + */ + protected void baiscPRTXWithNonColocatedGet(int bucketRedundancy) { + dataStore1.invoke(runGetCache); + dataStore2.invoke(runGetCache); + redundancy = new Integer(bucketRedundancy); + localMaxmemory = new Integer(50); + totalNumBuckets = new Integer(2); + + setAttributes(CustomerPartitionedRegionName, null); + + dataStore1.invoke(PRColocationDUnitTest.class, "createPR", this.attributeObjects); + dataStore2.invoke(PRColocationDUnitTest.class, "createPR", this.attributeObjects); + + // Put the customer 1-2 in CustomerPartitionedRegion + dataStore1.invoke(() -> PRColocationDUnitTest.putCustomerPartitionedRegion(CustomerPartitionedRegionName, 2)); + + dataStore1.invoke(verifyNonColocated); + dataStore2.invoke(verifyNonColocated); + + dataStore1.invoke(getTx); + } + + + @SuppressWarnings("serial") + private SerializableRunnable verifyNonColocated = new SerializableRunnable("verifyNonColocated") { + @Override + public void run() throws PRLocallyDestroyedException, ForceReattemptException { + containsKeyLocally(); + } + }; + + @SuppressWarnings("serial") + private SerializableRunnable getTx = new SerializableRunnable("getTx") { + @Override + public void run() { + performGetTx(); + } + }; + + + @SuppressWarnings({ "unchecked", "rawtypes" }) + private void containsKeyLocally() throws PRLocallyDestroyedException, ForceReattemptException { + PartitionedRegion pr = (PartitionedRegion) basicGetCache().getRegion(Region.SEPARATOR + CustomerPartitionedRegionName); + + CustId cust1 = new CustId(1); + CustId cust2 = new CustId(2); + int bucketId1 = pr.getKeyInfo(cust1).getBucketId(); + int bucketId2 = pr.getKeyInfo(cust2).getBucketId(); + + List<Integer> localPrimaryBucketList = pr.getLocalPrimaryBucketsListTestOnly(); + Set localBucket1Keys; + Set localBucket2Keys; + assertTrue(localPrimaryBucketList.size() == 1); + for (int bucketId: localPrimaryBucketList) { + if (bucketId == bucketId1) { + //primary bucket has cust1 + localBucket1Keys = pr.getDataStore().getKeysLocally(bucketId1, false); + for (Object key: localBucket1Keys) { + LogService.getLogger().info("local key set contains " + key); + } + assertTrue(localBucket1Keys.size() == 1); + } else { + localBucket2Keys = pr.getDataStore().getKeysLocally(bucketId2, false); + for (Object key: localBucket2Keys) { + LogService.getLogger().info("local key set contains " + key); + } + assertTrue(localBucket2Keys.size() == 1); + } + } + } + + @SuppressWarnings("unchecked") + private void performGetTx() { + PartitionedRegion pr = (PartitionedRegion) basicGetCache().getRegion(Region.SEPARATOR + CustomerPartitionedRegionName); + CacheTransactionManager mgr = pr.getCache().getCacheTransactionManager(); + CustId cust1 = new CustId(1); + CustId cust2 = new CustId(2); + int bucketId1 = pr.getKeyInfo(cust1).getBucketId(); + List<Integer> localPrimaryBucketList = pr.getLocalPrimaryBucketsListTestOnly(); + assertTrue(localPrimaryBucketList.size() == 1); + boolean isCust1Local = (Integer)localPrimaryBucketList.get(0) == bucketId1; + + //touch first get on remote node -- using TXStateStub + Assertions.assertThatThrownBy(()-> getTx(!isCust1Local, mgr, pr, cust1, cust2)) + .isInstanceOf(TransactionDataNotColocatedException.class); + + //touch first get on local node-- using TXState + Assertions.assertThatThrownBy(()-> getTx(isCust1Local, mgr, pr, cust1, cust2)) + .isInstanceOf(TransactionDataNotColocatedException.class); + } + + private void getTx(boolean doCust1First, CacheTransactionManager mgr, PartitionedRegion pr, CustId cust1, CustId cust2) { + CustId first = doCust1First ? cust1 : cust2; + CustId second = !doCust1First ? cust1 : cust2; + + mgr.begin(); + boolean doRollback = true; + try { + pr.get(first); + pr.get(second); + doRollback = false; + } finally { + if (doRollback) { + mgr.rollback(); + } else { + mgr.commit(); + } + } + } + + @Test + public void testTxWithNonColocatedGet() { + baiscPRTXWithNonColocatedGet(0); + } + @Test public void testPRTXInCacheListenerRedundancy0() { basicPRTXInCacheListener(0);
