Repository: incubator-geode Updated Branches: refs/heads/develop 21b0adacb -> 9b710ab0a
GEODE-1885: Removed call to check readiness (region check) after the offheap region entry is released. GEODE-1885: Missing subscription event with Offheap partitioned region during bucket rebalance. During the transaction commit on a redundant bucket region, if the bucket region is moved, the call-back logic (to deliver subscription events) was not invoked due to the check-readiness call with an offheap region. The check-readiness call throws an exception if the region is not found, which causes the code to return early without sending the subscription events. In this scenario, calling check-readiness is not needed... Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/9b710ab0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/9b710ab0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/9b710ab0 Branch: refs/heads/develop Commit: 9b710ab0af2bc6af2667010c004ad4798b0b8700 Parents: 21b0ada Author: Anil <[email protected]> Authored: Thu Sep 15 13:40:06 2016 -0700 Committer: Anil <[email protected]> Committed: Thu Sep 15 14:25:15 2016 -0700 ---------------------------------------------------------------------- .../internal/cache/AbstractDiskRegionEntry.java | 8 +- .../internal/cache/AbstractRegionEntry.java | 31 ++- .../geode/internal/cache/AbstractRegionMap.java | 1 - .../cache/ClientServerTransactionDUnitTest.java | 222 +++++++++++++++++-- 4 files changed, 231 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9b710ab0/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractDiskRegionEntry.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractDiskRegionEntry.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractDiskRegionEntry.java 
index 4440417..3f88ed8 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractDiskRegionEntry.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractDiskRegionEntry.java @@ -51,13 +51,9 @@ public abstract class AbstractDiskRegionEntry @Override public void setValueWithContext(RegionEntryContext context, Object value) { _setValue(value); - if (value != null && context != null && (this instanceof OffHeapRegionEntry) - && context instanceof LocalRegion && ((LocalRegion)context).isThisRegionBeingClosedOrDestroyed()) { - ((OffHeapRegionEntry)this).release(); - ((LocalRegion)context).checkReadiness(); - } + releaseOffHeapRefIfRegionBeingClosedOrDestroyed(context, value); } - + // Do not add any instances fields to this class. // Instead add them to the DISK section of LeafRegionEntry.cpp. http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9b710ab0/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionEntry.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionEntry.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionEntry.java index be82bd4..2c82ade 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionEntry.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionEntry.java @@ -434,16 +434,29 @@ public abstract class AbstractRegionEntry implements RegionEntry, @Released protected void setValue(RegionEntryContext context, @Unretained Object value, boolean recentlyUsed) { _setValue(value); - if (value != null && context != null && (this instanceof OffHeapRegionEntry) - && context instanceof LocalRegion && ((LocalRegion)context).isThisRegionBeingClosedOrDestroyed()) { - ((OffHeapRegionEntry)this).release(); - ((LocalRegion)context).checkReadiness(); - } + releaseOffHeapRefIfRegionBeingClosedOrDestroyed(context, value); 
if (recentlyUsed) { setRecentlyUsed(); } } + public void releaseOffHeapRefIfRegionBeingClosedOrDestroyed( + RegionEntryContext context, Object ref) { + if (isOffHeapReference(ref) && isThisRegionBeingClosedOrDestroyed(context)) { + ((OffHeapRegionEntry)this).release(); + } + } + + private boolean isThisRegionBeingClosedOrDestroyed(RegionEntryContext context) { + return context instanceof LocalRegion + && ((LocalRegion)context).isThisRegionBeingClosedOrDestroyed(); + } + + private boolean isOffHeapReference(Object ref) { + return ref != Token.REMOVED_PHASE1 && this instanceof OffHeapRegionEntry + && ref instanceof StoredObject && ((StoredObject)ref).hasRefCount(); + } + /** * This method determines if the value is in a compressed representation and decompresses it if it is. * @@ -792,11 +805,9 @@ public abstract class AbstractRegionEntry implements RegionEntry, if(isValueNull()) { @Released Object value = getValueOffHeapOrDiskWithoutFaultIn(region); try { - _setValue(prepareValueForCache(region, value, false)); - if (value != null && region != null && (this instanceof OffHeapRegionEntry) && region.isThisRegionBeingClosedOrDestroyed()) { - ((OffHeapRegionEntry)this).release(); - region.checkReadiness(); - } + Object preparedValue = prepareValueForCache(region, value, false); + _setValue(preparedValue); + releaseOffHeapRefIfRegionBeingClosedOrDestroyed(region, preparedValue); } finally { OffHeapHelper.release(value); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9b710ab0/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java index 738fef1..33e98b6 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java +++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java @@ -253,7 +253,6 @@ public abstract class AbstractRegionMap implements RegionMap { if (_getMap().remove(key, re)) { ((OffHeapRegionEntry)re).release(); } - _getOwner().checkReadiness(); // throw RegionDestroyedException } return value; } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9b710ab0/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java index e7866c5..b72c595 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java @@ -75,6 +75,9 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest protected static final int MAX_ENTRIES = 10; + private enum forop { + CREATE, UPDATE, DESTROY + }; protected static final String OTHER_REGION = "OtherRegion"; @@ -226,7 +229,81 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest return clientVM; } - + private void configureOffheapSystemProperty() { + Properties p = new Properties(); + //p.setProperty(LOG_LEVEL, "finer"); + p.setProperty(OFF_HEAP_MEMORY_SIZE, "1m"); + this.getSystem(p); + } + + private void createSubscriptionRegion(boolean isOffHeap, String regionName, + int copies, int totalBuckets) { + PartitionAttributesFactory paf = new PartitionAttributesFactory(); + paf.setRedundantCopies(copies).setTotalNumBuckets(totalBuckets); + AttributesFactory attr = new AttributesFactory(); + attr.setPartitionAttributes(paf.create()); + attr.setConcurrencyChecksEnabled(true); + if (isOffHeap) { + 
attr.setOffHeap(isOffHeap); + } + Region offheapRegion = getCache().createRegion(regionName, attr.create()); + assertNotNull(offheapRegion); + } + + private void createClient(int port, String regionName) throws Exception { + System.setProperty(DistributionConfig.GEMFIRE_PREFIX + + "bridge.disableShufflingOfEndpoints", "true"); + ClientCacheFactory ccf = new ClientCacheFactory(); + ccf.addPoolServer("localhost"/* getServerHostName(Host.getHost(0)) */, port); + ccf.setPoolMinConnections(0); + ccf.setPoolSubscriptionEnabled(true); + ccf.setPoolSubscriptionRedundancy(0); + ccf.set(LOG_LEVEL, getDUnitLogLevel()); + ClientCache cCache = getClientCache(ccf); + Region r = cCache + .createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY) + .addCacheListener(new ClientCacheListener()).create(regionName); + r.registerInterestRegex(".*"); + } + + class ClientCacheListener extends CacheListenerAdapter { + private int eventCount; + + @Override + public void afterCreate(EntryEvent event) { + onEvent(event); + } + + @Override + public void afterUpdate(EntryEvent event) { + onEvent(event); + } + + @Override + public void afterDestroy(EntryEvent event) { + onEvent(event); + } + + private void onEvent(EntryEvent event) { + this.eventCount++; + } + + public int getEventCount() { + return this.eventCount; + } + }; + + private int getClientCacheListnerEventCount(String regionName) { + Region r = getCache().getRegion(regionName); + CacheListener<?, ?>[] listeners = r.getAttributes().getCacheListeners(); + for (CacheListener<?, ?> listener : listeners) { + if (listener instanceof ClientCacheListener) { + return ((ClientCacheListener)listener).getEventCount(); + } + } + return 0; + } + @Test public void testTwoPoolsNotAllowed() { @@ -243,15 +320,14 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest setCCF(port1, ccf); ClientCache cCache = getClientCache(ccf); - - + ClientRegionFactory<CustId, Customer> custrf = cCache 
.createClientRegionFactory(cachingProxy ? ClientRegionShortcut.CACHING_PROXY : ClientRegionShortcut.PROXY); ClientRegionFactory<Integer, String> refrf = cCache .createClientRegionFactory(cachingProxy ? ClientRegionShortcut.CACHING_PROXY : ClientRegionShortcut.PROXY); Region<Integer, String> r = refrf.create(D_REFERENCE); Region<CustId, Customer> pr = custrf.create(CUSTOMER); - + // set up a second pool for the other distributed system's region final int port2 = createRegionOnDisconnectedServer(datastore2, true); PoolFactory pf = PoolManager.createFactory(); @@ -259,7 +335,7 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest pf.create("otherServer"); ClientRegionFactory otherrf = cCache - .createClientRegionFactory(cachingProxy? ClientRegionShortcut.CACHING_PROXY : ClientRegionShortcut.PROXY); + .createClientRegionFactory(cachingProxy ? ClientRegionShortcut.CACHING_PROXY : ClientRegionShortcut.PROXY); otherrf.setPoolName("otherServer"); Region<Object, Object> otherRegion = otherrf.create(OTHER_REGION); @@ -272,23 +348,22 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest } catch (TransactionException expected) { exceptionThrown = true; } - + SerializableCallable disconnect = new SerializableCallable("disconnect") { public Object call() throws Exception { InternalDistributedSystem.getConnectedInstance().disconnect(); return null; } }; - + cCache.close(); datastore1.invoke(disconnect); datastore2.invoke(disconnect); - + if (!exceptionThrown) { fail("expected TransactionException to be thrown since two pools were used"); } } - @Test public void testCleanupAfterClientFailure() { @@ -296,7 +371,7 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest VM accessor = host.getVM(0); VM datastore = host.getVM(1); final boolean cachingProxy = false; - + disconnectAllFromDS(); // some other VMs seem to be hanging around and have the region this tests uses final int port1 = 
createRegionsAndStartServerWithTimeout(accessor, true, 5); @@ -3045,6 +3120,7 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest public void testClientCommitFunctionWithFailure() { doFunctionWithFailureWork(true); } + @Test public void testRollbackFunctionWithFailure() { doFunctionWithFailureWork(false); @@ -3122,6 +3198,7 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest public void testCommitFunctionFromPeer() { doTestFunctionFromPeer(true); } + @Test public void testRollbackFunctionFromPeer() { doTestFunctionFromPeer(false); @@ -3638,18 +3715,19 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest for (CacheListener listener : listeners) { if (listener instanceof ClientListener) { foundListener = true; - final ClientListener clientListener = (ClientListener) listener; + final ClientListener clientListener = (ClientListener)listener; WaitCriterion wc = new WaitCriterion() { @Override public boolean done() { return clientListener.keys.containsAll(keys); } + @Override public String description() { - return "expected:"+keys+" found:"+clientListener.keys; + return "expected:" + keys + " found:" + clientListener.keys; } }; - Wait.waitForCriterion(wc, 30*1000, 500, true); + Wait.waitForCriterion(wc, 30 * 1000, 500, true); } } assertTrue(foundListener); @@ -3658,6 +3736,122 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest }); } + @Test + public void testCreateSubscriptionEventsWithPrWhioleBucketRegionIsDestroyed() { + testSubscriptionEventsWhenBucketRegionIsDestroyed(false, forop.CREATE); + } + + @Test + public void testDestroySubscriptionEventsWithPrWhileBucketRegionIsDestroyed() { + testSubscriptionEventsWhenBucketRegionIsDestroyed(false, forop.DESTROY); + } + + @Test + public void testCreateSubscriptionEventsWithOffheapPrWhioleBucketRegionIsDestroyed() { + testSubscriptionEventsWhenBucketRegionIsDestroyed(true, forop.CREATE); + } + + 
@Test + public void testDestroySubscriptionEventsWithOffheapPrWhileBucketRegionIsDestroyed() { + testSubscriptionEventsWhenBucketRegionIsDestroyed(true, forop.DESTROY); + } + + private void testSubscriptionEventsWhenBucketRegionIsDestroyed(boolean offheap, forop op) { + int copies = 1; + int totalBuckets = 1; + + Host host = Host.getHost(0); + VM server1 = host.getVM(0); + VM server2 = host.getVM(1); + + VM client1 = host.getVM(2); + VM client2 = host.getVM(3); + + final String regionName = "SubscriptionPr"; + + server1.invoke(() -> { + configureOffheapSystemProperty(); + }); + server2.invoke(() -> { + configureOffheapSystemProperty(); + }); + + final int port1 = createRegionsAndStartServer(server1, false); + // Create PR + server1.invoke(() -> { + createSubscriptionRegion(offheap, regionName, copies, totalBuckets); + Region r = getCache().getRegion(regionName); + r.put("KEY-1", "VALUE-1"); + r.put("KEY-2", "VALUE-2"); + }); + + final int port2 = createRegionsAndStartServer(server2, false); + + // Create PR + server2.invoke(() -> { + createSubscriptionRegion(offheap, regionName, copies, totalBuckets); + Region r = getCache().getRegion(regionName); + + Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> { + List<Integer> ids = ((PartitionedRegion)r).getLocalBucketsListTestOnly(); + assertFalse(ids.isEmpty()); + }); + }); + + // Create client 1 + client1.invoke(() -> { + createClient(port1, regionName); + }); + + // Create client 2 + client2.invoke(() -> { + createClient(port2, regionName); + }); + + // Destroy secondary bucket region. This simulates bucket re-balance. 
+ server2.invoke(() -> { + BucketRegion br = ((PartitionedRegion)getCache().getRegion(regionName)) + .getBucketRegion("KEY-1"); + AbstractRegionMap arm = (AbstractRegionMap)((LocalRegion)br).entries; + arm.setARMLockTestHook(new ARMLockTestHookAdapter() { + @Override + public void beforeLock(LocalRegion owner, CacheEvent event) { + List<Integer> ids = ((PartitionedRegion)getCache().getRegion( + regionName)).getLocalBucketsListTestOnly(); + assertFalse(ids.isEmpty()); + br.localDestroyRegion(); + } + }); + }); + + server1.invoke(() -> { + Cache cache = getCache(); + Region r = cache.getRegion(regionName); + CacheTransactionManager mgr = cache.getCacheTransactionManager(); + mgr.begin(); + if (op == forop.CREATE) { + r.create("KEY-3", "VALUE-3"); + } + else if (op == forop.UPDATE) { + r.put("KEY-1", "VALUE-1_2"); + } + else if (op == forop.DESTROY) { + r.destroy("KEY-2"); + } + mgr.commit(); + }); + + client1.invoke(() -> { + Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> + (assertEquals(1, getClientCacheListnerEventCount(regionName)))); + }); + + client2.invoke(() -> { + Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> + (assertEquals(1, getClientCacheListnerEventCount(regionName)))); + }); + } + Object verifyTXStateExpired(final DistributedMember myId, final TXManagerImpl txmgr) { try { Wait.waitForCriterion(new WaitCriterion() { @@ -3675,7 +3869,7 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest getGemfireCache().getDistributedSystem().disconnect(); } } - + Object verifyProxyServerChanged(final TXStateProxyImpl tx, final DistributedMember newProxy) { try { Awaitility.await().pollInterval(10, TimeUnit.MILLISECONDS).pollDelay(10, TimeUnit.MILLISECONDS)
