Repository: geode
Updated Branches:
  refs/heads/develop c1ab3ffec -> 0fe0a1061


GEODE-2993: Rethrow CacheClosedException from AbstractGatewaySender.distribute()

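- Rethrow the CancelException (e.g. CacheClosedException) caught while
  enqueuing an event, after logging it at debug level.
- Add a test that closes the cache while an event is being enqueued in an
  async event queue (AEQ).
- Clean up the disk directories created by the test.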


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/0fe0a106
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/0fe0a106
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/0fe0a106

Branch: refs/heads/develop
Commit: 0fe0a1061065f07d4b734d7055f56ad1635f1a2a
Parents: c1ab3ff
Author: Lynn Hughes-Godfrey <lhughesgodf...@pivotal.io>
Authored: Thu May 25 15:31:16 2017 -0700
Committer: Lynn Hughes-Godfrey <lhughesgodf...@pivotal.io>
Committed: Thu May 25 15:31:16 2017 -0700

----------------------------------------------------------------------
 .../cache/wan/AbstractGatewaySender.java        |   1 +
 .../cache/wan/AsyncEventQueueTestBase.java      |   2 +
 .../asyncqueue/AsyncEventListenerDUnitTest.java | 102 +++++++++++++++++++
 3 files changed, 105 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/0fe0a106/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index 7ed9b51..c38d547 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -973,6 +973,7 @@ public abstract class AbstractGatewaySender implements 
GatewaySender, Distributi
           ev.enqueueEvent(operation, clonedEvent, substituteValue);
         } catch (CancelException e) {
           logger.debug("caught cancel exception", e);
+          throw e;
         } catch (RegionDestroyedException e) {
           logger.warn(LocalizedMessage.create(
               
LocalizedStrings.GatewayImpl_0_AN_EXCEPTION_OCCURRED_WHILE_QUEUEING_1_TO_PERFORM_OPERATION_2_FOR_3,

http://git-wip-us.apache.org/repos/asf/geode/blob/0fe0a106/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
index 6fe7ee9..dc7a218 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
@@ -98,6 +98,7 @@ import org.apache.geode.test.dunit.LogWriterUtils;
 import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.dunit.Wait;
 import org.apache.geode.test.dunit.WaitCriterion;
+import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
 import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
 import org.apache.geode.test.junit.categories.DistributedTest;
 
@@ -1555,6 +1556,7 @@ public class AsyncEventQueueTestBase extends 
JUnit4DistributedTestCase {
 
   public static void cleanupVM() throws IOException {
     closeCache();
+    // Delete the disk directories created by the test (done after the cache has been closed).
+    JUnit4CacheTestCase.cleanDiskDirs();
   }
 
   public static void closeCache() throws IOException {

http://git-wip-us.apache.org/repos/asf/geode/blob/0fe0a106/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
index 3dd0550..795af36 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
@@ -19,6 +19,7 @@ import static org.junit.Assert.*;
 import static org.mockito.Matchers.any;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -29,11 +30,18 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
 
+import org.apache.geode.cache.AttributesFactory;
+import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.wan.GatewayEventFilter;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
+import org.apache.geode.internal.cache.wan.MyAsyncEventListener;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.DataPolicy;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.asyncqueue.AsyncEvent;
@@ -42,11 +50,14 @@ import 
org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory;
 import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
 import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
 import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.cache.persistence.PartitionOfflineException;
 import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.internal.cache.ForceReattemptException;
 import org.apache.geode.internal.cache.wan.AsyncEventQueueTestBase;
+import org.apache.geode.test.dunit.IgnoredException;
 import org.apache.geode.test.dunit.LogWriterUtils;
 import org.apache.geode.test.dunit.SerializableRunnableIF;
 import org.apache.geode.test.dunit.VM;
@@ -1674,6 +1685,66 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
     Awaitility.waitAtMost(10000, TimeUnit.MILLISECONDS).until(() -> 
getBucketMoved(vm2, "ln"));
   }
 
+  /**
+   * Checks that a {@code removeAll} issued from vm3 fails with a
+   * {@link PartitionOfflineException} when the gateway event filter installed on the async event
+   * queue "ln" closes the cache while the event is being enqueued.
+   *
+   * <p>
+   * vm0 starts the locator; vm1 and vm2 create the queue (with the cache-closing filter) and the
+   * persistent partitioned region; vm3 creates the same region with a local max memory of 0 and
+   * performs the operations.
+   */
+  @Test
+  public void testCacheClosedBeforeAEQWrite() {
+    Integer lnPort =
+        (Integer) vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId(1));
+
+    vm1.invoke(createCacheRunnable(lnPort));
+    vm2.invoke(createCacheRunnable(lnPort));
+    vm3.invoke(createCacheRunnable(lnPort));
+
+    // The queue has to exist before the region that references it by id is created.
+    vm1.invoke(() -> addAEQWithCacheCloseFilter());
+    vm2.invoke(() -> addAEQWithCacheCloseFilter());
+
+    vm1.invoke(() -> createPersistentPartitionRegion());
+    vm2.invoke(() -> createPersistentPartitionRegion());
+
+    // vm3 gets a non-persistent variant of the region with a local max memory of 0
+    // (presumably so that it only acts as an accessor - confirm).
+    vm3.invoke(() -> {
+      AttributesFactory fact = new AttributesFactory();
+
+      PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+      pfact.setTotalNumBuckets(16);
+      pfact.setLocalMaxMemory(0);
+      fact.setPartitionAttributes(pfact.create());
+      fact.setOffHeap(isOffHeap());
+      cache.createRegionFactory(fact.create()).addAsyncEventQueueId("ln")
+          .create(getTestMethodName() + "_PR");
+    });
+
+    vm3.invoke(() -> {
+      Region r = cache.getRegion(Region.SEPARATOR + getTestMethodName() + "_PR");
+      r.put(1, 1);
+      r.put(2, 2);
+      // This will trigger the gateway event filter to close the cache
+      try {
+        r.removeAll(Collections.singleton(1));
+        fail("Should have received a partition offline exception");
+      } catch (PartitionOfflineException expected) {
+        // Expected outcome of the test; nothing else to verify.
+      }
+    });
+  }
+
+  /**
+   * Creates the partitioned region named {@code getTestMethodName() + "_PR"} in the local cache,
+   * configured with 16 buckets, the {@link DataPolicy#PERSISTENT_PARTITION} data policy, the
+   * off-heap setting returned by {@code isOffHeap()} and the async event queue "ln".
+   */
+  private void createPersistentPartitionRegion() {
+    AttributesFactory fact = new AttributesFactory();
+
+    PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+    pfact.setTotalNumBuckets(16);
+    fact.setPartitionAttributes(pfact.create());
+    fact.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
+    fact.setOffHeap(isOffHeap());
+    // The created region is not used here, so the result of create() is not kept.
+    cache.createRegionFactory(fact.create()).addAsyncEventQueueId("ln")
+        .create(getTestMethodName() + "_PR");
+  }
+
+  /**
+   * Creates the persistent, parallel async event queue "ln" in the local cache, with a
+   * {@link CloseCacheGatewayFilter} as gateway event filter and a {@link MyAsyncEventListener} as
+   * its listener.
+   */
+  private void addAEQWithCacheCloseFilter() {
+    final AsyncEventQueueFactory queueFactory = cache.createAsyncEventQueueFactory();
+    queueFactory.addGatewayEventFilter(new CloseCacheGatewayFilter())
+        .setPersistent(true)
+        .setParallel(true)
+        .create("ln", new MyAsyncEventListener());
+  }
+
   private static Set<Object> getKeysSeen(VM vm, String asyncEventQueueId) {
     return vm.invoke(() -> {
       final BucketMovingAsyncEventListener listener =
@@ -1690,6 +1761,37 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
     });
   }
 
+  /**
+   * Gateway event filter used to close the cache in the middle of an enqueue. For a removeAll
+   * event it starts a thread that closes the cache, waits one second and then throws a
+   * {@link CacheClosedException}; every other event is accepted unchanged.
+   */
+  private final class CloseCacheGatewayFilter implements GatewayEventFilter {
+    /**
+     * Accepts every event except a removeAll, which triggers the cache close.
+     *
+     * @param event the event about to be enqueued
+     * @return {@code true} for any event whose operation is not a removeAll
+     * @throws CacheClosedException for a removeAll event, after the cache close has been started
+     */
+    @Override
+    public boolean beforeEnqueue(final GatewayQueueEvent event) {
+      if (event.getOperation().isRemoveAll()) {
+        // Close on a separate thread so that this thread can go on and throw below.
+        new Thread(() -> cache.close()).start();
+        try {
+          // Presumably gives the background close time to get under way before throwing.
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          // Restore the interrupt status instead of silently dropping it.
+          Thread.currentThread().interrupt();
+        }
+        throw new CacheClosedException();
+      }
+      return true;
+    }
+
+    /** Always returns {@code false}. */
+    @Override
+    public boolean beforeTransmit(final GatewayQueueEvent event) {
+      return false;
+    }
+
+    /** Does nothing. */
+    @Override
+    public void afterAcknowledgement(final GatewayQueueEvent event) {
+
+    }
+
+    /** Does nothing; the filter holds no resources. */
+    @Override
+    public void close() {
+
+    }
+  }
+
   private static class BucketMovingAsyncEventListener implements 
AsyncEventListener {
     private final DistributedMember destination;
     private boolean moved;

Reply via email to