GEODE-2547: Interest registration no longer causes a CacheLoader to be invoked
Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/1a36d36e Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/1a36d36e Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/1a36d36e Branch: refs/heads/GEODE-4160-mockito Commit: 1a36d36ec90d91094689cc3cb30c21be9b25276b Parents: fb1fdf9 Author: Barry Oglesby <[email protected]> Authored: Tue Feb 28 13:43:50 2017 -0800 Committer: Barry Oglesby <[email protected]> Committed: Tue Feb 28 15:55:38 2017 -0800 ---------------------------------------------------------------------- .../java/org/apache/geode/cache/Operation.java | 16 ++++ .../geode/internal/cache/DistributedRegion.java | 30 ++++-- .../org/apache/geode/internal/cache/OpType.java | 2 + .../cache/tier/sockets/BaseCommand.java | 18 ++-- .../tier/sockets/InterestListDUnitTest.java | 99 ++++++++++++++++++++ 5 files changed, 148 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/1a36d36e/geode-core/src/main/java/org/apache/geode/cache/Operation.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/cache/Operation.java b/geode-core/src/main/java/org/apache/geode/cache/Operation.java index 9b2227b..d835b6c 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/Operation.java +++ b/geode-core/src/main/java/org/apache/geode/cache/Operation.java @@ -49,6 +49,7 @@ public final class Operation implements java.io.Serializable { private static final byte OP_TYPE_CLEAR = OpType.CLEAR; private static final byte OP_TYPE_MARKER = OpType.MARKER; private static final byte OP_TYPE_UPDATE_VERSION = OpType.UPDATE_ENTRY_VERSION; + private static final byte OP_TYPE_GET_FOR_REGISTER_INTEREST = OpType.GET_FOR_REGISTER_INTEREST; private static final int OP_DETAILS_NONE = 0; private static final int OP_DETAILS_SEARCH = 1; @@ -531,6 +532,14 @@ public final class Operation implements java.io.Serializable { false, // isRegion OP_TYPE_DESTROY, OP_DETAILS_REMOVEALL); + /** + * A 'get for register interest' operation. + */ + public static final Operation GET_FOR_REGISTER_INTEREST = + new Operation("GET_FOR_REGISTER_INTEREST", false, // isLocal + false, // isRegion + OP_TYPE_GET_FOR_REGISTER_INTEREST, OP_DETAILS_NONE); + /** The name of this mirror type. */ private final transient String name; @@ -636,6 +645,13 @@ public final class Operation implements java.io.Serializable { } /** + * Returns true if this operation is a get for register interest. + */ + public boolean isGetForRegisterInterest() { + return this.opType == OP_TYPE_GET_FOR_REGISTER_INTEREST; + } + + /** * Returns true if the operation invalidated an entry. */ public boolean isInvalidate() { http://git-wip-us.apache.org/repos/asf/geode/blob/1a36d36e/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java index cc6ccf7..b9cdfd7 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java @@ -2302,17 +2302,27 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA if (requestingClient != null) { event.setContext(requestingClient); } - SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor(); - try { - processor.initialize(this, key, aCallbackArgument); - // processor fills in event - processor.doSearchAndLoad(event, txState, localValue); - if (clientEvent != null && clientEvent.getVersionTag() == null) { - clientEvent.setVersionTag(event.getVersionTag()); + // If this event is because of a register interest call, don't invoke the CacheLoader + boolean getForRegisterInterest = clientEvent != null && clientEvent.getOperation() != null + && clientEvent.getOperation().isGetForRegisterInterest(); + if (!getForRegisterInterest) { + SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor(); + try { + processor.initialize(this, key, aCallbackArgument); + // processor fills in event + processor.doSearchAndLoad(event, txState, localValue); + if (clientEvent != null && clientEvent.getVersionTag() == null) { + clientEvent.setVersionTag(event.getVersionTag()); + } + lastModified = processor.getLastModified(); + } finally { + processor.release(); + } + } else { + if (logger.isDebugEnabled()) { + logger.debug("DistributedRegion.findObjectInSystem skipping loader for region=" + + getFullPath() + "; key=" + key); } - lastModified = processor.getLastModified(); - } finally { - processor.release(); } } if (event.hasNewValue() && !isMemoryThresholdReachedForLoad()) { http://git-wip-us.apache.org/repos/asf/geode/blob/1a36d36e/geode-core/src/main/java/org/apache/geode/internal/cache/OpType.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/OpType.java b/geode-core/src/main/java/org/apache/geode/internal/cache/OpType.java index 7685988..ff36a57 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/OpType.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/OpType.java @@ -49,6 +49,8 @@ public final class OpType { public static final byte UPDATE_ENTRY_VERSION = 11; + public static final byte GET_FOR_REGISTER_INTEREST = 12; + public static final byte CLEAR = 16; public static final byte MARKER = 32; http://git-wip-us.apache.org/repos/asf/geode/blob/1a36d36e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java index 5379605..d217672 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java @@ -1074,7 +1074,7 @@ public abstract class BaseCommand implements Command { if (region != null) { if (region.containsKey(entryKey) || region.containsTombstone(entryKey)) { - VersionTagHolder versionHolder = new VersionTagHolder(); + VersionTagHolder versionHolder = createVersionTagHolder(); ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID(); // From Get70.getValueAndIsObject() Object data = region.get(entryKey, null, true, true, true, id, versionHolder, true); @@ -1161,7 +1161,7 @@ public abstract class BaseCommand implements Command { } for (Object key : region.keySet(true)) { - VersionTagHolder versionHolder = new VersionTagHolder(); + VersionTagHolder versionHolder = createVersionTagHolder(); if (keyPattern != null) { if (!(key instanceof String)) { // key is not a String, cannot apply regex to this entry @@ -1263,12 +1263,10 @@ public abstract class BaseCommand implements Command { public static void appendNewRegisterInterestResponseChunkFromLocal(LocalRegion region, VersionedObjectList values, Object riKeys, Set keySet, ServerConnection servConn) throws IOException { - Object key = null; - VersionTagHolder versionHolder = null; ClientProxyMembershipID requestingClient = servConn == null ? null : servConn.getProxyID(); for (Iterator it = keySet.iterator(); it.hasNext();) { - key = it.next(); - versionHolder = new VersionTagHolder(); + Object key = it.next(); + VersionTagHolder versionHolder = createVersionTagHolder(); Object value = region.get(key, null, true, true, true, requestingClient, versionHolder, true); @@ -1454,7 +1452,7 @@ public abstract class BaseCommand implements Command { for (Iterator it = keyList.iterator(); it.hasNext();) { Object key = it.next(); if (region.containsKey(key) || region.containsTombstone(key)) { - VersionTagHolder versionHolder = new VersionTagHolder(); + VersionTagHolder versionHolder = createVersionTagHolder(); ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID(); data = region.get(key, null, true, true, true, id, versionHolder, true); @@ -1475,6 +1473,12 @@ public abstract class BaseCommand implements Command { sendNewRegisterInterestResponseChunk(region, keyList, values, true, servConn); } + private static VersionTagHolder createVersionTagHolder() { + VersionTagHolder versionHolder = new VersionTagHolder(); + versionHolder.setOperation(Operation.GET_FOR_REGISTER_INTEREST); + return versionHolder; + } + /** * Append an interest response * http://git-wip-us.apache.org/repos/asf/geode/blob/1a36d36e/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListDUnitTest.java index e5fb5fc..d8164f1 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListDUnitTest.java @@ -479,6 +479,80 @@ public class InterestListDUnitTest extends JUnit4DistributedTestCase { vm1.invoke(() -> InterestListDUnitTest.confirmNoCacheListenerInvalidates()); } + @Test + public void testRegisterInterestSingleKeyWithDestroyOnReplicatedRegionWithCacheLoader() { + List keysToDestroy = new ArrayList(); + keysToDestroy.add("0"); + runRegisterInterestWithDestroyAndCacheLoaderTest(true, keysToDestroy, keysToDestroy); + } + + @Test + public void testRegisterInterestSingleKeyWithDestroyOnPartitionedRegionWithCacheLoader() { + List keysToDestroy = new ArrayList(); + keysToDestroy.add("0"); + runRegisterInterestWithDestroyAndCacheLoaderTest(false, keysToDestroy, keysToDestroy); + } + + @Test + public void testRegisterInterestListOfKeysWithDestroyOnReplicatedRegionWithCacheLoader() { + List keysToDestroy = new ArrayList(); + for (int i = 0; i < 5; i++) { + keysToDestroy.add(String.valueOf(i)); + } + runRegisterInterestWithDestroyAndCacheLoaderTest(true, keysToDestroy, keysToDestroy); + } + + @Test + public void testRegisterInterestListOfKeysWithDestroyOnPartitionedRegionWithCacheLoader() { + List keysToDestroy = new ArrayList(); + for (int i = 0; i < 5; i++) { + keysToDestroy.add(String.valueOf(i)); + } + runRegisterInterestWithDestroyAndCacheLoaderTest(false, keysToDestroy, keysToDestroy); + } + + @Test + public void testRegisterInterestAllKeysWithDestroyOnReplicatedRegionWithCacheLoader() { + List keysToDestroy = new ArrayList(); + keysToDestroy.add("0"); + runRegisterInterestWithDestroyAndCacheLoaderTest(true, keysToDestroy, "ALL_KEYS"); + } + + @Test + public void testRegisterInterestAllKeysWithDestroyOnPartitionedRegionWithCacheLoader() { + List keysToDestroy = new ArrayList(); + keysToDestroy.add("0"); + runRegisterInterestWithDestroyAndCacheLoaderTest(false, keysToDestroy, "ALL_KEYS"); + } + + private void runRegisterInterestWithDestroyAndCacheLoaderTest(boolean addReplicatedRegion, + List keysToDestroy, Object keyToRegister) { + // The server was already started with a replicated region. Bounce it if necessary + int port1 = PORT1; + if (!addReplicatedRegion) { + vm0.invoke(() -> closeCache()); + port1 = + ((Integer) vm0.invoke(() -> InterestListDUnitTest.createServerCache(addReplicatedRegion))) + .intValue(); + } + final int port = port1; + + // Add a cache loader to the region + vm0.invoke(() -> addCacheLoader()); + + // Create client cache + vm1.invoke(() -> createClientCache(NetworkUtils.getServerHostName(vm0.getHost()), port)); + + // Destroy appropriate key(s) + vm1.invoke(() -> destroyKeys(keysToDestroy)); + + // Register interest in appropriate keys(s) + vm1.invoke(() -> registerKey(keyToRegister)); + + // Verify CacheLoader was not invoked + vm0.invoke(() -> verifyNoCacheLoaderLoads()); + } + private void createCache(Properties props) throws Exception { DistributedSystem ds = getSystem(props); cache = CacheFactory.create(ds); @@ -905,6 +979,20 @@ public class InterestListDUnitTest extends JUnit4DistributedTestCase { } } + private static void destroyKeys(List keys) { + Region r = cache.getRegion(REGION_NAME); + for (Object key : keys) { + r.destroy(key); + } + } + + private static void verifyNoCacheLoaderLoads() throws Exception { + Region region = cache.getRegion(REGION_NAME); + ReturnKeyCacheLoader cacheLoader = + (ReturnKeyCacheLoader) region.getAttributes().getCacheLoader(); + assertEquals(0/* expected */, cacheLoader.getLoads()/* actual */); + } + private static void validateEntriesK1andK2(final String vm) { WaitCriterion ev = new WaitCriterion() { @Override @@ -1076,6 +1164,8 @@ public class InterestListDUnitTest extends JUnit4DistributedTestCase { private static class ReturnKeyCacheLoader implements CacheLoader { + private AtomicInteger loads = new AtomicInteger(); + @Override public void close() { // Do nothing @@ -1083,7 +1173,16 @@ public class InterestListDUnitTest extends JUnit4DistributedTestCase { @Override public Object load(LoaderHelper helper) throws CacheLoaderException { + incrementLoads(); return helper.getKey(); } + + private void incrementLoads() { + this.loads.incrementAndGet(); + } + + private int getLoads() { + return this.loads.get(); + } } }
