This is an automated email from the ASF dual-hosted git repository.

heybales pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 89ee39e  GEODE-5797: Move assertion to async invoke (#2635)
89ee39e is described below

commit 89ee39e9ef760bf2817c9b47a11460ef21bd95e0
Author: Helena Bales <[email protected]>
AuthorDate: Mon Oct 29 11:43:58 2018 -0700

    GEODE-5797: Move assertion to async invoke (#2635)
    
    GEODE-5797: throw CacheClosed in ColocationHelper
    
    The ColocationHelper runs in the background and wrote a suspect string
    to the logs during this test when the cache was closed at the wrong
    time, causing the test to fail.
    
    The ColocationHelper would check a static variable to get colocated
    regions, and if the cache was closed at the wrong point in this process,
    the static region would have already been reset by closing the cache,
    and the ColocationHelper would write a suspect string.
    
    To fix this test, and the behavior of the ColocationHelper, we first
    check if the cache is in the process of being cancelled. If it is, a
    CacheClosedException is thrown instead of an IllegalStateException.
    CacheClosedExceptions are ignored.
    
    * add unit test for changes to ColocationHelper. Tests that a
    CacheClosedException is thrown from getColocatedRegion when the cache is
    closed.
    * remove wait for regions to be ready as it was never the correct fix
    for this bug.
    * create the senders before the regions, since that is the correct order
    
    Signed-off-by: Dan Smith <[email protected]>
---
 .../geode/internal/cache/ColocationHelper.java     |  2 +
 .../geode/internal/cache/ColocationHelperTest.java | 23 +++++++
 .../ConcurrentParallelGatewaySenderDUnitTest.java  | 74 ++++++++++++++--------
 3 files changed, 74 insertions(+), 25 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/ColocationHelper.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/ColocationHelper.java
index 7001478..a9865f7 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/ColocationHelper.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/ColocationHelper.java
@@ -77,6 +77,7 @@ public class ColocationHelper {
     PartitionRegionConfig prConf =
         (PartitionRegionConfig) prRoot.get(getRegionIdentifier(colocatedWith));
     if (prConf == null) {
+      
partitionedRegion.getCache().getCancelCriterion().checkCancelInProgress(null);
       throw new IllegalStateException(
           String.format(
               "Region specified in 'colocated-with' (%s) for region %s does 
not exist. It should be created before setting 'colocated-with' attribute for 
this region.",
@@ -89,6 +90,7 @@ public class ColocationHelper {
       if (colocatedPR != null) {
         colocatedPR.waitOnBucketMetadataInitialization();
       } else {
+        
partitionedRegion.getCache().getCancelCriterion().checkCancelInProgress(null);
         throw new IllegalStateException(
             String.format(
                 "Region specified in 'colocated-with' (%s) for region %s does 
not exist. It should be created before setting 'colocated-with' attribute for 
this region.",
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/ColocationHelperTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/ColocationHelperTest.java
index fc7e7bb..ed2351c 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/ColocationHelperTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/ColocationHelperTest.java
@@ -15,8 +15,11 @@
 
 package org.apache.geode.internal.cache;
 
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -29,6 +32,8 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.cache.PartitionAttributes;
 import org.apache.geode.cache.Region;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
@@ -126,4 +131,22 @@ public class ColocationHelperTest {
     }
     assertTrue(caughtIllegalStateException);
   }
+
+  @Test
+  public void 
testGetColocatedRegionThrowsCacheClosedExceptionWhenCacheIsClosed() {
+    when(pr.getCache()).thenReturn(cache);
+    DistributedRegion prRoot = mock(DistributedRegion.class);
+    when(cache.getRegion(PartitionedRegionHelper.PR_ROOT_REGION_NAME, true))
+        .thenReturn(prRoot);
+    when(pr.getPartitionAttributes()).thenReturn(pa);
+    when(pa.getColocatedWith()).thenReturn("region2");
+    PartitionRegionConfig partitionRegionConfig = 
mock(PartitionRegionConfig.class);
+    when(prRoot.get(any())).thenReturn(partitionRegionConfig);
+    CancelCriterion cancelCriterion = mock(CancelCriterion.class);
+    when(cache.getCancelCriterion()).thenReturn(cancelCriterion);
+    
doThrow(CacheClosedException.class).when(cancelCriterion).checkCancelInProgress(any());
+
+    assertThatThrownBy(() -> ColocationHelper.getColocatedRegion(pr))
+        .isInstanceOf(CacheClosedException.class);
+  }
 }
diff --git 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java
 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java
index 433d6e6..13ec152 100644
--- 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java
+++ 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java
@@ -558,32 +558,48 @@ public class ConcurrentParallelGatewaySenderDUnitTest 
extends WANTestBase {
   }
 
 
+  /**
+   * Data is sent to all members containing the Partitioned Region when 
senders are killed.
+   *
+   * Setup:
+   * vm0 - locator with DSID=1 - on lnPort
+   * vm1 - remote locator with DSID=2 - on nyPort - connected to lnPort
+   * vm2-3 - server with cache connected to nyPort - has partitionedRegion w/o 
sender - has receiver
+   * vm4-7 - server with cache connected to lnPort - has partitionedRegion w/ 
ln sender - has sender
+   *
+   * Test:
+   * 1. Put 5000 entries to VM7 (async)
+   * 2. Assert that VM2 received at least 10 entries from VM7 through the 
sender (sync)
+   * 3. Kill sender on VM4 (async)
+   * 4. Get number of entries from VM2 (sync)
+   * 5. Put 10000 entries to VM6, with the first 5000 entries having the same 
keys as in the first
+   * round of puts (async)
+   * 6. Assert that VM2 has twenty more entries than it did before puts on VM6 
were started (sync)
+   * 7. Kill sender on VM5 (async)
+   * 8. Wait for previously started async invocations to complete
+   * 9. Assert that VM2, VM3, VM6, and VM7 all have 10000 entries
+   * 10. Assert that the sender queues on VM6 and VM7 are empty
+   *
+   **/
   @Test
   public void testPartitionedParallelPropagationHA() throws Exception {
     IgnoredException.addIgnoredException(SocketException.class.getName()); // 
for Connection reset
-    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
-    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
+    String regionName = getTestMethodName() + "_PR";
 
     createCacheInVMs(nyPort, vm2, vm3);
 
-    vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + 
"_PR", null, 1, 100,
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(regionName, null, 1, 
100,
         isOffHeap()));
-    vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + 
"_PR", null, 1, 100,
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(regionName, null, 1, 
100,
         isOffHeap()));
 
     createReceiverInVMs(vm2, vm3);
 
     createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
-    vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + 
"_PR", "ln", 2, 100,
-        isOffHeap()));
-    vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + 
"_PR", "ln", 2, 100,
-        isOffHeap()));
-    vm6.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + 
"_PR", "ln", 2, 100,
-        isOffHeap()));
-    vm7.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + 
"_PR", "ln", 2, 100,
-        isOffHeap()));
-
     vm4.invoke(() -> WANTestBase.createConcurrentSender("ln", 2, true, 100, 
10, false, false, null,
         true, 6, OrderPolicy.KEY));
     vm5.invoke(() -> WANTestBase.createConcurrentSender("ln", 2, true, 100, 
10, false, false, null,
@@ -593,9 +609,18 @@ public class ConcurrentParallelGatewaySenderDUnitTest 
extends WANTestBase {
     vm7.invoke(() -> WANTestBase.createConcurrentSender("ln", 2, true, 100, 
10, false, false, null,
         true, 6, OrderPolicy.KEY));
 
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(regionName, "ln", 2, 
100,
+        isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(regionName, "ln", 2, 
100,
+        isOffHeap()));
+    vm6.invoke(() -> WANTestBase.createPartitionedRegion(regionName, "ln", 2, 
100,
+        isOffHeap()));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegion(regionName, "ln", 2, 
100,
+        isOffHeap()));
+
     startSenderInVMs("ln", vm4, vm5, vm6, vm7);
 
-    AsyncInvocation inv1 =
+    AsyncInvocation putsToVM7 =
         vm7.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 
5000));
 
     vm2.invoke(() -> await()
@@ -603,24 +628,23 @@ public class ConcurrentParallelGatewaySenderDUnitTest 
extends WANTestBase {
             "Failure in waiting for at least 10 events to be received by the 
receiver", true,
             (getRegionSize(getTestMethodName() + "_PR") > 10))));
 
-    AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender());
+    AsyncInvocation killsSenderFromVM4 = vm4.invokeAsync(() -> 
WANTestBase.killSender());
 
     int prevRegionSize = vm2.invoke(() -> 
WANTestBase.getRegionSize(getTestMethodName() + "_PR"));
 
-    AsyncInvocation inv3 =
+    AsyncInvocation putsToVM6 =
         vm6.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 
10000));
 
-    vm2.invoke(() -> await()
-        .untilAsserted(() -> assertEquals(
-            "Failure in waiting for additional 20 events to be received by the 
receiver ", true,
-            getRegionSize(getTestMethodName() + "_PR") > 20 + 
prevRegionSize)));
+    vm2.invoke(() -> await().untilAsserted(() -> assertEquals(
+        "Failure in waiting for additional 20 events to be received by the 
receiver ", true,
+        getRegionSize(getTestMethodName() + "_PR") > 20 + prevRegionSize)));
 
-    AsyncInvocation inv4 = vm5.invokeAsync(() -> WANTestBase.killSender());
+    AsyncInvocation killsSenderFromVM5 = vm5.invokeAsync(() -> 
WANTestBase.killSender());
 
-    inv1.join();
-    inv2.join();
-    inv3.join();
-    inv4.join();
+    putsToVM7.get();
+    killsSenderFromVM4.get();
+    putsToVM6.get();
+    killsSenderFromVM5.get();
 
     vm6.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + 
"_PR", 10000));
     vm7.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + 
"_PR", 10000));

Reply via email to