This is an automated email from the ASF dual-hosted git repository.
heybales pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 89ee39e GEODE-5797: Move assertion to async invoke (#2635)
89ee39e is described below
commit 89ee39e9ef760bf2817c9b47a11460ef21bd95e0
Author: Helena Bales <[email protected]>
AuthorDate: Mon Oct 29 11:43:58 2018 -0700
GEODE-5797: Move assertion to async invoke (#2635)
GEODE-5797: throw CacheClosed in ColocationHelper
The ColocationHelper runs in the background. During this test it wrote a
suspect string to the logs when the cache was closed at the wrong time,
causing the test to fail.
The ColocationHelper checks a static variable to get colocated regions.
If the cache was closed at the wrong point in this process, the static
region had already been reset by closing the cache, and the
ColocationHelper wrote a suspect string.
To fix this test, and the behavior of the ColocationHelper, we now check
whether the cache is in the process of being cancelled before throwing
the IllegalStateException. If it is, a CacheClosedException is thrown
instead of the IllegalStateException. CacheClosedExceptions are ignored.
* add unit test for changes to ColocationHelper. Tests that a
CacheClosedException is thrown from getColocatedRegion when the cache is
closed.
* remove wait for regions to be ready as it was never the correct fix
for this bug.
* create the senders before the regions, since that is the correct order
Signed-off-by: Dan Smith <[email protected]>
---
.../geode/internal/cache/ColocationHelper.java | 2 +
.../geode/internal/cache/ColocationHelperTest.java | 23 +++++++
.../ConcurrentParallelGatewaySenderDUnitTest.java | 74 ++++++++++++++--------
3 files changed, 74 insertions(+), 25 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/ColocationHelper.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/ColocationHelper.java
index 7001478..a9865f7 100755
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/ColocationHelper.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/ColocationHelper.java
@@ -77,6 +77,7 @@ public class ColocationHelper {
PartitionRegionConfig prConf =
(PartitionRegionConfig) prRoot.get(getRegionIdentifier(colocatedWith));
if (prConf == null) {
+
partitionedRegion.getCache().getCancelCriterion().checkCancelInProgress(null);
throw new IllegalStateException(
String.format(
"Region specified in 'colocated-with' (%s) for region %s does
not exist. It should be created before setting 'colocated-with' attribute for
this region.",
@@ -89,6 +90,7 @@ public class ColocationHelper {
if (colocatedPR != null) {
colocatedPR.waitOnBucketMetadataInitialization();
} else {
+
partitionedRegion.getCache().getCancelCriterion().checkCancelInProgress(null);
throw new IllegalStateException(
String.format(
"Region specified in 'colocated-with' (%s) for region %s does
not exist. It should be created before setting 'colocated-with' attribute for
this region.",
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/ColocationHelperTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/ColocationHelperTest.java
index fc7e7bb..ed2351c 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/ColocationHelperTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/ColocationHelperTest.java
@@ -15,8 +15,11 @@
package org.apache.geode.internal.cache;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -29,6 +32,8 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.PartitionAttributes;
import org.apache.geode.cache.Region;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
@@ -126,4 +131,22 @@ public class ColocationHelperTest {
}
assertTrue(caughtIllegalStateException);
}
+
+ @Test
+ public void
testGetColocatedRegionThrowsCacheClosedExceptionWhenCacheIsClosed() {
+ when(pr.getCache()).thenReturn(cache);
+ DistributedRegion prRoot = mock(DistributedRegion.class);
+ when(cache.getRegion(PartitionedRegionHelper.PR_ROOT_REGION_NAME, true))
+ .thenReturn(prRoot);
+ when(pr.getPartitionAttributes()).thenReturn(pa);
+ when(pa.getColocatedWith()).thenReturn("region2");
+ PartitionRegionConfig partitionRegionConfig =
mock(PartitionRegionConfig.class);
+ when(prRoot.get(any())).thenReturn(partitionRegionConfig);
+ CancelCriterion cancelCriterion = mock(CancelCriterion.class);
+ when(cache.getCancelCriterion()).thenReturn(cancelCriterion);
+
doThrow(CacheClosedException.class).when(cancelCriterion).checkCancelInProgress(any());
+
+ assertThatThrownBy(() -> ColocationHelper.getColocatedRegion(pr))
+ .isInstanceOf(CacheClosedException.class);
+ }
}
diff --git
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java
index 433d6e6..13ec152 100644
---
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java
+++
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java
@@ -558,32 +558,48 @@ public class ConcurrentParallelGatewaySenderDUnitTest
extends WANTestBase {
}
+ /**
+ * Data is sent to all members containing the Partitioned Region when
senders are killed.
+ *
+ * Setup:
+ * vm0 - locator with DSID=1 - on lnPort
+ * vm1 - remote locator with DSID=2 - on nyPort - connected to lnPort
+ * vm2-3 - server with cache connected to nyPort - has partitionedRegion w/o
sender - has receiver
+ * vm4-7 - server with cache connected to lnPort - has partitionedRegion w/
ln sender - has sender
+ *
+ * Test:
+ * 1. Put 5000 entries to VM7 (async)
+ * 2. Assert that VM2 received at least 10 entries from VM7 through the
sender (sync)
+ * 3. Kill sender on VM4 (async)
+ * 4. Get number of entries from VM2 (sync)
+ * 5. Put 10000 entries to VM6, with the first 5000 entries having the same
keys as in the first
+ * round of puts (async)
+ * 6. Assert that VM2 has twenty more entries than it did before puts on VM6
were started (sync)
+ * 7. Kill sender on VM5 (async)
+ * 8. Wait for previously started async invocations to complete
+ * 9. Assert that VM2, VM3, VM6, and VM7 all have 10000 entries
+ * 10. Assert that the sender queues on VM6 and VM7 are empty
+ *
+ **/
@Test
public void testPartitionedParallelPropagationHA() throws Exception {
IgnoredException.addIgnoredException(SocketException.class.getName()); //
for Connection reset
- Integer lnPort = (Integer) vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
- Integer nyPort = (Integer) vm1.invoke(() ->
WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+ Integer lnPort = vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,
lnPort));
+ String regionName = getTestMethodName() + "_PR";
createCacheInVMs(nyPort, vm2, vm3);
- vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() +
"_PR", null, 1, 100,
+ vm2.invoke(() -> WANTestBase.createPartitionedRegion(regionName, null, 1,
100,
isOffHeap()));
- vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() +
"_PR", null, 1, 100,
+ vm3.invoke(() -> WANTestBase.createPartitionedRegion(regionName, null, 1,
100,
isOffHeap()));
createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
- vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() +
"_PR", "ln", 2, 100,
- isOffHeap()));
- vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() +
"_PR", "ln", 2, 100,
- isOffHeap()));
- vm6.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() +
"_PR", "ln", 2, 100,
- isOffHeap()));
- vm7.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() +
"_PR", "ln", 2, 100,
- isOffHeap()));
-
vm4.invoke(() -> WANTestBase.createConcurrentSender("ln", 2, true, 100,
10, false, false, null,
true, 6, OrderPolicy.KEY));
vm5.invoke(() -> WANTestBase.createConcurrentSender("ln", 2, true, 100,
10, false, false, null,
@@ -593,9 +609,18 @@ public class ConcurrentParallelGatewaySenderDUnitTest
extends WANTestBase {
vm7.invoke(() -> WANTestBase.createConcurrentSender("ln", 2, true, 100,
10, false, false, null,
true, 6, OrderPolicy.KEY));
+ vm4.invoke(() -> WANTestBase.createPartitionedRegion(regionName, "ln", 2,
100,
+ isOffHeap()));
+ vm5.invoke(() -> WANTestBase.createPartitionedRegion(regionName, "ln", 2,
100,
+ isOffHeap()));
+ vm6.invoke(() -> WANTestBase.createPartitionedRegion(regionName, "ln", 2,
100,
+ isOffHeap()));
+ vm7.invoke(() -> WANTestBase.createPartitionedRegion(regionName, "ln", 2,
100,
+ isOffHeap()));
+
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
- AsyncInvocation inv1 =
+ AsyncInvocation putsToVM7 =
vm7.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR",
5000));
vm2.invoke(() -> await()
@@ -603,24 +628,23 @@ public class ConcurrentParallelGatewaySenderDUnitTest
extends WANTestBase {
"Failure in waiting for at least 10 events to be received by the
receiver", true,
(getRegionSize(getTestMethodName() + "_PR") > 10))));
- AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender());
+ AsyncInvocation killsSenderFromVM4 = vm4.invokeAsync(() ->
WANTestBase.killSender());
int prevRegionSize = vm2.invoke(() ->
WANTestBase.getRegionSize(getTestMethodName() + "_PR"));
- AsyncInvocation inv3 =
+ AsyncInvocation putsToVM6 =
vm6.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR",
10000));
- vm2.invoke(() -> await()
- .untilAsserted(() -> assertEquals(
- "Failure in waiting for additional 20 events to be received by the
receiver ", true,
- getRegionSize(getTestMethodName() + "_PR") > 20 +
prevRegionSize)));
+ vm2.invoke(() -> await().untilAsserted(() -> assertEquals(
+ "Failure in waiting for additional 20 events to be received by the
receiver ", true,
+ getRegionSize(getTestMethodName() + "_PR") > 20 + prevRegionSize)));
- AsyncInvocation inv4 = vm5.invokeAsync(() -> WANTestBase.killSender());
+ AsyncInvocation killsSenderFromVM5 = vm5.invokeAsync(() ->
WANTestBase.killSender());
- inv1.join();
- inv2.join();
- inv3.join();
- inv4.join();
+ putsToVM7.get();
+ killsSenderFromVM4.get();
+ putsToVM6.get();
+ killsSenderFromVM5.get();
vm6.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() +
"_PR", 10000));
vm7.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() +
"_PR", 10000));