This is an automated email from the ASF dual-hosted git repository. nnag pushed a commit to branch release/1.10.0 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 1db4ea2f8a7d9115c65f3d5c37194f8df25485d0 Author: Naburun Nag <[email protected]> AuthorDate: Tue Sep 3 13:09:34 2019 -0700 GEODE-7126: Added new API to resume AEQ event processing * New API to resume event processing when event processor is paused * All queued events will be processed --- .../asyncqueue/AsyncEventQueuePausedDUnitTest.java | 242 +++++++++++++++++++++ .../AsyncEventQueueValidationsJUnitTest.java | 21 ++ .../geode/cache/asyncqueue/AsyncEventQueue.java | 10 + .../asyncqueue/internal/AsyncEventQueueImpl.java | 9 + .../cache/xmlcache/AsyncEventQueueCreation.java | 15 ++ 5 files changed, 297 insertions(+) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueuePausedDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueuePausedDUnitTest.java new file mode 100644 index 0000000..385afca --- /dev/null +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueuePausedDUnitTest.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.wan.asyncqueue; + + +import static org.apache.geode.test.awaitility.GeodeAwaitility.await; +import static org.junit.Assert.assertTrue; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collection; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import org.awaitility.core.ConditionTimeoutException; +import org.jetbrains.annotations.NotNull; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import org.apache.geode.DataSerializable; +import org.apache.geode.DataSerializer; +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl; +import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.client.ClientCacheFactory; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.internal.cache.wan.MyAsyncEventListener; +import org.apache.geode.test.dunit.rules.ClientVM; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; +import org.apache.geode.test.junit.categories.AEQTest; + +@Category({AEQTest.class}) +@RunWith(Parameterized.class) +public class AsyncEventQueuePausedDUnitTest implements Serializable { + + @Parameterized.Parameters + public static Collection<Object[]> data() { + return Arrays.asList(new Object[][] { + // RegionType , isParallel AEQ + {RegionShortcut.PARTITION, true}, + {RegionShortcut.PARTITION, false}, + {RegionShortcut.REPLICATE, false}, + {RegionShortcut.PARTITION_PERSISTENT, true}, + {RegionShortcut.PARTITION_PERSISTENT, false}, + {RegionShortcut.PARTITION_REDUNDANT, true}, + {RegionShortcut.PARTITION_REDUNDANT, false}, + {RegionShortcut.REPLICATE_PERSISTENT, false}, + {RegionShortcut.PARTITION_REDUNDANT_PERSISTENT, true}, + {RegionShortcut.PARTITION_REDUNDANT_PERSISTENT, false} + }); + } + + @Parameterized.Parameter + public static RegionShortcut regionShortcut; + + @Parameterized.Parameter(1) + public static boolean isParallel; + + @Rule + public ClusterStartupRule lsRule = new ClusterStartupRule(4); + + private static MemberVM locator, server1, server2; + private static ClientVM client; + + @Before + public void beforeClass() throws Exception { + locator = lsRule.startLocatorVM(0); + server1 = lsRule.startServerVM(1, "group1", locator.getPort()); + server2 = lsRule.startServerVM(2, "group1", locator.getPort()); + int serverPort = server1.getPort(); + client = + lsRule.startClientVM(3, new Properties(), + clientCacheFactory -> configureClientCacheFactory(clientCacheFactory, serverPort)); + } + + private static void configureClientCacheFactory(ClientCacheFactory ccf, int... serverPorts) { + for (int serverPort : serverPorts) { + ccf.addPoolServer("localhost", serverPort); + } + ccf.setPoolReadTimeout(10 * 60 * 1000); // 10 min + ccf.setPoolSubscriptionEnabled(true); + } + + @Test + public void whenAEQCreatedInPausedStateThenListenersMustNotBeInvoked() { + final AEQandRegionProperties props = new AEQandRegionProperties(regionShortcut, isParallel); + server1.invoke(() -> { + createRegionAndDispatchingPausedAEQ(props); + }); + server2.invoke(() -> { + createRegionAndDispatchingPausedAEQ(props); + }); + client.invoke(() -> { + createClientRegion(); + }); + + server1.invoke(() -> { + validateAEQDispatchingIsPaused(); + }); + + server2.invoke(() -> { + validateAEQDispatchingIsPaused(); + }); + + // Resume dispatching. + server1.invoke(() -> { + ClusterStartupRule.getCache().getAsyncEventQueue("aeqID").resumeEventDispatching(); + }); + + server2.invoke(() -> { + ClusterStartupRule.getCache().getAsyncEventQueue("aeqID").resumeEventDispatching(); + }); + + // Validate dispatching resumed. + await().atMost(1, TimeUnit.MINUTES).until(() -> { + + final int count1 = server1.invoke(() -> getEventDispatchedSize()); + final int count2 = server2.invoke(() -> getEventDispatchedSize()); + if ((count1 + count2) == 1000) { + return true; + } else { + return false; + } + }); + + } + + @NotNull + private static Integer getEventDispatchedSize() { + Cache cache = ClusterStartupRule.getCache(); + AsyncEventQueueImpl aeq = (AsyncEventQueueImpl) cache.getAsyncEventQueue("aeqID"); + MyAsyncEventListener listener = (MyAsyncEventListener) aeq.getAsyncEventListener(); + return listener.getEventsMap().size(); + } + + private static void createClientRegion() { + ClientCache cache = ClusterStartupRule.getClientCache(); + Region region = cache.createClientRegionFactory(ClientRegionShortcut.PROXY) + .create("region"); + for (int i = 0; i < 1000; i++) { + region.put(i, i); + } + } + + private static void createRegionAndDispatchingPausedAEQ(AEQandRegionProperties props) { + Cache cache = ClusterStartupRule.getCache(); + cache.createAsyncEventQueueFactory() + .pauseEventDispatchingToListener() + .setParallel(props.isParallel()) + .setPersistent(isPersistent(props)) + .create("aeqID", new MyAsyncEventListener()); + cache.createRegionFactory(props.getRegionShortcut()) + .addAsyncEventQueueId("aeqID") + .create("region"); + } + + private static boolean isPersistent(AEQandRegionProperties props) { + switch (props.getRegionShortcut()) { + case PARTITION: + case REPLICATE: + case PARTITION_REDUNDANT: + return false; + default: + return true; + } + } + + private static void validateAEQDispatchingIsPaused() { + Cache cache = ClusterStartupRule.getCache(); + AsyncEventQueueImpl aeq = (AsyncEventQueueImpl) cache.getAsyncEventQueue("aeqID"); + assertTrue(aeq.isDispatchingPaused()); + MyAsyncEventListener listener = (MyAsyncEventListener) aeq.getAsyncEventListener(); + try { + await().atMost(10, TimeUnit.SECONDS).until(() -> listener.getEventsMap().size() > 0); + } catch (ConditionTimeoutException ex) { + // Expected Exception + } + // Ensure that the the queues are filling up + assertTrue(aeq.getSender().getQueues().stream().mapToInt(i -> i.size()).sum() == 1000); + } + + class AEQandRegionProperties implements DataSerializable, Serializable { + RegionShortcut regionShortcut; + boolean isParallel; + + public AEQandRegionProperties(RegionShortcut regionShortcut, boolean isParallel) { + this.regionShortcut = regionShortcut; + this.isParallel = isParallel; + } + + public RegionShortcut getRegionShortcut() { + return regionShortcut; + } + + public void setRegionShortcut(RegionShortcut regionShortcut) { + this.regionShortcut = regionShortcut; + } + + public boolean isParallel() { + return isParallel; + } + + public void setParallel(boolean parallel) { + isParallel = parallel; + } + + @Override + public void toData(DataOutput out) throws IOException { + DataSerializer.writeObject(regionShortcut, out); + out.writeBoolean(isParallel); + } + + @Override + public void fromData(DataInput in) throws IOException, ClassNotFoundException { + regionShortcut = DataSerializer.readObject(in); + isParallel = in.readBoolean(); + } + } + +} diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.java index 84f99c9..85d8ba3 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.java @@ -19,6 +19,7 @@ import static org.apache.geode.distributed.ConfigurationProperties.CACHE_XML_FIL import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; import static org.apache.geode.test.awaitility.GeodeAwaitility.await; import static org.apache.geode.test.util.ResourceUtils.createTempFileFromResource; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -86,6 +87,26 @@ public class AsyncEventQueueValidationsJUnitTest { } @Test + @Parameters({"true", "false"}) + public void whenAEQCreatedInPausedStateIsUnPausedThenSenderIsResumed(boolean isParallel) { + cache = new CacheFactory().set(MCAST_PORT, "0").create(); + AsyncEventQueueFactory fact = cache.createAsyncEventQueueFactory() + .setParallel(isParallel) + .pauseEventDispatchingToListener() + .setDispatcherThreads(5); + AsyncEventQueue aeq = + fact.create("aeqID", new org.apache.geode.internal.cache.wan.MyAsyncEventListener()); + assertTrue(aeq.isDispatchingPaused()); + assertTrue(((AsyncEventQueueImpl) aeq).getSender().isPaused()); + + aeq.resumeEventDispatching(); + + assertFalse(aeq.isDispatchingPaused()); + assertFalse(((AsyncEventQueueImpl) aeq).getSender().isPaused()); + + } + + @Test public void testConcurrentParallelAsyncEventQueueAttributesOrderPolicyThread() { cache = new CacheFactory().set(MCAST_PORT, "0").create(); try { diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/AsyncEventQueue.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/AsyncEventQueue.java index a1097ff..b06137a 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/AsyncEventQueue.java +++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/AsyncEventQueue.java @@ -147,4 +147,14 @@ public interface AsyncEventQueue { */ boolean isForwardExpirationDestroy(); + /** + * Resumes the dispatching of then events queued to the listener. + */ + void resumeEventDispatching(); + + /** + * Returns whether the queue is processing queued events or is paused + */ + boolean isDispatchingPaused(); + } diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java index ae93b0a..df7c908 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java @@ -36,6 +36,15 @@ public class AsyncEventQueueImpl implements InternalAsyncEventQueue { this.asyncEventListener = asyncEventListener; } + public void resumeEventDispatching() { + this.sender.resume(); + } + + @Override + public boolean isDispatchingPaused() { + return sender.isPaused(); + } + @Override public String getId() { return getAsyncEventQueueIdFromSenderId(sender.getId()); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/AsyncEventQueueCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/AsyncEventQueueCreation.java index 7c315da..eb29f01 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/AsyncEventQueueCreation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/AsyncEventQueueCreation.java @@ -42,6 +42,7 @@ public class AsyncEventQueueCreation implements AsyncEventQueue { private int dispatcherThreads = 1; private OrderPolicy orderPolicy = OrderPolicy.KEY; private boolean forwardExpirationDestroy = false; + private boolean pauseEventDispatching = false; public AsyncEventQueueCreation() {} @@ -74,6 +75,15 @@ public class AsyncEventQueueCreation implements AsyncEventQueue { this.asyncEventListener = eventListener; } + @Override + public boolean isDispatchingPaused() { + return pauseEventDispatching; + } + + public void setPauseEventDispatching(boolean pauseEventDispatching) { + this.pauseEventDispatching = pauseEventDispatching; + } + public void addGatewayEventFilter(GatewayEventFilter filter) { this.gatewayEventFilters.add(filter); } @@ -227,4 +237,9 @@ public class AsyncEventQueueCreation implements AsyncEventQueue { public boolean isForwardExpirationDestroy() { return this.forwardExpirationDestroy; } + + @Override + public void resumeEventDispatching() { + this.pauseEventDispatching = false; + } }
