GEODE-1740: Correct potential region inconsistencies with concurrent clear and transaction commit
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/7fa2c08c Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/7fa2c08c Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/7fa2c08c Branch: refs/heads/develop Commit: 7fa2c08cf403139afa7a60a81392ca13034e490b Parents: f0cdb66 Author: Scott Jewell <sjew...@pivotal.io> Authored: Wed Nov 2 15:59:35 2016 -0700 Committer: Kenneth Howe <kh...@apache.org> Committed: Thu Dec 1 11:03:47 2016 -0800 ---------------------------------------------------------------------- .../geode/internal/cache/AbstractRegionMap.java | 43 +- .../apache/geode/internal/cache/RegionMap.java | 4 + .../apache/geode/internal/cache/TXState.java | 79 +++- .../internal/cache/ClearTXLockingDUnitTest.java | 431 +++++++++++++++++++ 4 files changed, 505 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7fa2c08c/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java index 96936eef..e3e87ea 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java @@ -1551,7 +1551,6 @@ public abstract class AbstractRegionMap implements RegionMap { final boolean isRegionReady = !inTokenMode; final boolean hasRemoteOrigin = !((TXId) txId).getMemberId().equals(owner.getMyId()); boolean cbEventInPending = false; - lockForTXCacheModification(owner, versionTag); IndexManager oqlIndexManager = owner.getIndexManager(); try { RegionEntry re = getEntry(key); @@ -1818,8 +1817,6 @@ public abstract class AbstractRegionMap implements RegionMap { } catch (DiskAccessException dae) { owner.handleDiskAccessException(dae); throw dae; - } finally { - releaseTXCacheModificationLock(owner, versionTag); } } @@ -2353,7 +2350,6 @@ public abstract class AbstractRegionMap implements RegionMap { if (oqlIndexManager != null) { oqlIndexManager.waitForIndexInit(); } - lockForTXCacheModification(owner, versionTag); try { if (forceNewEntry) { boolean opCompleted = false; @@ -2582,7 +2578,6 @@ public abstract class AbstractRegionMap implements RegionMap { owner.handleDiskAccessException(dae); throw dae; } finally { - releaseTXCacheModificationLock(owner, versionTag); if (oqlIndexManager != null) { oqlIndexManager.countDownIndexUpdaters(); } @@ -3115,7 +3110,6 @@ public abstract class AbstractRegionMap implements RegionMap { if (oqlIndexManager != null) { oqlIndexManager.waitForIndexInit(); } - lockForTXCacheModification(owner, versionTag); try { if (hasRemoteOrigin && !isTXHost && !isClientTXOriginator) { // If we are not a mirror then only apply the update to existing @@ -3384,7 +3378,6 @@ public abstract class AbstractRegionMap implements RegionMap { owner.handleDiskAccessException(dae); throw dae; } finally { - releaseTXCacheModificationLock(owner, versionTag); if (oqlIndexManager != null) { oqlIndexManager.countDownIndexUpdaters(); } @@ -3693,40 +3686,32 @@ public abstract class AbstractRegionMap implements RegionMap { } - /** get version-generation permission from the region's version vector */ - private void lockForTXCacheModification(LocalRegion owner, VersionTag tag) { - + @Override + public void lockRegionForAtomicTX(LocalRegion r) { if (armLockTestHook != null) - armLockTestHook.beforeLock(owner, null); + armLockTestHook.beforeLock(r, null); - if (!(tag != null && tag.isFromOtherMember())) { - RegionVersionVector vector = owner.getVersionVector(); - if (vector != null && !owner.hasServerProxy()) { - vector.lockForCacheModification(); - } + RegionVersionVector vector = r.getVersionVector(); + if (vector != null) { + vector.lockForCacheModification(); } if (armLockTestHook != null) - armLockTestHook.afterLock(owner, null); - + armLockTestHook.afterLock(r, null); } - /** release version-generation permission from the region's version vector */ - private void releaseTXCacheModificationLock(LocalRegion owner, VersionTag tag) { - + @Override + public void unlockRegionForAtomicTX(LocalRegion r) { if (armLockTestHook != null) - armLockTestHook.beforeRelease(owner, null); + armLockTestHook.beforeRelease(r, null); - if (!(tag != null && tag.isFromOtherMember())) { - RegionVersionVector vector = owner.getVersionVector(); - if (vector != null && !owner.hasServerProxy()) { - vector.releaseCacheModificationLock(); - } + RegionVersionVector vector = r.getVersionVector(); + if (vector != null) { + vector.releaseCacheModificationLock(); } if (armLockTestHook != null) - armLockTestHook.afterRelease(owner, null); - + armLockTestHook.afterRelease(r, null); } /** http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7fa2c08c/geode-core/src/main/java/org/apache/geode/internal/cache/RegionMap.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionMap.java index ee8a84e..7ecabd7 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionMap.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionMap.java @@ -372,6 +372,10 @@ public interface RegionMap extends LRUMapCallbacks { public void close(); + default void lockRegionForAtomicTX(LocalRegion r) {} + + default void unlockRegionForAtomicTX(LocalRegion r) {} + public ARMLockTestHook getARMLockTestHook(); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7fa2c08c/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java index 99a3b83..d577f39 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java @@ -103,6 +103,7 @@ public class TXState implements TXStateInterface { // Internal testing hooks private Runnable internalAfterReservation; protected Runnable internalAfterConflictCheck; + protected Runnable internalDuringApplyChanges; protected Runnable internalAfterApplyChanges; protected Runnable internalAfterReleaseLocalLocks; Runnable internalDuringIndividualSend; // package scope allows TXCommitMessage use @@ -460,34 +461,38 @@ public class TXState implements TXStateInterface { attachFilterProfileInformation(entries); - // apply changes to the cache - applyChanges(entries); - // For internal testing - if (this.internalAfterApplyChanges != null) { - this.internalAfterApplyChanges.run(); - } + lockTXRegions(regions); - // build and send the message - msg = buildMessage(); - this.commitMessage = msg; - if (this.internalBeforeSend != null) { - this.internalBeforeSend.run(); - } + try { + // apply changes to the cache + applyChanges(entries); + // For internal testing + if (this.internalAfterApplyChanges != null) { + this.internalAfterApplyChanges.run(); + } + // build and send the message + msg = buildMessage(); + this.commitMessage = msg; + if (this.internalBeforeSend != null) { + this.internalBeforeSend.run(); + } + msg.send(this.locks.getDistributedLockId()); + // For internal testing + if (this.internalAfterSend != null) { + this.internalAfterSend.run(); + } - msg.send(this.locks.getDistributedLockId()); - // For internal testing - if (this.internalAfterSend != null) { - this.internalAfterSend.run(); + firePendingCallbacks(); + /* + * This is to prepare the commit message for the caller, make sure all events are in + * there. + */ + this.commitMessage = buildCompleteMessage(); + } finally { + unlockTXRegions(regions); } - - firePendingCallbacks(); - /* - * This is to prepare the commit message for the caller, make sure all events are in there. - */ - this.commitMessage = buildCompleteMessage(); - } finally { if (msg != null) { msg.releaseViewVersions(); @@ -503,6 +508,24 @@ public class TXState implements TXStateInterface { } } + private void lockTXRegions(IdentityHashMap<LocalRegion, TXRegionState> regions) { + Iterator<Map.Entry<LocalRegion, TXRegionState>> it = regions.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry<LocalRegion, TXRegionState> me = it.next(); + LocalRegion r = me.getKey(); + r.getRegionMap().lockRegionForAtomicTX(r); + } + } + + private void unlockTXRegions(IdentityHashMap<LocalRegion, TXRegionState> regions) { + Iterator<Map.Entry<LocalRegion, TXRegionState>> it = regions.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry<LocalRegion, TXRegionState> me = it.next(); + LocalRegion r = me.getKey(); + r.getRegionMap().unlockRegionForAtomicTX(r); + } + } + protected void attachFilterProfileInformation(List entries) { { Iterator/* <TXEntryStateWithRegionAndKey> */ it = entries.iterator(); @@ -769,6 +792,9 @@ public class TXState implements TXStateInterface { Iterator/* <TXEntryStateWithRegionAndKey> */ it = entries.iterator(); while (it.hasNext()) { TXEntryStateWithRegionAndKey o = (TXEntryStateWithRegionAndKey) it.next(); + if (this.internalDuringApplyChanges != null) { + this.internalDuringApplyChanges.run(); + } try { o.es.applyChanges(o.r, o.key, this); } catch (RegionDestroyedException ex) { @@ -1073,6 +1099,13 @@ public class TXState implements TXStateInterface { } /** + * Add an internal callback which is run as each transaction change is applied. + */ + public void setDuringApplyChanges(Runnable duringApplyChanges) { + this.internalDuringApplyChanges = duringApplyChanges; + } + + /** * Add an internal callback which is run after the transaction changes have been applied to * committed state (locally) but before local locks are released (occurs for regions of Local and * Distributed No Ack scope). http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7fa2c08c/geode-core/src/test/java/org/apache/geode/internal/cache/ClearTXLockingDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ClearTXLockingDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ClearTXLockingDUnitTest.java new file mode 100644 index 0000000..b620383 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ClearTXLockingDUnitTest.java @@ -0,0 +1,431 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +/* + * ClearRvvLockingDUnitTest.java + * + * Created on September 6, 2005, 2:57 PM + */ +package org.apache.geode.internal.cache; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.CacheEvent; +import org.apache.geode.cache.CacheFactory; +import org.apache.geode.cache.CacheTransactionManager; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionFactory; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.Scope; +import org.apache.geode.distributed.DistributedMember; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.logging.LogService; +import org.apache.geode.test.dunit.Host; +import org.apache.geode.test.dunit.SerializableCallable; +import org.apache.geode.test.dunit.SerializableRunnable; +import org.apache.geode.test.dunit.SerializableRunnableIF; +import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; +import org.apache.geode.test.junit.categories.DistributedTest; +import org.apache.logging.log4j.Logger; +import org.assertj.core.api.JUnitSoftAssertions; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test class to verify proper locking interaction between transactions and the CLEAR region + * operation. + * + * GEODE-1740: It was observed that operations performed within a transaction were not holding + * region modification locks for the duration of commit processing. This lock is used to ensure region + * consistency during CLEAR processing. By not holding the lock for the duration of commit processing, + * a window was opened that allowed region operations such as clear to occur in mid-commit. + * + * The fix for GEODE-1740 was to acquire and hold read locks for any region involved in the commit. + * This forces CLEAR to wait until commit processing is complete. + */ +@SuppressWarnings("serial") +@Category(DistributedTest.class) +public class ClearTXLockingDUnitTest extends JUnit4CacheTestCase { + + @Rule + public transient JUnitSoftAssertions softly = new JUnitSoftAssertions(); + /* + * This test performs operations within a transaction and during commit processing + * schedules a clear to be performed on the relevant region. The scheduled clear should wait until + * commit processing is complete before clearing the region. Failure to do so, would result in + * region inconsistencies. + */ + VM vm0, vm1, opsVM, regionVM; + + static Cache cache; + + ArmLockHook theArmHook; + + DistributedMember vm0ID, vm1ID; + + static CacheTransactionManager txmgr; + + static final String THE_KEY = "theKey"; + static final String THE_VALUE = "theValue"; + static final int NUMBER_OF_PUTS = 2; + + static final String REGION_NAME1 = "testRegion1"; + static final String REGION_NAME2 = "testRegion2"; + + static CountDownLatch opsLatch; + static CountDownLatch regionLatch; + static CountDownLatch verifyLatch; + + private static final Logger logger = LogService.getLogger(); + + // test methods + + @Test + public void testPutWithClearSameVM() throws InterruptedException { + getVMs(); + setupRegions(vm0, vm0); + setClearHook(REGION_NAME1, opsVM, regionVM); + performTestAndCheckResults(putOperationsTest); + } + + @Test + public void testPutWithClearDifferentVM() throws InterruptedException { + getVMs(); + setupRegions(vm0, vm1); + setClearHook(REGION_NAME1, opsVM, regionVM); + performTestAndCheckResults(putOperationsTest); + } + + /* + * The CLOSE tests are ignored until the close operation has been + * updated to acquire a write lock during processing. + */ + @Ignore + @Test + public void testPutWithCloseSameVM() throws InterruptedException { + getVMs(); + setupRegions(vm0, vm0); + setCloseHook(REGION_NAME1, opsVM, regionVM); + performTestAndCheckResults(putOperationsTest); + } + + @Ignore + @Test + public void testPutWithCloseDifferentVM() throws InterruptedException { + getVMs(); + setupRegions(vm0, vm1); + setCloseHook(REGION_NAME1, opsVM, regionVM); + performTestAndCheckResults(putOperationsTest); + } + + /* + * The DESTROY_REGION tests are ignored until the destroy operation has been + * updated to acquire a write lock during processing. + */ + @Ignore + @Test + public void testPutWithDestroyRegionSameVM() throws InterruptedException { + getVMs(); + setupRegions(vm0, vm0); + setDestroyRegionHook(REGION_NAME1, opsVM, regionVM); + performTestAndCheckResults(putOperationsTest); + } + + @Ignore + @Test + public void testPutWithDestroyRegionDifferentVM() throws InterruptedException { + getVMs(); + setupRegions(vm0, vm1); + setDestroyRegionHook(REGION_NAME1, opsVM, regionVM); + performTestAndCheckResults(putOperationsTest); + } + + // Local methods + + /* + * This method executes a runnable test and then checks for region consistency + */ + private void performTestAndCheckResults(SerializableRunnable operationsTest) throws InterruptedException { + try { + runLockingTest(opsVM, operationsTest); + checkForConsistencyErrors(REGION_NAME1); + checkForConsistencyErrors(REGION_NAME2); + } finally { + opsVM.invoke(() -> resetArmHook(REGION_NAME1)); + } + } + + /* + * We will be using 2 vms. One for the transaction and one for the clear + */ + private void getVMs() { + Host host = Host.getHost(0); + vm0 = host.getVM(0); + vm1 = host.getVM(1); + } + + /* + * Set which vm will perform the transaction and which will perform the region operation + * and create the regions on the vms + */ + private void setupRegions(VM opsTarget, VM regionTarget) { + opsVM = opsTarget; + regionVM = regionTarget; + vm0ID = createCache(vm0); + vm1ID = createCache(vm1); + vm0.invoke(() -> createRegion(REGION_NAME1)); + vm0.invoke(() -> createRegion(REGION_NAME2)); + vm1.invoke(() -> createRegion(REGION_NAME1)); + vm1.invoke(() -> createRegion(REGION_NAME2)); + } + + /* + * Invoke a runnable on the operations vm + */ + private void runLockingTest(VM vm, SerializableRunnableIF theTest) { + vm.invoke(theTest); + } + + /* + * Runnable used to invoke the actual test + */ + SerializableRunnable putOperationsTest = new SerializableRunnable("perform PUT") { + @Override + public void run() { + opsVM.invoke(() -> doPuts(getCache(), regionVM)); + } + }; + + /* + * Set arm hook to detect when region operation is attempting to acquire write lock + * and stage the clear that will be released half way through commit processing. + */ + public void setClearHook(String rname, VM whereOps, VM whereClear) { + whereOps.invoke(() -> setArmHook(rname)); + whereClear.invokeAsync(() -> stageClear(rname, whereOps)); + } + + // remote test methods + + /* + * Wait to be notified and then execute the clear. + * Once the clear completes, notify waiter to perform region verification. + */ + private static void stageClear(String rname, VM whereOps) throws InterruptedException { + regionOperationWait(); + LocalRegion r = (LocalRegion) cache.getRegion(rname); + r.clear(); + whereOps.invoke(() -> releaseVerify()); + } + + /* + * Set and stage method for close and destroy are the same as clear + */ + public void setCloseHook(String rname, VM whereOps, VM whereClear) { + whereOps.invoke(() -> setArmHook(rname)); + whereClear.invokeAsync(() -> stageClose(rname, whereOps)); + } + + private static void stageClose(String rname, VM whereOps) throws InterruptedException { + regionOperationWait(); + LocalRegion r = (LocalRegion) cache.getRegion(rname); + r.close(); + whereOps.invoke(() -> releaseVerify()); + } + + public void setDestroyRegionHook(String rname, VM whereOps, VM whereClear) { + whereOps.invoke(() -> setArmHook(rname)); + whereClear.invokeAsync(() -> stageDestroyRegion(rname, whereOps)); + } + + private static void stageDestroyRegion(String rname, VM whereOps) throws InterruptedException { + regionOperationWait(); + LocalRegion r = (LocalRegion) cache.getRegion(rname); + r.destroyRegion(); + whereOps.invoke(() -> releaseVerify()); + } + + /* + * Set the abstract region map lock hook to detect + * attempt to acquire write lock by region operation. + */ + public void setArmHook(String rname) { + LocalRegion r = (LocalRegion) cache.getRegion(rname); + theArmHook = new ArmLockHook(); + ((AbstractRegionMap) r.entries).setARMLockTestHook(theArmHook); + } + + /* + * Cleanup arm lock hook by setting it null + */ + public void resetArmHook(String rname) { + LocalRegion r = (LocalRegion) cache.getRegion(rname); + ((AbstractRegionMap) r.entries).setARMLockTestHook(null); + } + + /* + * Wait to be notified it is time to perform region operation (i.e. CLEAR) + */ + private static void regionOperationWait() throws InterruptedException { + regionLatch = new CountDownLatch(1); + regionLatch.await(); + } + + /* + * A simple transaction that will have a region operation execute during commit. + * opsLatch is used to wait until region operation has been scheduled during commit + * and verifyLatch is used to ensure commit and clear processing have both completed. + */ + private static void doPuts(Cache cache, VM whereRegion) throws InterruptedException { + TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager(); + + opsLatch = new CountDownLatch(1); + verifyLatch = new CountDownLatch(1); + + txManager.begin(); + TXStateInterface txState = ((TXStateProxyImpl)txManager.getTXState()).getRealDeal(null,null); + ((TXState)txState).setDuringApplyChanges(new CommitTestCallback(whereRegion)); + + Region region1 = cache.getRegion(REGION_NAME1); + Region region2 = cache.getRegion(REGION_NAME2); + for (int i = 0; i < NUMBER_OF_PUTS; i++) { + region1.put(REGION_NAME1 + THE_KEY + i, THE_VALUE + i); + region2.put(REGION_NAME2 + THE_KEY + i, THE_VALUE + i); + } + + txManager.commit(); + verifyLatch.await(); + } + + /* + * Release the region operation that has been previously staged + */ + private static void releaseRegionOperation(VM whereRegion) { + whereRegion.invoke(() -> regionLatch.countDown()); + } + + /* + * Region operation has been scheduled, now resume commit processing + */ + private static void releaseOps() { + opsLatch.countDown(); + } + + /* + * Notify waiter it is time to verify region contents + */ + private static void releaseVerify() { + verifyLatch.countDown(); + } + + private InternalDistributedMember createCache(VM vm) { + return (InternalDistributedMember) vm.invoke(new SerializableCallable<Object>() { + public Object call() { + cache = getCache(new CacheFactory().set("conserve-sockets", "true")); + return getSystem().getDistributedMember(); + } + }); + } + + private static void createRegion(String rgnName) { + RegionFactory<Object, Object> rf = cache.createRegionFactory(RegionShortcut.REPLICATE); + rf.setConcurrencyChecksEnabled(true); + rf.setScope(Scope.DISTRIBUTED_ACK); + rf.create(rgnName); + } + + /* + * Get region contents from each member and verify they are consistent + */ + private void checkForConsistencyErrors(String rname) { + Map<Object, Object> r0Contents = + (Map<Object, Object>) vm0.invoke(() -> getRegionContents(rname)); + Map<Object, Object> r1Contents = + (Map<Object, Object>) vm1.invoke(() -> getRegionContents(rname)); + + for (int i = 0; i < NUMBER_OF_PUTS; i++) { + String theKey = rname + THE_KEY + i; + if (r0Contents.containsKey(theKey)) { + softly.assertThat(r1Contents.get(theKey)) + .as("region contents are not consistent for key %s", theKey) + .isEqualTo(r0Contents.get(theKey)); + } else { + softly.assertThat(r1Contents).as("expected containsKey for %s to return false", theKey) + .doesNotContainKey(theKey); + } + } + } + + @SuppressWarnings("rawtypes") + private static Map<Object, Object> getRegionContents(String rname) { + LocalRegion r = (LocalRegion) cache.getRegion(rname); + Map<Object, Object> result = new HashMap<>(); + for (Iterator i = r.entrySet().iterator(); i.hasNext();) { + Region.Entry e = (Region.Entry) i.next(); + result.put(e.getKey(), e.getValue()); + } + return result; + } + + /* + * Test callback called for each operation during commit processing. + * Half way through commit processing, release the region operation. + */ + static class CommitTestCallback implements Runnable { + VM whereRegionOperation; + static int callCount; + /* entered twice for each put lap since there are 2 regions */ + static int releasePoint = NUMBER_OF_PUTS; + + public CommitTestCallback(VM whereRegion) { + whereRegionOperation = whereRegion; + callCount = 0; + } + + public void run() { + callCount++; + if(callCount==releasePoint) { + releaseRegionOperation(whereRegionOperation); + try {opsLatch.await();} catch (InterruptedException e) {} + } + } + } + + /* + * The region operations attempt to acquire the write lock will hang while + * commit processing is occurring. Before this occurs, resume commit processing. + */ + public class ArmLockHook extends ARMLockTestHookAdapter { + int txCalls = 0; + int releasePoint = NUMBER_OF_PUTS / 2; + CountDownLatch putLatch = new CountDownLatch(1); + + @Override + public void beforeLock(LocalRegion owner, CacheEvent event) { + if(event!=null) { + if (event.getOperation().isClear() || event.getOperation().isRegionDestroy() || event.getOperation().isClose()) { + releaseOps(); + } + } + } + } + +}