Repository: geode Updated Branches: refs/heads/feature/close_cache_aeq_enqueue 070494d37 -> 4c02fe35c
Add dunit test for cache close while enqueuing event in AEQ. Rethrow CacheClosedException encountered while enqueuing event. Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/f84159ab Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/f84159ab Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/f84159ab Branch: refs/heads/feature/close_cache_aeq_enqueue Commit: f84159ab3b7a59d423ff6afc0f456a98ab30c235 Parents: 070494d Author: Lynn Hughes-Godfrey <[email protected]> Authored: Wed May 24 13:43:25 2017 -0700 Committer: Lynn Hughes-Godfrey <[email protected]> Committed: Wed May 24 13:43:25 2017 -0700 ---------------------------------------------------------------------- .../cache/wan/AbstractGatewaySender.java | 1 + .../asyncqueue/AsyncEventListenerDUnitTest.java | 120 ++++++++++++------- 2 files changed, 79 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/f84159ab/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java index 7ed9b51..c38d547 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java @@ -973,6 +973,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi ev.enqueueEvent(operation, clonedEvent, substituteValue); } catch (CancelException e) { logger.debug("caught cancel exception", e); + throw e; } catch (RegionDestroyedException e) { logger.warn(LocalizedMessage.create( LocalizedStrings.GatewayImpl_0_AN_EXCEPTION_OCCURRED_WHILE_QUEUEING_1_TO_PERFORM_OPERATION_2_FOR_3, http://git-wip-us.apache.org/repos/asf/geode/blob/f84159ab/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java index ac89b48..74aa776 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java @@ -41,6 +41,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.apache.geode.cache.CacheFactory; +import org.apache.geode.cache.DataPolicy; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionDestroyedException; import org.apache.geode.cache.asyncqueue.AsyncEvent; @@ -49,11 +50,14 @@ import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory; import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl; import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl; import org.apache.geode.cache.partition.PartitionRegionHelper; +import org.apache.geode.cache.persistence.PartitionOfflineException; import org.apache.geode.cache.wan.GatewaySender.OrderPolicy; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.internal.AvailablePortHelper; +import org.apache.geode.internal.cache.ForceReattemptException; import org.apache.geode.internal.cache.wan.AsyncEventQueueTestBase; +import org.apache.geode.test.dunit.IgnoredException; import org.apache.geode.test.dunit.LogWriterUtils; import org.apache.geode.test.dunit.SerializableRunnableIF; import org.apache.geode.test.dunit.VM; @@ -1688,51 +1692,43 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { vm1.invoke(createCacheRunnable(lnPort)); vm2.invoke(createCacheRunnable(lnPort)); + vm3.invoke(createCacheRunnable(lnPort)); final DistributedMember member1 = vm1.invoke(() -> cache.getDistributedSystem().getDistributedMember()); - vm1.invoke(() -> { - cache.createAsyncEventQueueFactory().addGatewayEventFilter(new GatewayEventFilter() { - @Override - public boolean beforeEnqueue(final GatewayQueueEvent event) { -// if (event.getOperation().isDestroy()) { - if (event.getOperation().isRemoveAll()) { - new Thread(() -> cache.close()).start(); - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - //ignore - } - throw new CacheClosedException(); - } - return true; - }; - - @Override - public boolean beforeTransmit(final GatewayQueueEvent event) { - return false; - } - - @Override - public void afterAcknowledgement(final GatewayQueueEvent event) { - - } - - @Override - public void close() { - - } - }).create("ln", new MyAsyncEventListener()); - }); + vm1.invoke(() -> addAEQWithCacheCloseFilter()); + vm2.invoke(() -> addAEQWithCacheCloseFilter()); - vm1.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( - getTestMethodName() + "_PR", "ln", isOffHeap())); + //vm1.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( + //getTestMethodName() + "_PR", "ln", isOffHeap())); + + vm1.invoke(() -> { + AttributesFactory fact = new AttributesFactory(); + PartitionAttributesFactory pfact = new PartitionAttributesFactory(); + pfact.setTotalNumBuckets(16); + fact.setPartitionAttributes(pfact.create()); + fact.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); + fact.setOffHeap(isOffHeap()); + Region r = cache.createRegionFactory(fact.create()).addAsyncEventQueueId("ln") + .create(getTestMethodName() + "_PR"); + }); vm2.invoke(() -> { AttributesFactory fact = new AttributesFactory(); PartitionAttributesFactory pfact = new PartitionAttributesFactory(); pfact.setTotalNumBuckets(16); + fact.setPartitionAttributes(pfact.create()); + fact.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); + fact.setOffHeap(isOffHeap()); + Region r = cache.createRegionFactory(fact.create()).addAsyncEventQueueId("ln") + .create(getTestMethodName() + "_PR"); + }); + vm3.invoke(() -> { + AttributesFactory fact = new AttributesFactory(); + + PartitionAttributesFactory pfact = new PartitionAttributesFactory(); + pfact.setTotalNumBuckets(16); pfact.setLocalMaxMemory(0); fact.setPartitionAttributes(pfact.create()); fact.setOffHeap(isOffHeap()); @@ -1741,20 +1737,27 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { }); - vm1.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln")); -// vm2.invoke(() -> AsyncEventQueueTestBase.doPuts(getTestMethodName() + "_PR", 3)); - vm2.invoke(() -> { + vm3.invoke(() -> { Region r = cache.getRegion(Region.SEPARATOR + getTestMethodName() + "_PR"); r.put(1, 1); r.put(2, 2); - r.removeAll(Collections.singleton(1)); - r.remove(1); + //This will trigger the gateway event filter to close the cache + try { + r.removeAll(Collections.singleton(1)); + fail("Should have received a partition offline exception"); + } catch(PartitionOfflineException expected) { + + } }); vm1.invoke(() -> Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> cache.isClosed())); -// vm1.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln")); } +private void addAEQWithCacheCloseFilter() { + cache.createAsyncEventQueueFactory().addGatewayEventFilter(new CloseCacheGatewayFilter()).setPersistent(true).setParallel(true) + .create("ln", new MyAsyncEventListener()); +} + private static Set<Object> getKeysSeen(VM vm, String asyncEventQueueId) { return vm.invoke(() -> { final BucketMovingAsyncEventListener listener = @@ -1771,7 +1774,40 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { }); } - private static class BucketMovingAsyncEventListener implements AsyncEventListener { + private final class CloseCacheGatewayFilter implements GatewayEventFilter { + @Override + public boolean beforeEnqueue(final GatewayQueueEvent event) { + // if (event.getOperation().isDestroy()) { + if (event.getOperation().isRemoveAll()) { + System.out.println("cacheCloseFilter: isRemoveAll = true"); + new Thread(() -> cache.close()).start(); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // ignore + } + throw new CacheClosedException(); + } + return true; + } + + @Override + public boolean beforeTransmit(final GatewayQueueEvent event) { + return false; + } + + @Override + public void afterAcknowledgement(final GatewayQueueEvent event) { + + } + + @Override + public void close() { + + } +} + +private static class BucketMovingAsyncEventListener implements AsyncEventListener { private final DistributedMember destination; private boolean moved; private Set<Object> keysSeen = new HashSet<Object>();
