http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e08c1f54/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMap.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMap.java index 14a2d2f..cce0b73 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMap.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMap.java @@ -27,6 +27,7 @@ import com.gemstone.gemfire.cache.EntryNotFoundException; import com.gemstone.gemfire.cache.Operation; import com.gemstone.gemfire.cache.TimeoutException; import com.gemstone.gemfire.cache.TransactionId; +import com.gemstone.gemfire.internal.cache.AbstractRegionMap.ARMLockTestHook; import com.gemstone.gemfire.internal.cache.lru.LRUMapCallbacks; import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID; import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; @@ -391,4 +392,7 @@ public interface RegionMap extends LRUMapCallbacks { public void decTxRefCount(RegionEntry e); public void close(); + + public ARMLockTestHook getARMLockTestHook(); + }
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e08c1f54/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/StateFlushOperation.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/StateFlushOperation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/StateFlushOperation.java index 8866689..da35c86 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/StateFlushOperation.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/StateFlushOperation.java @@ -140,6 +140,11 @@ public class StateFlushOperation { dm.putOutgoing(gr); processors.add(processor); } + + if(r.getRegionMap().getARMLockTestHook()!=null) { + r.getRegionMap().getARMLockTestHook().beforeStateFlushWait(); + } + for (ReplyProcessor21 processor: processors) { try { processor.waitForReplies(); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e08c1f54/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/versions/RegionVersionVector.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/versions/RegionVersionVector.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/versions/RegionVersionVector.java index 2f02422..c543303 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/versions/RegionVersionVector.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/versions/RegionVersionVector.java @@ -403,7 +403,6 @@ public abstract class RegionVersionVector<T extends VersionSource<?>> implements this.versionLock.readLock().lock(); } } - /** release the lock preventing concurrent clear() from happening */ public void releaseCacheModificationLock(LocalRegion owner) { @@ -411,6 +410,16 @@ public abstract class RegionVersionVector<T extends VersionSource<?>> implements this.versionLock.readLock().unlock(); } } + + /** obtain a lock to prevent concurrent clear() from happening */ + public void lockForCacheModification() { + this.versionLock.readLock().lock(); + } + + /** release the lock preventing concurrent clear() from happening */ + public void releaseCacheModificationLock() { + this.versionLock.readLock().unlock(); + } private void syncLocalVersion() { long v = localVersion.get(); @@ -1461,6 +1470,7 @@ public abstract class RegionVersionVector<T extends VersionSource<?>> implements public Version[] getSerializationVersions(){ return null; } + // /** // * This class will wrap DM member IDs to provide integers that can be stored // * on disk and be timed out in the vector. http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e08c1f54/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ARMLockTestHookAdapter.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ARMLockTestHookAdapter.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ARMLockTestHookAdapter.java new file mode 100644 index 0000000..89bb4ee --- /dev/null +++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ARMLockTestHookAdapter.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache; + +import java.io.Serializable; +import java.util.concurrent.CountDownLatch; + +import com.gemstone.gemfire.cache.CacheEvent; +import com.gemstone.gemfire.test.dunit.VM; + +public class ARMLockTestHookAdapter implements AbstractRegionMap.ARMLockTestHook, Serializable { + + public void beforeBulkLock(LocalRegion region) {}; + public void afterBulkLock(LocalRegion region) {}; + public void beforeBulkRelease(LocalRegion region) {}; + public void afterBulkRelease(LocalRegion region) {}; + + public void beforeLock(LocalRegion region, CacheEvent event) {}; + public void afterLock(LocalRegion region, CacheEvent event) {}; + public void beforeRelease(LocalRegion region, CacheEvent event) {}; + public void afterRelease(LocalRegion region, CacheEvent event) {}; + + public void beforeStateFlushWait() {} +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e08c1f54/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ClearRvvLockingDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ClearRvvLockingDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ClearRvvLockingDUnitTest.java new file mode 100644 index 0000000..d97fddf --- /dev/null +++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ClearRvvLockingDUnitTest.java @@ -0,0 +1,667 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * ClearRvvLockingDUnitTest.java + * + * Created on September 6, 2005, 2:57 PM + */ +package com.gemstone.gemfire.internal.cache; + +import static com.gemstone.gemfire.test.dunit.Assert.fail; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +import org.apache.logging.log4j.Logger; +import org.assertj.core.api.JUnitSoftAssertions; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.gemstone.gemfire.cache.Cache; +import com.gemstone.gemfire.cache.CacheEvent; +import com.gemstone.gemfire.cache.CacheFactory; +import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.cache.RegionFactory; +import com.gemstone.gemfire.cache.RegionShortcut; +import com.gemstone.gemfire.cache.Scope; +import com.gemstone.gemfire.distributed.DistributedMember; +import com.gemstone.gemfire.distributed.DistributedSystem; +import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; +import com.gemstone.gemfire.internal.logging.LogService; +import com.gemstone.gemfire.test.dunit.Host; +import com.gemstone.gemfire.test.dunit.SerializableCallable; +import com.gemstone.gemfire.test.dunit.SerializableRunnable; +import com.gemstone.gemfire.test.dunit.SerializableRunnableIF; +import com.gemstone.gemfire.test.dunit.VM; +import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; + +/** + * Test class to verify proper RVV locking interaction between + * entry operations such as PUT/REMOVE and the CLEAR region operation + * + * GEODE-599: After an operation completed, it would unlock the RVV. + * This was occurring before the operation was distributed to other members + * which created a window in which another operation could be performed + * prior to that operation being distributed. + * + * The fix for GEODE-599 was to not release the lock until after + * distributing the operation to the other members. + * + */ + +@SuppressWarnings("serial") +@Category(DistributedTest.class) +public class ClearRvvLockingDUnitTest extends JUnit4CacheTestCase { + + @Rule + public transient JUnitSoftAssertions softly = new JUnitSoftAssertions(); + /* + * The tests perform a single operation and a concurrent clear. + * + * opsVM determines where the single operation will be performed, null will perform the op on the test VM (vm0) + * clearVM determines where the clear operation will be performed, null will perform the clear on the test VM (vm0) + * + * Specifying NULL/NULL for opsVM and clearVM has the effect of performing both in the same thread + * whereas specifying vm0/vm0 for example will run them both on the same vm, but different threads. + * NULL/NULL is not tested here since the same thread performing a clear prior to returning from a put + * is not possible using the public API. + * + * Each test is performed twice once with operation and clear on the same vm, once on different vms. + * + */ + VM vm0, vm1, opsVM, clearVM; + + static Cache cache; + static LocalRegion region; + DistributedMember vm0ID, vm1ID; + + static AbstractRegionMap.ARMLockTestHook theHook; + + static final String THE_KEY = "theKey"; + static final String THE_VALUE = "theValue"; + + private static final Logger logger = LogService.getLogger(); + + //test methods + + @Test + public void testPutOperationSameVM() { + try { + setupMembers(); + setOpAndClearVM(vm0, vm0); // first arg is where to perform operation, second arg where to perform clear + opsVM.invoke(() -> setBasicHook(opsVM)); + runConsistencyTest(vm0, performPutOperation); + checkForConsistencyErrors(); + } finally { + opsVM.invoke(() -> resetHook()); + } + } + + @Test + public void testPutOperationDifferentVM() { + try { + setupMembers(); + setOpAndClearVM(vm0, vm1); // first arg is where to perform operation, second arg where to perform clear + opsVM.invoke(() -> setBasicHook(clearVM)); + runConsistencyTest(vm0, performPutOperation); + checkForConsistencyErrors(); + } finally { + opsVM.invoke(() -> resetHook()); + } + } + + @Test + public void testPutOperationNoAck() { + try { + setupNoAckMembers(); + setOpAndClearVM(vm0, vm0); + vm0.invoke(() -> setLocalNoAckHook(vm1)); + vm1.invoke(() -> setRemoteNoAckHook(vm0)); + vm0.invoke(() -> primeStep1(1)); + vm1.invoke(() -> primeStep2(1)); + runConsistencyTest(vm0, performNoAckPutOperation); + checkForConsistencyErrors(); + } finally { + vm0.invoke(() -> resetHook()); + vm1.invoke(() -> resetHook()); + } + } + + @Test + public void testRemoveOperationSameVM() { + try { + setupMembers(); + setOpAndClearVM(vm0, vm0); + opsVM.invoke(() -> setRemoveAndInvalidateHook(clearVM)); + runConsistencyTest(vm0, performRemoveOperation); + checkForConsistencyErrors(); + } finally { + opsVM.invoke(() -> resetHook()); + } + } + + @Test + public void testRemoveOperationDifferentVM() { + try { + setupMembers(); + setOpAndClearVM(vm0, vm1); + opsVM.invoke(() -> setRemoveAndInvalidateHook(clearVM)); + runConsistencyTest(vm0, performRemoveOperation); + checkForConsistencyErrors(); + } finally { + opsVM.invoke(() -> resetHook()); + } + } + + @Test + public void testInvalidateOperationSameVM() { + try { + setupMembers(); + setOpAndClearVM(vm0, vm0); + opsVM.invoke(() -> setRemoveAndInvalidateHook(clearVM)); + runConsistencyTest(vm0, performInvalidateOperation); + checkForConsistencyErrors(); + } finally { + opsVM.invoke(() -> resetHook()); + } + } + + @Test + public void testInvalidateOperationDifferentVM() { + try { + setupMembers(); + setOpAndClearVM(vm0, vm1); + opsVM.invoke(() -> setRemoveAndInvalidateHook(clearVM)); + runConsistencyTest(vm0, performInvalidateOperation); + checkForConsistencyErrors(); + } finally { + opsVM.invoke(() -> resetHook()); + } + } + + @Test + public void testPutAllOperationSameVM() { + try { + setupMembers(); + setOpAndClearVM(vm0, vm0); + opsVM.invoke(() -> setBulkHook(vm0)); + runConsistencyTest(vm0, performPutAllOperation); + checkForConsistencyErrors(); + } finally { + opsVM.invoke(() -> resetHook()); + } + } + + @Test + public void testPutAllOperationDifferentVM() { + try { + setupMembers(); + setOpAndClearVM(vm0, vm1); + opsVM.invoke(() -> setBulkHook(vm0)); + runConsistencyTest(vm0, performPutAllOperation); + checkForConsistencyErrors(); + } finally { + opsVM.invoke(() -> resetHook()); + } + } + + @Test + public void testRemoveAllOperationSameVM() { + try { + setupMembers(); + setOpAndClearVM(vm0, vm0); + opsVM.invoke(() -> setBulkHook(vm0)); + runConsistencyTest(vm0, performRemoveAllOperation); + checkForConsistencyErrors(); + } finally { + opsVM.invoke(() -> resetHook()); + } + } + + @Test + public void testRemoveAllOperationDifferentVM() { + try { + setupMembers(); + setOpAndClearVM(vm0, vm1); + opsVM.invoke(() -> setBulkHook(vm0)); + runConsistencyTest(vm0, performRemoveAllOperation); + checkForConsistencyErrors(); + } finally { + opsVM.invoke(() -> resetHook()); + } + } + + + private void invokePut(VM whichVM) { + if(whichVM==null) { + doPut(); + } else { + whichVM.invoke(() -> doPut()); + } + } + + private void invokeRemove(VM whichVM) { + if(whichVM==null) { + doRemove(); + } else { + whichVM.invoke(() -> doRemove()); + } + } + + private void invokeInvalidate(VM whichVM) { + if(whichVM==null) { + doInvalidate(); + } else { + whichVM.invoke(() -> doInvalidate()); + } + } + + private void invokePutAll(VM whichVM) { + if(whichVM==null) { + doPutAll(); + } else { + whichVM.invoke(() -> doPutAll()); + } + } + + private void invokeRemoveAll(VM whichVM) { + if(whichVM==null) { + doRemoveAll(); + } else { + whichVM.invoke(() -> doRemoveAll()); + } + } + + private static void invokeClear(VM whichVM) { + if(whichVM==null) { + doClear(); + } else { + whichVM.invoke(() -> doClear()); + } + } + + // remote test methods + + private static boolean doesRegionEntryExist(String key) { + return region.getRegionEntry(key)!=null; + } + + private static void doPut() { + region.put(THE_KEY, THE_VALUE); + } + + private static void doRemove() { + region.remove(THE_KEY); + } + + private static void doInvalidate() { + region.invalidate(THE_KEY); + } + + private static void doPutAll() { + Map<Object, Object> map = generateKeyValues(); + region.putAll(map, "putAllCallback"); + } + + private static void doRemoveAll() { + Map<Object, Object> map = generateKeyValues(); + region.removeAll(map.keySet(), "removeAllCallback"); + } + + private static void doClear() { + region.clear(); + } + + private static void primeStep1(int cnt) { + primeStep1Latch(cnt); + } + + private static void primeStep2(int cnt) { + primeStep2Latch(cnt); + } + + private static void releaseStep1() { + decrementStep1Latch(); + } + + SerializableRunnable performPutOperation = new SerializableRunnable("perform PUT") { + @Override + public void run() { + try { + invokePut(opsVM); + } catch (Exception e) { + fail("while performing PUT", e); + } + } + }; + + SerializableRunnable performNoAckPutOperation = new SerializableRunnable("perform NoAckPUT") { + @Override + public void run() throws InterruptedException { + Runnable putThread1 = new Runnable() { + public void run() { + DistributedSystem.setThreadsSocketPolicy(false); + doPut(); + DistributedSystem.releaseThreadsSockets(); + } + }; + + Runnable putThread2 = new Runnable() { + public void run() { + DistributedSystem.setThreadsSocketPolicy(false); + awaitStep1Latch(); + doClear(); + DistributedSystem.releaseThreadsSockets(); + } + }; + + Thread t1 = new Thread(putThread1); + Thread t2 = new Thread(putThread2); + t2.start(); + t1.start(); + t1.join(); + t2.join(); + } + }; + + SerializableRunnable performRemoveOperation = new SerializableRunnable("perform REMOVE") { + @Override + public void run() { + try { + invokePut(opsVM); + invokeRemove(opsVM); + } catch (Exception e) { + fail("while performing REMOVE", e); + } + } + }; + + SerializableRunnable performInvalidateOperation = new SerializableRunnable("perform INVALIDATE") { + @Override + public void run() { + try { + invokePut(opsVM); + invokeInvalidate(opsVM); + } catch (Exception e) { + fail("while performing INVALIDATE", e); + } + } + }; + + SerializableRunnable performPutAllOperation = new SerializableRunnable("perform PUTALL") { + @Override + public void run() { + try { + invokePutAll(opsVM); + } catch (Exception e) { + fail("while performing PUTALL", e); + } + } + }; + + SerializableRunnable performRemoveAllOperation = new SerializableRunnable("perform REMOVEALL") { + @Override + public void run() { + try { + invokePutAll(opsVM); + invokeRemoveAll(opsVM); + } catch (Exception e) { + fail("while performing REMOVEALL", e); + } + } + }; + + // helper methods + + private void setOpAndClearVM(VM opsTarget, VM clearTarget) { + opsVM = opsTarget; + clearVM = clearTarget; + } + + private void setupMembers() { + Host host = Host.getHost(0); + vm0 = host.getVM(0); + vm1 = host.getVM(1); + vm0ID = createCache(vm0); + vm1ID = createCache(vm1); + String testName = getName(); + vm0.invoke(() -> createRegion(testName)); + vm1.invoke(() -> createRegion(testName)); + } + + private void setupNoAckMembers() { + Host host = Host.getHost(0); + vm0 = host.getVM(0); + vm1 = host.getVM(1); + vm0ID = createNoConserveSocketsCache(vm0); + vm1ID = createNoConserveSocketsCache(vm1); + String testName = getName(); + vm0.invoke(() -> createNOACKRegion(testName)); + vm1.invoke(() -> createNOACKRegion(testName)); + } + + private void runConsistencyTest(VM vm, SerializableRunnableIF theTest) { + vm.invoke(theTest); + } + + private void checkForConsistencyErrors() { + Map<Object, Object> r0Contents = (Map<Object, Object>)vm0.invoke(() -> getRegionContents()); + Map<Object, Object> r1Contents = (Map<Object, Object>)vm1.invoke(() -> getRegionContents()); + + String key = THE_KEY; + softly.assertThat(r1Contents.get(key)).as("region contents are not consistent for key %s", key).isEqualTo(r0Contents.get(key)); + softly.assertThat(checkRegionEntry(vm1, key)).as("region entries are not consistent for key %s", key).isEqualTo(checkRegionEntry(vm0, key)); + + for (int subi=1; subi<3; subi++) { + String subkey = key + "-" + subi; + if (r0Contents.containsKey(subkey)) { + softly.assertThat(r1Contents.get(subkey)).as("region contents are not consistent for key %s", subkey).isEqualTo(r0Contents.get(subkey)); + } else { + softly.assertThat(r1Contents).as("expected containsKey for %s to return false", subkey).doesNotContainKey(subkey); + } + } + } + + public void resetHook() { + ((AbstractRegionMap) region.entries).setARMLockTestHook(null); + } + + public void setBasicHook(VM whichVM) { + theOtherVM = whichVM; + theHook = new ArmBasicClearHook(); + ((AbstractRegionMap) region.entries).setARMLockTestHook(theHook); + } + + public void setRemoveAndInvalidateHook(VM whichVM) { + theOtherVM = whichVM; + theHook = new ArmRemoveAndInvalidateClearHook(); + ((AbstractRegionMap) region.entries).setARMLockTestHook(theHook); + } + + public void setRemoteNoAckHook(VM whichVM) { + theOtherVM = whichVM; + theHook = new ArmNoAckRemoteHook(); + ((AbstractRegionMap) region.entries).setARMLockTestHook(theHook); + } + + public void setLocalNoAckHook(VM whichVM) { + theOtherVM = whichVM; + theHook = new ArmNoAckLocalHook(); + ((AbstractRegionMap) region.entries).setARMLockTestHook(theHook); + } + + public void setBulkHook(VM whichVM) { + theOtherVM = whichVM; + theHook = new ArmBulkClearHook(); + ((AbstractRegionMap) region.entries).setARMLockTestHook(theHook); + } + + private InternalDistributedMember createCache(VM vm) { + return (InternalDistributedMember) vm.invoke(new SerializableCallable<Object>() { + public Object call() { + cache = getCache(new CacheFactory().set("conserve-sockets", "true")); + return getSystem().getDistributedMember(); + } + }); + } + + private InternalDistributedMember createNoConserveSocketsCache(VM vm) { + return (InternalDistributedMember) vm.invoke(new SerializableCallable<Object>() { + public Object call() { + cache = getCache(new CacheFactory().set("conserve-sockets", "false")); + return getSystem().getDistributedMember(); + } + }); + } + + private static void createRegion(String rgnName) { + RegionFactory<Object, Object> rf = cache.createRegionFactory(RegionShortcut.REPLICATE); + rf.setConcurrencyChecksEnabled(true); + rf.setScope(Scope.DISTRIBUTED_ACK); + region = (LocalRegion)rf.create(rgnName); + } + + private static void createNOACKRegion(String rgnName) { + RegionFactory<Object, Object> rf = cache.createRegionFactory(RegionShortcut.REPLICATE); + rf.setConcurrencyChecksEnabled(true); + rf.setScope(Scope.DISTRIBUTED_NO_ACK); + region = (LocalRegion)rf.create(rgnName); + } + + private static Map<Object, Object> generateKeyValues() { + String key = THE_KEY; + String value = THE_VALUE; + Map<Object, Object> map = new HashMap<>(); + map.put(key, value); + map.put(key+"-1", value+"-1"); + map.put(key+"-2", value+"-2"); + return map; + } + + @SuppressWarnings("rawtypes") + private static Map<Object, Object> getRegionContents() { + Map<Object, Object> result = new HashMap<>(); + for (Iterator i=region.entrySet().iterator(); i.hasNext(); ) { + Region.Entry e = (Region.Entry)i.next(); + result.put(e.getKey(), e.getValue()); + } + return result; + } + + private boolean checkRegionEntry(VM vm, String key) { + boolean target = vm.invoke(() -> doesRegionEntryExist(key)); + return target; + } + + static VM theOtherVM; + transient static CountDownLatch step1Latch, step2Latch; + + public static void primeStep1Latch(int waitCount) { + step1Latch = new CountDownLatch(waitCount); + } + + public static void awaitStep1Latch() { + try { + step1Latch.await(); + } catch (InterruptedException e) {} + } + + public static void decrementStep1Latch() { + step1Latch.countDown(); + } + + public static void decrementRemoteStep1Latch() { + theOtherVM.invoke(() -> decrementStep1Latch()); + } + + public static void primeStep2Latch(int waitCount) { + step2Latch = new CountDownLatch(waitCount); + } + + public static void awaitStep2Latch() { + try { + step2Latch.await(); + } catch (InterruptedException e) {} + } + + public static void decrementStep2Latch() { + step2Latch.countDown(); + } + + public static void decrementRemoteStep2Latch() { + theOtherVM.invoke(() -> decrementStep2Latch()); + } + /* + * Test callback class used to hook the rvv locking mechanism with basic operations. + */ + public static class ArmBasicClearHook extends ARMLockTestHookAdapter { + @Override + public void afterRelease(LocalRegion owner, CacheEvent event) { + if((event.getOperation().isCreate()) && owner.getName().startsWith("test")) { + invokeClear(theOtherVM); + } + } + } + + /* + * Test callback class used to hook the rvv locking mechanism with basic operations. + */ + public static class ArmRemoveAndInvalidateClearHook extends ARMLockTestHookAdapter { + + @Override + public void afterRelease(LocalRegion owner, CacheEvent event) { + if((event.getOperation().isDestroy() || + event.getOperation().isInvalidate()) && + owner.getName().startsWith("test")) { + invokeClear(theOtherVM); + } + } + } + + /* + * Test callback class used to hook the rvv locking mechanism for NOACK testing. + */ + public static class ArmNoAckRemoteHook extends ARMLockTestHookAdapter { + @Override + public void beforeLock(LocalRegion owner, CacheEvent event) { + if(event.isOriginRemote() && event.getOperation().isCreate() && owner.getName().startsWith("test")) { + theOtherVM.invoke(() -> releaseStep1()); // start clear + awaitStep2Latch(); // wait for clear to complete + } + } + } + + public static class ArmNoAckLocalHook extends ARMLockTestHookAdapter { + @Override + public void beforeStateFlushWait() { + decrementRemoteStep2Latch(); + } + } + + /* + * Test callback class used to hook the rvv locking mechanism with bulk operations. + */ + public static class ArmBulkClearHook extends ARMLockTestHookAdapter { + @Override + public void afterBulkRelease(LocalRegion region) { + invokeClear(theOtherVM); + } + } +}
