GEODE-1740: Correct potential region inconsistencies with concurrent clear and 
transaction commit


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/7fa2c08c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/7fa2c08c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/7fa2c08c

Branch: refs/heads/develop
Commit: 7fa2c08cf403139afa7a60a81392ca13034e490b
Parents: f0cdb66
Author: Scott Jewell <sjew...@pivotal.io>
Authored: Wed Nov 2 15:59:35 2016 -0700
Committer: Kenneth Howe <kh...@apache.org>
Committed: Thu Dec 1 11:03:47 2016 -0800

----------------------------------------------------------------------
 .../geode/internal/cache/AbstractRegionMap.java |  43 +-
 .../apache/geode/internal/cache/RegionMap.java  |   4 +
 .../apache/geode/internal/cache/TXState.java    |  79 +++-
 .../internal/cache/ClearTXLockingDUnitTest.java | 431 +++++++++++++++++++
 4 files changed, 505 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7fa2c08c/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
index 96936eef..e3e87ea 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
@@ -1551,7 +1551,6 @@ public abstract class AbstractRegionMap implements 
RegionMap {
     final boolean isRegionReady = !inTokenMode;
     final boolean hasRemoteOrigin = !((TXId) 
txId).getMemberId().equals(owner.getMyId());
     boolean cbEventInPending = false;
-    lockForTXCacheModification(owner, versionTag);
     IndexManager oqlIndexManager = owner.getIndexManager();
     try {
       RegionEntry re = getEntry(key);
@@ -1818,8 +1817,6 @@ public abstract class AbstractRegionMap implements 
RegionMap {
     } catch (DiskAccessException dae) {
       owner.handleDiskAccessException(dae);
       throw dae;
-    } finally {
-      releaseTXCacheModificationLock(owner, versionTag);
     }
   }
 
@@ -2353,7 +2350,6 @@ public abstract class AbstractRegionMap implements 
RegionMap {
     if (oqlIndexManager != null) {
       oqlIndexManager.waitForIndexInit();
     }
-    lockForTXCacheModification(owner, versionTag);
     try {
       if (forceNewEntry) {
         boolean opCompleted = false;
@@ -2582,7 +2578,6 @@ public abstract class AbstractRegionMap implements 
RegionMap {
       owner.handleDiskAccessException(dae);
       throw dae;
     } finally {
-      releaseTXCacheModificationLock(owner, versionTag);
       if (oqlIndexManager != null) {
         oqlIndexManager.countDownIndexUpdaters();
       }
@@ -3115,7 +3110,6 @@ public abstract class AbstractRegionMap implements 
RegionMap {
       if (oqlIndexManager != null) {
         oqlIndexManager.waitForIndexInit();
       }
-      lockForTXCacheModification(owner, versionTag);
       try {
         if (hasRemoteOrigin && !isTXHost && !isClientTXOriginator) {
           // If we are not a mirror then only apply the update to existing
@@ -3384,7 +3378,6 @@ public abstract class AbstractRegionMap implements 
RegionMap {
         owner.handleDiskAccessException(dae);
         throw dae;
       } finally {
-        releaseTXCacheModificationLock(owner, versionTag);
         if (oqlIndexManager != null) {
           oqlIndexManager.countDownIndexUpdaters();
         }
@@ -3693,40 +3686,32 @@ public abstract class AbstractRegionMap implements 
RegionMap {
 
   }
 
-  /** get version-generation permission from the region's version vector */
-  private void lockForTXCacheModification(LocalRegion owner, VersionTag tag) {
-
+  @Override
+  public void lockRegionForAtomicTX(LocalRegion r) {
     if (armLockTestHook != null)
-      armLockTestHook.beforeLock(owner, null);
+      armLockTestHook.beforeLock(r, null);
 
-    if (!(tag != null && tag.isFromOtherMember())) {
-      RegionVersionVector vector = owner.getVersionVector();
-      if (vector != null && !owner.hasServerProxy()) {
-        vector.lockForCacheModification();
-      }
+    RegionVersionVector vector = r.getVersionVector();
+    if (vector != null) {
+      vector.lockForCacheModification();
     }
 
     if (armLockTestHook != null)
-      armLockTestHook.afterLock(owner, null);
-
+      armLockTestHook.afterLock(r, null);
   }
 
-  /** release version-generation permission from the region's version vector */
-  private void releaseTXCacheModificationLock(LocalRegion owner, VersionTag 
tag) {
-
+  @Override
+  public void unlockRegionForAtomicTX(LocalRegion r) {
     if (armLockTestHook != null)
-      armLockTestHook.beforeRelease(owner, null);
+      armLockTestHook.beforeRelease(r, null);
 
-    if (!(tag != null && tag.isFromOtherMember())) {
-      RegionVersionVector vector = owner.getVersionVector();
-      if (vector != null && !owner.hasServerProxy()) {
-        vector.releaseCacheModificationLock();
-      }
+    RegionVersionVector vector = r.getVersionVector();
+    if (vector != null) {
+      vector.releaseCacheModificationLock();
     }
 
     if (armLockTestHook != null)
-      armLockTestHook.afterRelease(owner, null);
-
+      armLockTestHook.afterRelease(r, null);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7fa2c08c/geode-core/src/main/java/org/apache/geode/internal/cache/RegionMap.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionMap.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionMap.java
index ee8a84e..7ecabd7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionMap.java
@@ -372,6 +372,10 @@ public interface RegionMap extends LRUMapCallbacks {
 
   public void close();
 
+  default void lockRegionForAtomicTX(LocalRegion r) {}
+
+  default void unlockRegionForAtomicTX(LocalRegion r) {}
+
   public ARMLockTestHook getARMLockTestHook();
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7fa2c08c/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index 99a3b83..d577f39 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -103,6 +103,7 @@ public class TXState implements TXStateInterface {
   // Internal testing hooks
   private Runnable internalAfterReservation;
   protected Runnable internalAfterConflictCheck;
+  protected Runnable internalDuringApplyChanges;
   protected Runnable internalAfterApplyChanges;
   protected Runnable internalAfterReleaseLocalLocks;
   Runnable internalDuringIndividualSend; // package scope allows 
TXCommitMessage use
@@ -460,34 +461,38 @@ public class TXState implements TXStateInterface {
 
         attachFilterProfileInformation(entries);
 
-        // apply changes to the cache
-        applyChanges(entries);
-        // For internal testing
-        if (this.internalAfterApplyChanges != null) {
-          this.internalAfterApplyChanges.run();
-        }
+        lockTXRegions(regions);
 
-        // build and send the message
-        msg = buildMessage();
-        this.commitMessage = msg;
-        if (this.internalBeforeSend != null) {
-          this.internalBeforeSend.run();
-        }
+        try {
+          // apply changes to the cache
+          applyChanges(entries);
+          // For internal testing
+          if (this.internalAfterApplyChanges != null) {
+            this.internalAfterApplyChanges.run();
+          }
 
+          // build and send the message
+          msg = buildMessage();
+          this.commitMessage = msg;
+          if (this.internalBeforeSend != null) {
+            this.internalBeforeSend.run();
+          }
 
+          msg.send(this.locks.getDistributedLockId());
+          // For internal testing
+          if (this.internalAfterSend != null) {
+            this.internalAfterSend.run();
+          }
 
-        msg.send(this.locks.getDistributedLockId());
-        // For internal testing
-        if (this.internalAfterSend != null) {
-          this.internalAfterSend.run();
+          firePendingCallbacks();
+          /*
+           * This is to prepare the commit message for the caller, make sure 
all events are in
+           * there.
+           */
+          this.commitMessage = buildCompleteMessage();
+        } finally {
+          unlockTXRegions(regions);
         }
-
-        firePendingCallbacks();
-        /*
-         * This is to prepare the commit message for the caller, make sure all 
events are in there.
-         */
-        this.commitMessage = buildCompleteMessage();
-
       } finally {
         if (msg != null) {
           msg.releaseViewVersions();
@@ -503,6 +508,24 @@ public class TXState implements TXStateInterface {
     }
   }
 
+  private void lockTXRegions(IdentityHashMap<LocalRegion, TXRegionState> 
regions) {
+    Iterator<Map.Entry<LocalRegion, TXRegionState>> it = 
regions.entrySet().iterator();
+    while (it.hasNext()) {
+      Map.Entry<LocalRegion, TXRegionState> me = it.next();
+      LocalRegion r = me.getKey();
+      r.getRegionMap().lockRegionForAtomicTX(r);
+    }
+  }
+
+  private void unlockTXRegions(IdentityHashMap<LocalRegion, TXRegionState> 
regions) {
+    Iterator<Map.Entry<LocalRegion, TXRegionState>> it = 
regions.entrySet().iterator();
+    while (it.hasNext()) {
+      Map.Entry<LocalRegion, TXRegionState> me = it.next();
+      LocalRegion r = me.getKey();
+      r.getRegionMap().unlockRegionForAtomicTX(r);
+    }
+  }
+
   protected void attachFilterProfileInformation(List entries) {
     {
       Iterator/* <TXEntryStateWithRegionAndKey> */ it = entries.iterator();
@@ -769,6 +792,9 @@ public class TXState implements TXStateInterface {
       Iterator/* <TXEntryStateWithRegionAndKey> */ it = entries.iterator();
       while (it.hasNext()) {
         TXEntryStateWithRegionAndKey o = (TXEntryStateWithRegionAndKey) 
it.next();
+        if (this.internalDuringApplyChanges != null) {
+          this.internalDuringApplyChanges.run();
+        }
         try {
           o.es.applyChanges(o.r, o.key, this);
         } catch (RegionDestroyedException ex) {
@@ -1073,6 +1099,13 @@ public class TXState implements TXStateInterface {
   }
 
   /**
+   * Add an internal callback which is run as each transaction change is 
applied.
+   */
+  public void setDuringApplyChanges(Runnable duringApplyChanges) {
+    this.internalDuringApplyChanges = duringApplyChanges;
+  }
+
+  /**
    * Add an internal callback which is run after the transaction changes have 
been applied to
    * committed state (locally) but before local locks are released (occurs for 
regions of Local and
    * Distributed No Ack scope).

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7fa2c08c/geode-core/src/test/java/org/apache/geode/internal/cache/ClearTXLockingDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/ClearTXLockingDUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/ClearTXLockingDUnitTest.java
new file mode 100644
index 0000000..b620383
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/ClearTXLockingDUnitTest.java
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+/*
+ * ClearTXLockingDUnitTest.java
+ *
+ * Created on September 6, 2005, 2:57 PM
+ */
+package org.apache.geode.internal.cache;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheEvent;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.CacheTransactionManager;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.distributed.DistributedMember;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.SerializableCallable;
+import org.apache.geode.test.dunit.SerializableRunnable;
+import org.apache.geode.test.dunit.SerializableRunnableIF;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.logging.log4j.Logger;
+import org.assertj.core.api.JUnitSoftAssertions;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test class to verify proper locking interaction between transactions and 
the CLEAR region
+ * operation.
+ * 
+ * GEODE-1740: It was observed that operations performed within a transaction 
were not holding
+ * region modification locks for the duration of commit processing. This lock 
is used to ensure region
+ * consistency during CLEAR processing.  By not holding the lock for the 
duration of commit processing,
+ * a window was opened that allowed region operations such as clear to occur 
in mid-commit.
+ * 
+ * The fix for GEODE-1740 was to acquire and hold read locks for any region 
involved in the commit.
+ * This forces CLEAR to wait until commit processing is complete.
+ */
+@SuppressWarnings("serial")
+@Category(DistributedTest.class)
+public class ClearTXLockingDUnitTest extends JUnit4CacheTestCase {
+
+  @Rule
+  public transient JUnitSoftAssertions softly = new JUnitSoftAssertions();
+  /*
+   * This test performs operations within a transaction and during commit 
processing
+   * schedules a clear to be performed on the relevant region. The scheduled 
clear should wait until
+   * commit processing is complete before clearing the region. Failure to do 
so, would result in
+   * region inconsistencies.
+   */
+  VM vm0, vm1, opsVM, regionVM;
+
+  static Cache cache;
+  
+  ArmLockHook theArmHook;
+
+  DistributedMember vm0ID, vm1ID;
+
+  static CacheTransactionManager txmgr;
+
+  static final String THE_KEY = "theKey";
+  static final String THE_VALUE = "theValue";
+  static final int NUMBER_OF_PUTS = 2;
+
+  static final String REGION_NAME1 = "testRegion1";
+  static final String REGION_NAME2 = "testRegion2";
+
+  static CountDownLatch opsLatch;
+  static CountDownLatch regionLatch;
+  static CountDownLatch verifyLatch;
+
+  private static final Logger logger = LogService.getLogger();
+
+  // test methods
+
+  @Test
+  public void testPutWithClearSameVM() throws InterruptedException {
+    getVMs();
+    setupRegions(vm0, vm0);
+    setClearHook(REGION_NAME1, opsVM, regionVM);
+    performTestAndCheckResults(putOperationsTest);
+  }
+
+  @Test
+  public void testPutWithClearDifferentVM() throws InterruptedException {
+    getVMs();
+    setupRegions(vm0, vm1);
+    setClearHook(REGION_NAME1, opsVM, regionVM);
+    performTestAndCheckResults(putOperationsTest);
+  }
+  
+  /*
+   * The CLOSE tests are ignored until the close operation has been
+   * updated to acquire a write lock during processing.
+   */
+  @Ignore
+  @Test
+  public void testPutWithCloseSameVM() throws InterruptedException {
+    getVMs();
+    setupRegions(vm0, vm0);
+    setCloseHook(REGION_NAME1, opsVM, regionVM);
+    performTestAndCheckResults(putOperationsTest);
+  }
+
+  @Ignore
+  @Test
+  public void testPutWithCloseDifferentVM() throws InterruptedException {
+    getVMs();
+    setupRegions(vm0, vm1);
+    setCloseHook(REGION_NAME1, opsVM, regionVM);
+    performTestAndCheckResults(putOperationsTest);
+  }
+      
+  /*
+   * The DESTROY_REGION tests are ignored until the destroy operation has been
+   * updated to acquire a write lock during processing.
+   */
+  @Ignore
+  @Test
+  public void testPutWithDestroyRegionSameVM() throws InterruptedException {
+    getVMs();
+    setupRegions(vm0, vm0);
+    setDestroyRegionHook(REGION_NAME1, opsVM, regionVM);
+    performTestAndCheckResults(putOperationsTest);
+  }
+
+  @Ignore
+  @Test
+  public void testPutWithDestroyRegionDifferentVM() throws 
InterruptedException {
+    getVMs();
+    setupRegions(vm0, vm1);
+    setDestroyRegionHook(REGION_NAME1, opsVM, regionVM);
+    performTestAndCheckResults(putOperationsTest);
+  }
+      
+  // Local methods
+
+  /*
+   * This method executes a runnable test and then checks for region 
consistency
+   */
+  private void performTestAndCheckResults(SerializableRunnable operationsTest) 
throws InterruptedException {
+    try {
+      runLockingTest(opsVM, operationsTest);
+      checkForConsistencyErrors(REGION_NAME1);
+      checkForConsistencyErrors(REGION_NAME2);
+    } finally {
+      opsVM.invoke(() -> resetArmHook(REGION_NAME1));
+    }
+  }
+
+  /*
+   * We will be using 2 vms.  One for the transaction and one for the clear
+   */
+  private void getVMs() {
+    Host host = Host.getHost(0);
+    vm0 = host.getVM(0);
+    vm1 = host.getVM(1);
+  }
+  
+  /*
+   * Set which vm will perform the transaction and which will perform the 
region operation
+   * and create the regions on the vms
+   */
+  private void setupRegions(VM opsTarget, VM regionTarget) {
+    opsVM = opsTarget;
+    regionVM = regionTarget;
+    vm0ID = createCache(vm0);
+    vm1ID = createCache(vm1);
+    vm0.invoke(() -> createRegion(REGION_NAME1));
+    vm0.invoke(() -> createRegion(REGION_NAME2));
+    vm1.invoke(() -> createRegion(REGION_NAME1));
+    vm1.invoke(() -> createRegion(REGION_NAME2));
+  }
+
+  /*
+   * Invoke a runnable on the operations vm
+   */
+  private void runLockingTest(VM vm, SerializableRunnableIF theTest) {
+    vm.invoke(theTest);
+  }
+
+  /*
+   * Runnable used to invoke the actual test 
+   */
+  SerializableRunnable putOperationsTest = new SerializableRunnable("perform 
PUT") {
+    @Override
+    public void run() {
+      opsVM.invoke(() -> doPuts(getCache(), regionVM));
+    }
+  };
+
+  /*
+   * Set arm hook to detect when region operation is attempting to acquire 
write lock
+   * and stage the clear that will be released half way through commit 
processing.
+   */
+  public void setClearHook(String rname, VM whereOps, VM whereClear) {
+    whereOps.invoke(() -> setArmHook(rname));
+    whereClear.invokeAsync(() -> stageClear(rname, whereOps));
+  }
+
+  // remote test methods
+
+  /*
+   * Wait to be notified and then execute the clear.
+   * Once the clear completes, notify waiter to perform region verification. 
+   */
+  private static void stageClear(String rname, VM whereOps) throws 
InterruptedException {
+    regionOperationWait();
+    LocalRegion r = (LocalRegion) cache.getRegion(rname);
+    r.clear();
+    whereOps.invoke(() -> releaseVerify());
+  }
+
+  /*
+   * The set and stage methods for close and destroy are the same as those for clear.
+   */
+  public void setCloseHook(String rname, VM whereOps, VM whereClear) {
+    whereOps.invoke(() -> setArmHook(rname));
+    whereClear.invokeAsync(() -> stageClose(rname, whereOps));
+  }
+
+  private static void stageClose(String rname, VM whereOps) throws 
InterruptedException {
+    regionOperationWait();
+    LocalRegion r = (LocalRegion) cache.getRegion(rname);
+    r.close();
+    whereOps.invoke(() -> releaseVerify());
+  }
+
+  public void setDestroyRegionHook(String rname, VM whereOps, VM whereClear) {
+    whereOps.invoke(() -> setArmHook(rname));
+    whereClear.invokeAsync(() -> stageDestroyRegion(rname, whereOps));
+  }
+
+  private static void stageDestroyRegion(String rname, VM whereOps) throws 
InterruptedException {
+    regionOperationWait();
+    LocalRegion r = (LocalRegion) cache.getRegion(rname);
+    r.destroyRegion();
+    whereOps.invoke(() -> releaseVerify());
+  }
+
+  /*
+   * Set the abstract region map lock hook to detect an
+   * attempt by a region operation to acquire the write lock.
+   */
+  public void setArmHook(String rname) {
+    LocalRegion r = (LocalRegion) cache.getRegion(rname);
+    theArmHook = new ArmLockHook();
+    ((AbstractRegionMap) r.entries).setARMLockTestHook(theArmHook);
+  }
+
+  /*
+   * Cleanup arm lock hook by setting it null
+   */
+  public void resetArmHook(String rname) {
+    LocalRegion r = (LocalRegion) cache.getRegion(rname);
+    ((AbstractRegionMap) r.entries).setARMLockTestHook(null);
+  }
+
+  /*
+   * Wait to be notified it is time to perform region operation (i.e. CLEAR)
+   */
+  private static void regionOperationWait() throws InterruptedException {
+    regionLatch = new CountDownLatch(1);
+    regionLatch.await();
+  }
+  
+  /*
+   * A simple transaction that will have a region operation execute during 
commit.
+   * opsLatch is used to wait until region operation has been scheduled during 
commit
+   * and verifyLatch is used to ensure commit and clear processing have both 
completed.
+   */
+  private static void doPuts(Cache cache, VM whereRegion) throws 
InterruptedException {
+    TXManagerImpl txManager = (TXManagerImpl) 
cache.getCacheTransactionManager();
+
+    opsLatch = new CountDownLatch(1);
+    verifyLatch = new CountDownLatch(1);
+
+    txManager.begin();
+    TXStateInterface txState = 
((TXStateProxyImpl)txManager.getTXState()).getRealDeal(null,null);
+    ((TXState)txState).setDuringApplyChanges(new 
CommitTestCallback(whereRegion));
+
+    Region region1 = cache.getRegion(REGION_NAME1);
+    Region region2 = cache.getRegion(REGION_NAME2);
+    for (int i = 0; i < NUMBER_OF_PUTS; i++) {
+      region1.put(REGION_NAME1 + THE_KEY + i, THE_VALUE + i);
+      region2.put(REGION_NAME2 + THE_KEY + i, THE_VALUE + i);
+    }
+
+    txManager.commit();
+    verifyLatch.await();
+  }
+
+  /*
+   * Release the region operation that has been previously staged
+   */
+  private static void releaseRegionOperation(VM whereRegion) {
+    whereRegion.invoke(() -> regionLatch.countDown());
+  }
+
+  /*
+   * Region operation has been scheduled, now resume commit processing
+   */
+  private static void releaseOps() {
+    opsLatch.countDown();
+  }
+
+  /*
+   * Notify waiter it is time to verify region contents
+   */
+  private static void releaseVerify() {
+    verifyLatch.countDown();
+  }
+
+  private InternalDistributedMember createCache(VM vm) {
+    return (InternalDistributedMember) vm.invoke(new 
SerializableCallable<Object>() {
+      public Object call() {
+        cache = getCache(new CacheFactory().set("conserve-sockets", "true"));
+        return getSystem().getDistributedMember();
+      }
+    });
+  }
+
+  private static void createRegion(String rgnName) {
+    RegionFactory<Object, Object> rf = 
cache.createRegionFactory(RegionShortcut.REPLICATE);
+    rf.setConcurrencyChecksEnabled(true);
+    rf.setScope(Scope.DISTRIBUTED_ACK);
+    rf.create(rgnName);
+  }
+
+  /*
+   * Get region contents from each member and verify they are consistent
+   */
+  private void checkForConsistencyErrors(String rname) {
+    Map<Object, Object> r0Contents =
+        (Map<Object, Object>) vm0.invoke(() -> getRegionContents(rname));
+    Map<Object, Object> r1Contents =
+        (Map<Object, Object>) vm1.invoke(() -> getRegionContents(rname));
+
+    for (int i = 0; i < NUMBER_OF_PUTS; i++) {
+      String theKey = rname + THE_KEY + i;
+      if (r0Contents.containsKey(theKey)) {
+        softly.assertThat(r1Contents.get(theKey))
+            .as("region contents are not consistent for key %s", theKey)
+            .isEqualTo(r0Contents.get(theKey));
+      } else {
+        softly.assertThat(r1Contents).as("expected containsKey for %s to 
return false", theKey)
+            .doesNotContainKey(theKey);
+      }
+    }
+  }
+
+  @SuppressWarnings("rawtypes")
+  private static Map<Object, Object> getRegionContents(String rname) {
+    LocalRegion r = (LocalRegion) cache.getRegion(rname);
+    Map<Object, Object> result = new HashMap<>();
+    for (Iterator i = r.entrySet().iterator(); i.hasNext();) {
+      Region.Entry e = (Region.Entry) i.next();
+      result.put(e.getKey(), e.getValue());
+    }
+    return result;
+  }
+  
+  /*
+   * Test callback called for each operation during commit processing.
+   * Half way through commit processing, release the region operation.
+   */
+  static class CommitTestCallback implements Runnable {
+    VM whereRegionOperation;
+    static int callCount;
+    /* run() is entered twice per put iteration (2 regions), so this is the halfway point */
+    static int releasePoint = NUMBER_OF_PUTS;
+    
+    public CommitTestCallback(VM whereRegion) {
+      whereRegionOperation = whereRegion;
+      callCount = 0;
+    }
+    
+    public void run() {
+      callCount++;
+      if(callCount==releasePoint) {
+        releaseRegionOperation(whereRegionOperation);
+        try {opsLatch.await();} catch (InterruptedException e) {}
+      }
+    }
+  }
+  
+  /*
+   * The region operation's attempt to acquire the write lock will hang while
+   * commit processing is occurring.  Before this occurs, resume commit 
processing.
+   */
+  public class ArmLockHook extends ARMLockTestHookAdapter {
+    int txCalls = 0;
+    int releasePoint = NUMBER_OF_PUTS / 2;
+    CountDownLatch putLatch = new CountDownLatch(1);
+
+    @Override
+    public void beforeLock(LocalRegion owner, CacheEvent event) {
+      if(event!=null) {
+        if (event.getOperation().isClear() || 
event.getOperation().isRegionDestroy() || event.getOperation().isClose()) {
+          releaseOps();
+        }
+      }
+    }
+  }
+
+}

Reply via email to