Repository: geode Updated Branches: refs/heads/develop c1ab3ffec -> 0fe0a1061
GEODE-2993: Rethrow CacheClosedException from AbstractGatewaySender.distribute() - rethrow CacheClosedException - Add test for cache close while enqueuing event in AEQ. - Add cleanup of disk dirs created by test. Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/0fe0a106 Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/0fe0a106 Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/0fe0a106 Branch: refs/heads/develop Commit: 0fe0a1061065f07d4b734d7055f56ad1635f1a2a Parents: c1ab3ff Author: Lynn Hughes-Godfrey <lhughesgodf...@pivotal.io> Authored: Thu May 25 15:31:16 2017 -0700 Committer: Lynn Hughes-Godfrey <lhughesgodf...@pivotal.io> Committed: Thu May 25 15:31:16 2017 -0700 ---------------------------------------------------------------------- .../cache/wan/AbstractGatewaySender.java | 1 + .../cache/wan/AsyncEventQueueTestBase.java | 2 + .../asyncqueue/AsyncEventListenerDUnitTest.java | 102 +++++++++++++++++++ 3 files changed, 105 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/0fe0a106/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java index 7ed9b51..c38d547 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java @@ -973,6 +973,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi ev.enqueueEvent(operation, clonedEvent, substituteValue); } catch (CancelException e) { logger.debug("caught cancel exception", e); + throw e; } catch (RegionDestroyedException e) { logger.warn(LocalizedMessage.create( LocalizedStrings.GatewayImpl_0_AN_EXCEPTION_OCCURRED_WHILE_QUEUEING_1_TO_PERFORM_OPERATION_2_FOR_3, http://git-wip-us.apache.org/repos/asf/geode/blob/0fe0a106/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java index 6fe7ee9..dc7a218 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java @@ -98,6 +98,7 @@ import org.apache.geode.test.dunit.LogWriterUtils; import org.apache.geode.test.dunit.VM; import org.apache.geode.test.dunit.Wait; import org.apache.geode.test.dunit.WaitCriterion; +import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase; import org.apache.geode.test.junit.categories.DistributedTest; @@ -1555,6 +1556,7 @@ public class AsyncEventQueueTestBase extends JUnit4DistributedTestCase { public static void cleanupVM() throws IOException { closeCache(); + JUnit4CacheTestCase.cleanDiskDirs(); } public static void closeCache() throws IOException { http://git-wip-us.apache.org/repos/asf/geode/blob/0fe0a106/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java index 3dd0550..795af36 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java @@ -19,6 +19,7 @@ import static org.junit.Assert.*; import static org.mockito.Matchers.any; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -29,11 +30,18 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.LongStream; +import org.apache.geode.cache.AttributesFactory; +import org.apache.geode.cache.CacheClosedException; +import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.wan.GatewayEventFilter; +import org.apache.geode.cache.wan.GatewayQueueEvent; +import org.apache.geode.internal.cache.wan.MyAsyncEventListener; import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; import org.apache.geode.cache.CacheFactory; +import org.apache.geode.cache.DataPolicy; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionDestroyedException; import org.apache.geode.cache.asyncqueue.AsyncEvent; @@ -42,11 +50,14 @@ import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory; import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl; import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl; import org.apache.geode.cache.partition.PartitionRegionHelper; +import org.apache.geode.cache.persistence.PartitionOfflineException; import org.apache.geode.cache.wan.GatewaySender.OrderPolicy; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.internal.AvailablePortHelper; +import org.apache.geode.internal.cache.ForceReattemptException; import org.apache.geode.internal.cache.wan.AsyncEventQueueTestBase; +import org.apache.geode.test.dunit.IgnoredException; import org.apache.geode.test.dunit.LogWriterUtils; import org.apache.geode.test.dunit.SerializableRunnableIF; import org.apache.geode.test.dunit.VM; @@ -1674,6 +1685,66 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { Awaitility.waitAtMost(10000, TimeUnit.MILLISECONDS).until(() -> getBucketMoved(vm2, "ln")); } + @Test + public void testCacheClosedBeforeAEQWrite() { + Integer lnPort = + (Integer) vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId(1)); + + vm1.invoke(createCacheRunnable(lnPort)); + vm2.invoke(createCacheRunnable(lnPort)); + vm3.invoke(createCacheRunnable(lnPort)); + final DistributedMember member1 = + vm1.invoke(() -> cache.getDistributedSystem().getDistributedMember()); + + vm1.invoke(() -> addAEQWithCacheCloseFilter()); + vm2.invoke(() -> addAEQWithCacheCloseFilter()); + + vm1.invoke(() -> createPersistentPartitionRegion()); + vm2.invoke(() -> createPersistentPartitionRegion()); + vm3.invoke(() -> { + AttributesFactory fact = new AttributesFactory(); + + PartitionAttributesFactory pfact = new PartitionAttributesFactory(); + pfact.setTotalNumBuckets(16); + pfact.setLocalMaxMemory(0); + fact.setPartitionAttributes(pfact.create()); + fact.setOffHeap(isOffHeap()); + Region r = cache.createRegionFactory(fact.create()).addAsyncEventQueueId("ln") + .create(getTestMethodName() + "_PR"); + + }); + + vm3.invoke(() -> { + Region r = cache.getRegion(Region.SEPARATOR + getTestMethodName() + "_PR"); + r.put(1, 1); + r.put(2, 2); + // This will trigger the gateway event filter to close the cache + try { + r.removeAll(Collections.singleton(1)); + fail("Should have received a partition offline exception"); + } catch (PartitionOfflineException expected) { + + } + }); + } + + private void createPersistentPartitionRegion() { + AttributesFactory fact = new AttributesFactory(); + + PartitionAttributesFactory pfact = new PartitionAttributesFactory(); + pfact.setTotalNumBuckets(16); + fact.setPartitionAttributes(pfact.create()); + fact.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); + fact.setOffHeap(isOffHeap()); + Region r = cache.createRegionFactory(fact.create()).addAsyncEventQueueId("ln") + .create(getTestMethodName() + "_PR"); + } + + private void addAEQWithCacheCloseFilter() { + cache.createAsyncEventQueueFactory().addGatewayEventFilter(new CloseCacheGatewayFilter()) + .setPersistent(true).setParallel(true).create("ln", new MyAsyncEventListener()); + } + private static Set<Object> getKeysSeen(VM vm, String asyncEventQueueId) { return vm.invoke(() -> { final BucketMovingAsyncEventListener listener = @@ -1690,6 +1761,37 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { }); } + private final class CloseCacheGatewayFilter implements GatewayEventFilter { + @Override + public boolean beforeEnqueue(final GatewayQueueEvent event) { + if (event.getOperation().isRemoveAll()) { + new Thread(() -> cache.close()).start(); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // ignore + } + throw new CacheClosedException(); + } + return true; + } + + @Override + public boolean beforeTransmit(final GatewayQueueEvent event) { + return false; + } + + @Override + public void afterAcknowledgement(final GatewayQueueEvent event) { + + } + + @Override + public void close() { + + } + } + private static class BucketMovingAsyncEventListener implements AsyncEventListener { private final DistributedMember destination; private boolean moved;