Repository: incubator-geode
Updated Branches:
  refs/heads/develop 21b0adacb -> 9b710ab0a


GEODE-1885: Removed call to check readiness (region check) after the offheap 
region entry is released.

GEODE-1885: Missing subscription event with off-heap partitioned region during 
bucket rebalance.

During a transaction commit on a redundant bucket region, if the bucket region 
is moved, the callback logic (to deliver subscription events) was not invoked 
because of the check-readiness call made for off-heap regions. Check-readiness 
throws an exception if the region is not found, which causes the code to return 
early without sending the subscription events.

In this scenario, calling check-readiness is not needed...


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/9b710ab0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/9b710ab0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/9b710ab0

Branch: refs/heads/develop
Commit: 9b710ab0af2bc6af2667010c004ad4798b0b8700
Parents: 21b0ada
Author: Anil <[email protected]>
Authored: Thu Sep 15 13:40:06 2016 -0700
Committer: Anil <[email protected]>
Committed: Thu Sep 15 14:25:15 2016 -0700

----------------------------------------------------------------------
 .../internal/cache/AbstractDiskRegionEntry.java |   8 +-
 .../internal/cache/AbstractRegionEntry.java     |  31 ++-
 .../geode/internal/cache/AbstractRegionMap.java |   1 -
 .../cache/ClientServerTransactionDUnitTest.java | 222 +++++++++++++++++--
 4 files changed, 231 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9b710ab0/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractDiskRegionEntry.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractDiskRegionEntry.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractDiskRegionEntry.java
index 4440417..3f88ed8 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractDiskRegionEntry.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractDiskRegionEntry.java
@@ -51,13 +51,9 @@ public abstract class AbstractDiskRegionEntry
   @Override
   public void setValueWithContext(RegionEntryContext context, Object value) {
     _setValue(value);
-    if (value != null && context != null && (this instanceof 
OffHeapRegionEntry) 
-        && context instanceof LocalRegion && 
((LocalRegion)context).isThisRegionBeingClosedOrDestroyed()) {
-      ((OffHeapRegionEntry)this).release();
-      ((LocalRegion)context).checkReadiness();
-    }
+    releaseOffHeapRefIfRegionBeingClosedOrDestroyed(context, value);
   }
-  
+
   // Do not add any instances fields to this class.
   // Instead add them to the DISK section of LeafRegionEntry.cpp.
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9b710ab0/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionEntry.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionEntry.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionEntry.java
index be82bd4..2c82ade 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionEntry.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionEntry.java
@@ -434,16 +434,29 @@ public abstract class AbstractRegionEntry implements 
RegionEntry,
   @Released
   protected void setValue(RegionEntryContext context, @Unretained Object 
value, boolean recentlyUsed) {
     _setValue(value);
-    if (value != null && context != null && (this instanceof 
OffHeapRegionEntry) 
-        && context instanceof LocalRegion && 
((LocalRegion)context).isThisRegionBeingClosedOrDestroyed()) {
-      ((OffHeapRegionEntry)this).release();
-      ((LocalRegion)context).checkReadiness();
-    }
+    releaseOffHeapRefIfRegionBeingClosedOrDestroyed(context, value);
     if (recentlyUsed) {
       setRecentlyUsed();
     }
   }
 
+  public void releaseOffHeapRefIfRegionBeingClosedOrDestroyed(
+      RegionEntryContext context, Object ref) {
+    if (isOffHeapReference(ref) && 
isThisRegionBeingClosedOrDestroyed(context)) {
+      ((OffHeapRegionEntry)this).release();
+    }
+  }
+
+  private boolean isThisRegionBeingClosedOrDestroyed(RegionEntryContext 
context) {
+    return context instanceof LocalRegion
+        && ((LocalRegion)context).isThisRegionBeingClosedOrDestroyed();
+  }
+
+  private boolean isOffHeapReference(Object ref) {
+    return ref != Token.REMOVED_PHASE1 && this instanceof OffHeapRegionEntry
+        && ref instanceof StoredObject && ((StoredObject)ref).hasRefCount();
+  }
+
   /**
    * This method determines if the value is in a compressed representation and 
decompresses it if it is.
    *
@@ -792,11 +805,9 @@ public abstract class AbstractRegionEntry implements 
RegionEntry,
             if(isValueNull()) {
               @Released Object value = 
getValueOffHeapOrDiskWithoutFaultIn(region);
               try {
-              _setValue(prepareValueForCache(region, value, false));
-              if (value != null && region != null && (this instanceof 
OffHeapRegionEntry) && region.isThisRegionBeingClosedOrDestroyed()) {
-                ((OffHeapRegionEntry)this).release();
-                region.checkReadiness();
-              }
+                Object preparedValue = prepareValueForCache(region, value, 
false);
+                _setValue(preparedValue);
+                releaseOffHeapRefIfRegionBeingClosedOrDestroyed(region, 
preparedValue);
               } finally {
                 OffHeapHelper.release(value);
               }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9b710ab0/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
index 738fef1..33e98b6 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
@@ -253,7 +253,6 @@ public abstract class AbstractRegionMap implements 
RegionMap {
       if (_getMap().remove(key, re)) {
         ((OffHeapRegionEntry)re).release();
       }
-      _getOwner().checkReadiness(); // throw RegionDestroyedException
     }
     return value;
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9b710ab0/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java
index e7866c5..b72c595 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java
@@ -75,6 +75,9 @@ public class ClientServerTransactionDUnitTest extends 
RemoteTransactionDUnitTest
 
   protected static final int MAX_ENTRIES = 10;
 
+  private enum forop {
+    CREATE, UPDATE, DESTROY
+  };
 
   protected static final String OTHER_REGION = "OtherRegion";
 
@@ -226,7 +229,81 @@ public class ClientServerTransactionDUnitTest extends 
RemoteTransactionDUnitTest
     return clientVM;
   }
   
-  
+  private void configureOffheapSystemProperty() {
+    Properties p = new Properties();
+    //p.setProperty(LOG_LEVEL, "finer");
+    p.setProperty(OFF_HEAP_MEMORY_SIZE, "1m");
+    this.getSystem(p);
+  }
+
+  private void createSubscriptionRegion(boolean isOffHeap, String regionName,
+      int copies, int totalBuckets) {
+    PartitionAttributesFactory paf = new PartitionAttributesFactory();
+    paf.setRedundantCopies(copies).setTotalNumBuckets(totalBuckets);
+    AttributesFactory attr = new AttributesFactory();
+    attr.setPartitionAttributes(paf.create());
+    attr.setConcurrencyChecksEnabled(true);
+    if (isOffHeap) {
+      attr.setOffHeap(isOffHeap);
+    }
+    Region offheapRegion = getCache().createRegion(regionName, attr.create());
+    assertNotNull(offheapRegion);
+  }
+
+  private void createClient(int port, String regionName) throws Exception {
+    System.setProperty(DistributionConfig.GEMFIRE_PREFIX
+        + "bridge.disableShufflingOfEndpoints", "true");
+    ClientCacheFactory ccf = new ClientCacheFactory();
+    ccf.addPoolServer("localhost"/* getServerHostName(Host.getHost(0)) */, 
port);
+    ccf.setPoolMinConnections(0);
+    ccf.setPoolSubscriptionEnabled(true);
+    ccf.setPoolSubscriptionRedundancy(0);
+    ccf.set(LOG_LEVEL, getDUnitLogLevel());
+    ClientCache cCache = getClientCache(ccf);
+    Region r = cCache
+        .createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY)
+        .addCacheListener(new ClientCacheListener()).create(regionName);
+    r.registerInterestRegex(".*");
+  }
+
+  class ClientCacheListener extends CacheListenerAdapter {
+    private int eventCount;
+
+    @Override
+    public void afterCreate(EntryEvent event) {
+      onEvent(event);
+    }
+
+    @Override
+    public void afterUpdate(EntryEvent event) {
+      onEvent(event);
+    }
+
+    @Override
+    public void afterDestroy(EntryEvent event) {
+      onEvent(event);
+    }
+
+    private void onEvent(EntryEvent event) {
+      this.eventCount++;
+    }
+
+    public int getEventCount() {
+      return this.eventCount;
+    }
+  };
+
+  private int getClientCacheListnerEventCount(String regionName) {
+    Region r = getCache().getRegion(regionName);
+    CacheListener<?, ?>[] listeners = r.getAttributes().getCacheListeners();
+    for (CacheListener<?, ?> listener : listeners) {
+      if (listener instanceof ClientCacheListener) {
+        return ((ClientCacheListener)listener).getEventCount();
+      }
+    }
+    return 0;
+  }
+
 
   @Test
   public void testTwoPoolsNotAllowed() {
@@ -243,15 +320,14 @@ public class ClientServerTransactionDUnitTest extends 
RemoteTransactionDUnitTest
     setCCF(port1, ccf);
 
     ClientCache cCache = getClientCache(ccf);
-    
-    
+
     ClientRegionFactory<CustId, Customer> custrf = cCache
       .createClientRegionFactory(cachingProxy ? 
ClientRegionShortcut.CACHING_PROXY : ClientRegionShortcut.PROXY);
     ClientRegionFactory<Integer, String> refrf = cCache
       .createClientRegionFactory(cachingProxy ? 
ClientRegionShortcut.CACHING_PROXY : ClientRegionShortcut.PROXY);
     Region<Integer, String> r = refrf.create(D_REFERENCE);
     Region<CustId, Customer> pr = custrf.create(CUSTOMER);
-    
+
     // set up a second pool for the other distributed system's region
     final int port2 = createRegionOnDisconnectedServer(datastore2, true);
     PoolFactory pf = PoolManager.createFactory();
@@ -259,7 +335,7 @@ public class ClientServerTransactionDUnitTest extends 
RemoteTransactionDUnitTest
     pf.create("otherServer");
 
     ClientRegionFactory otherrf = cCache
-      .createClientRegionFactory(cachingProxy? 
ClientRegionShortcut.CACHING_PROXY : ClientRegionShortcut.PROXY);
+      .createClientRegionFactory(cachingProxy ? 
ClientRegionShortcut.CACHING_PROXY : ClientRegionShortcut.PROXY);
     otherrf.setPoolName("otherServer");
     Region<Object, Object> otherRegion = otherrf.create(OTHER_REGION);
 
@@ -272,23 +348,22 @@ public class ClientServerTransactionDUnitTest extends 
RemoteTransactionDUnitTest
     } catch (TransactionException expected) {
       exceptionThrown = true;
     }
-    
+
     SerializableCallable disconnect = new SerializableCallable("disconnect") {
       public Object call() throws Exception {
         InternalDistributedSystem.getConnectedInstance().disconnect();
         return null;
       }
     };
-    
+
     cCache.close();
     datastore1.invoke(disconnect);
     datastore2.invoke(disconnect);
-    
+
     if (!exceptionThrown) {
       fail("expected TransactionException to be thrown since two pools were 
used");
     }
   }
-  
 
   @Test
   public void testCleanupAfterClientFailure() {
@@ -296,7 +371,7 @@ public class ClientServerTransactionDUnitTest extends 
RemoteTransactionDUnitTest
     VM accessor = host.getVM(0);
     VM datastore = host.getVM(1);
     final boolean cachingProxy = false;
-    
+
     disconnectAllFromDS(); // some other VMs seem to be hanging around and 
have the region this tests uses
 
     final int port1 = createRegionsAndStartServerWithTimeout(accessor, true, 
5);
@@ -3045,6 +3120,7 @@ public class ClientServerTransactionDUnitTest extends 
RemoteTransactionDUnitTest
   public void testClientCommitFunctionWithFailure() {
     doFunctionWithFailureWork(true);
   }
+
   @Test
   public void testRollbackFunctionWithFailure() {
     doFunctionWithFailureWork(false);
@@ -3122,6 +3198,7 @@ public class ClientServerTransactionDUnitTest extends 
RemoteTransactionDUnitTest
   public void testCommitFunctionFromPeer() {
     doTestFunctionFromPeer(true);
   }
+
   @Test
   public void testRollbackFunctionFromPeer() {
     doTestFunctionFromPeer(false);
@@ -3638,18 +3715,19 @@ public class ClientServerTransactionDUnitTest extends 
RemoteTransactionDUnitTest
         for (CacheListener listener : listeners) {
           if (listener instanceof ClientListener) {
             foundListener = true;
-            final ClientListener clientListener = (ClientListener) listener;
+            final ClientListener clientListener = (ClientListener)listener;
             WaitCriterion wc = new WaitCriterion() {
               @Override
               public boolean done() {
                 return clientListener.keys.containsAll(keys);
               }
+
               @Override
               public String description() {
-                return "expected:"+keys+" found:"+clientListener.keys;
+                return "expected:" + keys + " found:" + clientListener.keys;
               }
             };
-            Wait.waitForCriterion(wc, 30*1000, 500, true);
+            Wait.waitForCriterion(wc, 30 * 1000, 500, true);
           }
         }
         assertTrue(foundListener);
@@ -3658,6 +3736,122 @@ public class ClientServerTransactionDUnitTest extends 
RemoteTransactionDUnitTest
     });
   }
 
+  @Test
+  public void 
testCreateSubscriptionEventsWithPrWhioleBucketRegionIsDestroyed() {
+    testSubscriptionEventsWhenBucketRegionIsDestroyed(false, forop.CREATE);
+  }
+
+  @Test
+  public void 
testDestroySubscriptionEventsWithPrWhileBucketRegionIsDestroyed() {
+    testSubscriptionEventsWhenBucketRegionIsDestroyed(false, forop.DESTROY);
+  }
+
+  @Test
+  public void 
testCreateSubscriptionEventsWithOffheapPrWhioleBucketRegionIsDestroyed() {
+    testSubscriptionEventsWhenBucketRegionIsDestroyed(true, forop.CREATE);
+  }
+
+  @Test
+  public void 
testDestroySubscriptionEventsWithOffheapPrWhileBucketRegionIsDestroyed() {
+    testSubscriptionEventsWhenBucketRegionIsDestroyed(true, forop.DESTROY);
+  }
+
+  private void testSubscriptionEventsWhenBucketRegionIsDestroyed(boolean 
offheap, forop op) {
+    int copies = 1;
+    int totalBuckets = 1;
+
+    Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM server2 = host.getVM(1);
+
+    VM client1 = host.getVM(2);
+    VM client2 = host.getVM(3);
+
+    final String regionName = "SubscriptionPr";
+
+    server1.invoke(() -> {
+      configureOffheapSystemProperty();
+    });
+    server2.invoke(() -> {
+      configureOffheapSystemProperty();
+    });
+
+    final int port1 = createRegionsAndStartServer(server1, false);
+    // Create PR
+    server1.invoke(() -> {
+      createSubscriptionRegion(offheap, regionName, copies, totalBuckets);
+      Region r = getCache().getRegion(regionName);
+      r.put("KEY-1", "VALUE-1");
+      r.put("KEY-2", "VALUE-2");
+    });
+
+    final int port2 = createRegionsAndStartServer(server2, false);
+
+    // Create PR
+    server2.invoke(() -> {
+      createSubscriptionRegion(offheap, regionName, copies, totalBuckets);
+      Region r = getCache().getRegion(regionName);
+
+      Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> {
+        List<Integer> ids = 
((PartitionedRegion)r).getLocalBucketsListTestOnly();
+        assertFalse(ids.isEmpty());
+      });
+    });
+
+    // Create client 1
+    client1.invoke(() -> {
+      createClient(port1, regionName);
+    });
+
+    // Create client 2
+    client2.invoke(() -> {
+      createClient(port2, regionName);
+    });
+
+    // Destroy secondary bucket region. This simulates bucket re-balance.
+    server2.invoke(() -> {
+      BucketRegion br = ((PartitionedRegion)getCache().getRegion(regionName))
+          .getBucketRegion("KEY-1");
+      AbstractRegionMap arm = (AbstractRegionMap)((LocalRegion)br).entries;
+      arm.setARMLockTestHook(new ARMLockTestHookAdapter() {
+        @Override
+        public void beforeLock(LocalRegion owner, CacheEvent event) {
+          List<Integer> ids = ((PartitionedRegion)getCache().getRegion(
+              regionName)).getLocalBucketsListTestOnly();
+          assertFalse(ids.isEmpty());
+          br.localDestroyRegion();
+        }
+      });
+    });
+
+    server1.invoke(() -> {
+      Cache cache = getCache();
+      Region r = cache.getRegion(regionName);
+      CacheTransactionManager mgr = cache.getCacheTransactionManager();
+      mgr.begin();
+      if (op == forop.CREATE) {
+        r.create("KEY-3", "VALUE-3");
+      }
+      else if (op == forop.UPDATE) {
+        r.put("KEY-1", "VALUE-1_2");
+      }
+      else if (op == forop.DESTROY) {
+        r.destroy("KEY-2");
+      }
+      mgr.commit();
+    });
+
+    client1.invoke(() -> {
+      Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() ->
+      (assertEquals(1, getClientCacheListnerEventCount(regionName))));
+    });
+
+    client2.invoke(() -> {
+      Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() ->
+      (assertEquals(1, getClientCacheListnerEventCount(regionName))));
+    });
+  }
+
   Object verifyTXStateExpired(final DistributedMember myId, final 
TXManagerImpl txmgr) {
     try {
       Wait.waitForCriterion(new WaitCriterion() {
@@ -3675,7 +3869,7 @@ public class ClientServerTransactionDUnitTest extends 
RemoteTransactionDUnitTest
       getGemfireCache().getDistributedSystem().disconnect();
     }
   }
-  
+
   Object verifyProxyServerChanged(final TXStateProxyImpl tx, final 
DistributedMember newProxy) {
     try {
       Awaitility.await().pollInterval(10, TimeUnit.MILLISECONDS).pollDelay(10, 
TimeUnit.MILLISECONDS)

Reply via email to