This is an automated email from the ASF dual-hosted git repository. nnag pushed a commit to branch release/1.10.0 in repository https://gitbox.apache.org/repos/asf/geode.git
commit fe4f31c737276f1d8a72b5562a60ab83fc9baccf Author: Naburun Nag <[email protected]> AuthorDate: Tue Sep 3 14:05:46 2019 -0700 GEODE-7129: Adding XML config for creating AEQ with paused event processing. * Cache XML fields added * Cache config fields added. --- .../AsyncEventQueueValidationsJUnitTest.java | 51 ++++++++++++++++++++++ ...entQueueConfiguredFromXmlStartsPaused.cache.xml | 36 +++++++++++++++ ...entQueueConfiguredFromXmlStartsPaused.cache.xml | 36 +++++++++++++++ .../internal/cache/xmlcache/CacheCreation.java | 3 ++ .../geode/internal/cache/xmlcache/CacheXml.java | 1 + .../internal/cache/xmlcache/CacheXmlParser.java | 9 ++++ .../geode.apache.org/schema/cache/cache-1.0.xsd | 1 + .../geode/cache/configuration/CacheConfig.java | 28 ++++++++++++ 8 files changed, 165 insertions(+) diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.java index 85d8ba3..40dc59c 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.java @@ -24,9 +24,11 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.util.List; +import java.util.concurrent.TimeUnit; import junitparams.JUnitParamsRunner; import junitparams.Parameters; +import org.awaitility.core.ConditionTimeoutException; import org.junit.After; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -41,6 +43,7 @@ import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl; import org.apache.geode.cache.wan.GatewayEventFilter; import org.apache.geode.cache.wan.GatewaySender.OrderPolicy; import org.apache.geode.internal.cache.wan.AsyncEventQueueConfigurationException; +import org.apache.geode.internal.cache.wan.MyAsyncEventListener; import org.apache.geode.internal.cache.wan.MyGatewayEventFilter; import org.apache.geode.test.junit.categories.AEQTest; @@ -153,9 +156,57 @@ public class AsyncEventQueueValidationsJUnitTest { .until(() -> filter.getAfterAcknowledgementInvocations() == numPuts); } + @Test + @Parameters(method = "getCacheXmlFileBaseNamesForPauseTests") + public void whenAsyncQueuesAreStartedInPausedStateShouldNotDispatchEventsTillItIsUnpaused( + String cacheXmlFileBaseName) { + // Create cache with xml + String cacheXmlFileName = + createTempFileFromResource(getClass(), + getClass().getSimpleName() + "." + cacheXmlFileBaseName + ".cache.xml") + .getAbsolutePath(); + cache = new CacheFactory().set(MCAST_PORT, "0").set(CACHE_XML_FILE, cacheXmlFileName).create(); + + // Get AsyncEventQueue and GatewayEventFilter + AsyncEventQueue aeq = cache.getAsyncEventQueue(cacheXmlFileBaseName); + + // Get region and do puts + Region region = cache.getRegion(cacheXmlFileBaseName); + int numPuts = 1000; + for (int i = 0; i < numPuts; i++) { + region.put(i, i); + } + + MyAsyncEventListener listener = (MyAsyncEventListener) aeq.getAsyncEventListener(); + + // Ensure that no listeners are being invoked + try { + await().atMost(10, TimeUnit.SECONDS).until(() -> listener.getEventsMap().size() > 0); + } catch (ConditionTimeoutException ex) { + // Expected Exception + } + + // Ensure that the queue is filling up + await().atMost(60, TimeUnit.SECONDS).until(() -> ((AsyncEventQueueImpl) aeq).getSender() + .getQueues().stream().mapToInt(i -> i.size()).sum() == 1000); + + // Unpause the sender + aeq.resumeEventDispatching(); + + // Ensure that listener is being invoke after unpause + await().atMost(60, TimeUnit.SECONDS).until(() -> listener.getEventsMap().size() == 1000); + + + } + private Object[] getCacheXmlFileBaseNames() { return $(new Object[] {"testSerialAsyncEventQueueConfiguredFromXmlUsesFilter"}, new Object[] {"testParallelAsyncEventQueueConfiguredFromXmlUsesFilter"}); } + private Object[] getCacheXmlFileBaseNamesForPauseTests() { + return $(new Object[] {"testSerialAsyncEventQueueConfiguredFromXmlStartsPaused"}, + new Object[] {"testParallelAsyncEventQueueConfiguredFromXmlStartsPaused"}); + } + } diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.testParallelAsyncEventQueueConfiguredFromXmlStartsPaused.cache.xml b/geode-core/src/integrationTest/resources/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.testParallelAsyncEventQueueConfiguredFromXmlStartsPaused.cache.xml new file mode 100644 index 0000000..3bc15ac --- /dev/null +++ b/geode-core/src/integrationTest/resources/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.testParallelAsyncEventQueueConfiguredFromXmlStartsPaused.cache.xml @@ -0,0 +1,36 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<cache xmlns="http://geode.apache.org/schema/cache" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" copy-on-read="false" is-server="false" lock-lease="120" lock-timeout="60" search-timeout="300" version="1.0" xsi:schemaLocation="http://geode.apache.org/schema/cache http://geode.apache.org/schema/cache/cache-1.0.xsd"> + + <async-event-queue id="testParallelAsyncEventQueueConfiguredFromXmlStartsPaused" parallel="true" pause-event-processing="true"> + <gateway-event-filter> + <class-name>org.apache.geode.internal.cache.wan.MyGatewayEventFilter</class-name> + </gateway-event-filter> + <async-event-listener> + <class-name>org.apache.geode.internal.cache.wan.MyAsyncEventListener</class-name> + </async-event-listener> + </async-event-queue> + + <region name="testParallelAsyncEventQueueConfiguredFromXmlStartsPaused" refid="PARTITION"> + <region-attributes async-event-queue-ids="testParallelAsyncEventQueueConfiguredFromXmlStartsPaused"/> + </region> + +</cache> + diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.testSerialAsyncEventQueueConfiguredFromXmlStartsPaused.cache.xml b/geode-core/src/integrationTest/resources/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.testSerialAsyncEventQueueConfiguredFromXmlStartsPaused.cache.xml new file mode 100644 index 0000000..8cf4d6a --- /dev/null +++ b/geode-core/src/integrationTest/resources/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.testSerialAsyncEventQueueConfiguredFromXmlStartsPaused.cache.xml @@ -0,0 +1,36 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<cache xmlns="http://geode.apache.org/schema/cache" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" copy-on-read="false" is-server="false" lock-lease="120" lock-timeout="60" search-timeout="300" version="1.0" xsi:schemaLocation="http://geode.apache.org/schema/cache http://geode.apache.org/schema/cache/cache-1.0.xsd"> + + <async-event-queue id="testSerialAsyncEventQueueConfiguredFromXmlStartsPaused" parallel="false" pause-event-processing="true"> + <gateway-event-filter> + <class-name>org.apache.geode.internal.cache.wan.MyGatewayEventFilter</class-name> + </gateway-event-filter> + <async-event-listener> + <class-name>org.apache.geode.internal.cache.wan.MyAsyncEventListener</class-name> + </async-event-listener> + </async-event-queue> + + <region name="testSerialAsyncEventQueueConfiguredFromXmlStartsPaused" refid="PARTITION"> + <region-attributes async-event-queue-ids="testSerialAsyncEventQueueConfiguredFromXmlStartsPaused"/> + </region> + +</cache> + diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java index 83b3386..5d7b77b 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java @@ -552,6 +552,9 @@ public class CacheCreation implements InternalCache { AsyncEventQueueFactoryImpl asyncQueueFactory = (AsyncEventQueueFactoryImpl) cache.createAsyncEventQueueFactory(); asyncQueueFactory.configureAsyncEventQueue(asyncEventQueueCreation); + if (asyncEventQueueCreation.isDispatchingPaused()) { + asyncQueueFactory.pauseEventDispatchingToListener(); + } AsyncEventQueue asyncEventQueue = cache.getAsyncEventQueue(asyncEventQueueCreation.getId()); if (asyncEventQueue == null) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXml.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXml.java index f9be910..8cf1ceb 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXml.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXml.java @@ -750,6 +750,7 @@ public abstract class CacheXml implements EntityResolver2, ErrorHandler { protected static final String ASYNC_EVENT_LISTENER = "async-event-listener"; public static final String ASYNC_EVENT_QUEUE = "async-event-queue"; + public static final String PAUSE_EVENT_PROCESSING = "pause-event-processing"; protected static final String ASYNC_EVENT_QUEUE_IDS = "async-event-queue-ids"; protected static final String FORWARD_EXPIRATION_DESTROY = "forward-expiration-destroy"; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlParser.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlParser.java index 8ddecbe..0c05836 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlParser.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlParser.java @@ -2247,6 +2247,12 @@ public class CacheXmlParser extends CacheXml implements ContentHandler { asyncEventQueueCreation.setBatchSize(Integer.parseInt(batchSize)); } + // start in Paused state + String paused = atts.getValue(PAUSE_EVENT_PROCESSING); + if (paused != null) { + asyncEventQueueCreation.setPauseEventDispatching(Boolean.parseBoolean(paused)); + } // no else block needed as default is set to false. + // batch-time-interval String batchTimeInterval = atts.getValue(BATCH_TIME_INTERVAL); if (batchTimeInterval == null) { @@ -2363,6 +2369,9 @@ public class CacheXmlParser extends CacheXml implements ContentHandler { for (GatewayEventFilter gatewayEventFilter : gatewayEventFilters) { factory.addGatewayEventFilter(gatewayEventFilter); } + if (asyncEventChannelCreation.isDispatchingPaused()) { + factory.pauseEventDispatchingToListener(); + } factory.setGatewayEventSubstitutionListener( asyncEventChannelCreation.getGatewayEventSubstitutionFilter()); AsyncEventQueue asyncEventChannel = factory.create(asyncEventChannelCreation.getId(), diff --git a/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd b/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd index 1e0879f..5b47dfa 100755 --- a/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd +++ b/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd @@ -250,6 +250,7 @@ declarative caching XML file elements unless indicated otherwise. </xsd:sequence> <xsd:attribute name="id" type="xsd:string" use="required" /> <xsd:attribute name="parallel" type="xsd:boolean" use="optional" /> + <xsd:attribute name="pause-event-processing" type="xsd:boolean" use="optional" /> <xsd:attribute name="batch-size" type="xsd:string" use="optional" /> <xsd:attribute name="batch-time-interval" type="xsd:string" use="optional" /> <xsd:attribute name="enable-batch-conflation" type="xsd:boolean" use="optional" /> diff --git a/geode-management/src/main/java/org/apache/geode/cache/configuration/CacheConfig.java b/geode-management/src/main/java/org/apache/geode/cache/configuration/CacheConfig.java index 337b80b..f380846 100644 --- a/geode-management/src/main/java/org/apache/geode/cache/configuration/CacheConfig.java +++ b/geode-management/src/main/java/org/apache/geode/cache/configuration/CacheConfig.java @@ -1142,6 +1142,34 @@ public class CacheConfig { protected String orderPolicy; @XmlAttribute(name = "forward-expiration-destroy") protected Boolean forwardExpirationDestroy; + @XmlAttribute(name = "pause-event-processing") + protected Boolean pauseEventProcessing; + + /** + * Gets the value of whether the processing of the events queued is paused or not + * + * + * @return {@link Boolean} - true if queue will be created with paused processing of the events + * queued + * - false if queue will be created without pausing the processing of the events queued + * + */ + public Boolean isPauseEventProcessing() { + return pauseEventProcessing; + } + + /** + * Sets the value of whether the processing of the events queued is paused or not + * + * @param pauseEventProcessing {@link Boolean} - true if queue will be created with paused + * processing of the events queued + * - false if queue will be created without pausing the processing of the events + * queued + */ + + public void setPauseEventProcessing(Boolean pauseEventProcessing) { + this.pauseEventProcessing = pauseEventProcessing; + } /** * Gets the value of the gatewayEventFilters property.
