This is an automated email from the ASF dual-hosted git repository. nnag pushed a commit to branch release/1.10.0 in repository https://gitbox.apache.org/repos/asf/geode.git
commit f7f675a00a7c0bcff2eec36a1fca68d1d7dba43f Author: Nabarun Nag <[email protected]> AuthorDate: Fri Sep 13 14:35:46 2019 -0700 GEODE-7179: alter async queue command to change state of event processor during creation (#4057) * Alter the state of the event processor during the creation of the AEQ * The state can be changed to paused from not paused and vice versa. Co-authored-by: Donal Evans <[email protected]> Co-authored-by: Nabarun Nag <[email protected]> Co-authored-by: Benjamin Ross <[email protected]> --- .../AlterAsyncEventQueueCommandDUnitTest.java | 147 +++++++++++++++++++-- .../cli/commands/AlterAsyncEventQueueCommand.java | 12 +- 2 files changed, 147 insertions(+), 12 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommandDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommandDUnitTest.java index ba42d27..0a3c575 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommandDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommandDUnitTest.java @@ -15,6 +15,7 @@ package org.apache.geode.management.internal.cli.commands; +import static org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl.DEFAULT_BATCH_TIME_INTERVAL; import static org.assertj.core.api.Assertions.assertThat; import org.junit.Before; @@ -23,6 +24,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.apache.geode.cache.asyncqueue.AsyncEventQueue; +import org.apache.geode.cache.wan.GatewaySender; import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.wan.MyAsyncEventListener; import org.apache.geode.test.dunit.rules.ClusterStartupRule; @@ -34,6 +36,10 @@ import org.apache.geode.test.junit.rules.GfshCommandRule; @Category({AEQTest.class}) public class AlterAsyncEventQueueCommandDUnitTest { + private static final int ALTERED_BATCH_SIZE = 200; + private static final int ALTERED_BATCH_TIME_INTERVAL = 300; + private static final int ALTERED_MAXIMUM_QUEUE_MEMORY = 400; + @Rule public ClusterStartupRule lsRule = new ClusterStartupRule(); @@ -60,22 +66,140 @@ public class AlterAsyncEventQueueCommandDUnitTest { server1.invoke(() -> { InternalCache cache = ClusterStartupRule.getCache(); AsyncEventQueue queue = cache.getAsyncEventQueue("queue1"); - assertThat(queue.getBatchSize()).isEqualTo(100); - assertThat(queue.getBatchTimeInterval()).isEqualTo(5); - assertThat(queue.getMaximumQueueMemory()).isEqualTo(100); + assertThat(queue.getBatchSize()).isEqualTo(GatewaySender.DEFAULT_BATCH_SIZE); + assertThat(queue.getBatchTimeInterval()).isEqualTo(DEFAULT_BATCH_TIME_INTERVAL); + assertThat(queue.getMaximumQueueMemory()) + .isEqualTo(GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY); + }); + + gfsh.executeAndAssertThat("alter async-event-queue --id=queue1 --batch-size=" + + ALTERED_BATCH_SIZE + " --batch-time-interval=" + ALTERED_BATCH_TIME_INTERVAL + + " --max-queue-memory=" + ALTERED_MAXIMUM_QUEUE_MEMORY).statusIsSuccess(); + + // verify that server1's event queue still has the default value + // without restart + server1.invoke(() -> { + InternalCache cache = ClusterStartupRule.getCache(); + AsyncEventQueue queue = cache.getAsyncEventQueue("queue1"); + assertThat(queue.getBatchSize()).isEqualTo(GatewaySender.DEFAULT_BATCH_SIZE); + assertThat(queue.getBatchTimeInterval()).isEqualTo(DEFAULT_BATCH_TIME_INTERVAL); + assertThat(queue.getMaximumQueueMemory()) + .isEqualTo(GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY); + assertThat(cache.getAsyncEventQueue("queue2")).isNull(); + }); + + // restart locator and server without clearing the file system + server1.stop(false); + locator.stop(false); + + locator = lsRule.startLocatorVM(0); + server1 = lsRule.startServerVM(1, "group1", locator.getPort()); + // verify that server1's queue is updated + server1.invoke(() -> { + InternalCache cache = ClusterStartupRule.getCache(); + AsyncEventQueue queue = cache.getAsyncEventQueue("queue1"); + assertThat(queue.getBatchSize()).isEqualTo(ALTERED_BATCH_SIZE); + assertThat(queue.getBatchTimeInterval()).isEqualTo(ALTERED_BATCH_TIME_INTERVAL); + assertThat(queue.getMaximumQueueMemory()).isEqualTo(ALTERED_MAXIMUM_QUEUE_MEMORY); + assertThat(cache.getAsyncEventQueue("queue2")).isNull(); + }); + } + + @Test + public void whenAlterCommandUsedToChangeFromPauseToResumeThenAEQBehaviorMustChange() + throws Exception { + gfsh.executeAndAssertThat( + "create async-event-queue --pause-event-processing=true --id=queue1 --group=group1 --listener=" + + MyAsyncEventListener.class.getName()) + .statusIsSuccess(); + + locator.waitUntilAsyncEventQueuesAreReadyOnExactlyThisManyServers("queue1", 1); + + // verify that server1's event queue has the default value + server1.invoke(() -> { + InternalCache cache = ClusterStartupRule.getCache(); + AsyncEventQueue queue = cache.getAsyncEventQueue("queue1"); + assertThat(queue.getBatchSize()).isEqualTo(GatewaySender.DEFAULT_BATCH_SIZE); + assertThat(queue.getBatchTimeInterval()).isEqualTo(DEFAULT_BATCH_TIME_INTERVAL); + assertThat(queue.isDispatchingPaused()).isTrue(); + assertThat(queue.getMaximumQueueMemory()) + .isEqualTo(GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY); + }); + + gfsh.executeAndAssertThat( + "alter async-event-queue --id=queue1 --pause-event-processing=false --batch-size=" + + ALTERED_BATCH_SIZE + " --batch-time-interval=" + ALTERED_BATCH_TIME_INTERVAL + + " --max-queue-memory=" + ALTERED_MAXIMUM_QUEUE_MEMORY) + .statusIsSuccess(); + + // verify that server1's event queue still has the default value + // without restart + server1.invoke(() -> { + InternalCache cache = ClusterStartupRule.getCache(); + AsyncEventQueue queue = cache.getAsyncEventQueue("queue1"); + assertThat(queue.getBatchSize()).isEqualTo(GatewaySender.DEFAULT_BATCH_SIZE); + assertThat(queue.getBatchTimeInterval()).isEqualTo(DEFAULT_BATCH_TIME_INTERVAL); + assertThat(queue.getMaximumQueueMemory()) + .isEqualTo(GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY); + assertThat(queue.isDispatchingPaused()).isTrue(); + assertThat(cache.getAsyncEventQueue("queue2")).isNull(); + }); + + // restart locator and server without clearing the file system + server1.stop(false); + locator.stop(false); + + locator = lsRule.startLocatorVM(0); + server1 = lsRule.startServerVM(1, "group1", locator.getPort()); + // verify that server1's queue is updated + server1.invoke(() -> { + InternalCache cache = ClusterStartupRule.getCache(); + AsyncEventQueue queue = cache.getAsyncEventQueue("queue1"); + assertThat(queue.getBatchSize()).isEqualTo(ALTERED_BATCH_SIZE); + assertThat(queue.getBatchTimeInterval()).isEqualTo(ALTERED_BATCH_TIME_INTERVAL); + assertThat(queue.getMaximumQueueMemory()).isEqualTo(ALTERED_MAXIMUM_QUEUE_MEMORY); + assertThat(queue.isDispatchingPaused()).isFalse(); + assertThat(cache.getAsyncEventQueue("queue2")).isNull(); + }); + } + + @Test + public void whenAlterCommandUsedToChangeFromResumeStateToPausedThenAEQBehaviorMustChange() + throws Exception { + gfsh.executeAndAssertThat( + "create async-event-queue --pause-event-processing=false --id=queue1 --group=group1 --listener=" + + MyAsyncEventListener.class.getName()) + .statusIsSuccess(); + + locator.waitUntilAsyncEventQueuesAreReadyOnExactlyThisManyServers("queue1", 1); + + // verify that server1's event queue has the default value + server1.invoke(() -> { + InternalCache cache = ClusterStartupRule.getCache(); + AsyncEventQueue queue = cache.getAsyncEventQueue("queue1"); + assertThat(queue.getBatchSize()).isEqualTo(GatewaySender.DEFAULT_BATCH_SIZE); + assertThat(queue.getBatchTimeInterval()).isEqualTo(DEFAULT_BATCH_TIME_INTERVAL); + assertThat(queue.isDispatchingPaused()).isFalse(); + assertThat(queue.getMaximumQueueMemory()) + .isEqualTo(GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY); }); - gfsh.executeAndAssertThat("alter async-event-queue --id=queue1 " + "--batch-size=200 " - + "--batch-time-interval=300 " + "--max-queue-memory=400").statusIsSuccess(); + gfsh.executeAndAssertThat( + "alter async-event-queue --id=queue1 --pause-event-processing=true --batch-size=" + + ALTERED_BATCH_SIZE + " --batch-time-interval=" + ALTERED_BATCH_TIME_INTERVAL + + " --max-queue-memory=" + ALTERED_MAXIMUM_QUEUE_MEMORY) + .statusIsSuccess(); // verify that server1's event queue still has the default value // without restart server1.invoke(() -> { InternalCache cache = ClusterStartupRule.getCache(); AsyncEventQueue queue = cache.getAsyncEventQueue("queue1"); - assertThat(queue.getBatchSize()).isEqualTo(100); - assertThat(queue.getBatchTimeInterval()).isEqualTo(5); - assertThat(queue.getMaximumQueueMemory()).isEqualTo(100); + assertThat(queue.getBatchSize()).isEqualTo(GatewaySender.DEFAULT_BATCH_SIZE); + assertThat(queue.getBatchTimeInterval()).isEqualTo(DEFAULT_BATCH_TIME_INTERVAL); + assertThat(queue.getMaximumQueueMemory()) + .isEqualTo(GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY); + assertThat(queue.isDispatchingPaused()).isFalse(); assertThat(cache.getAsyncEventQueue("queue2")).isNull(); }); @@ -89,9 +213,10 @@ public class AlterAsyncEventQueueCommandDUnitTest { server1.invoke(() -> { InternalCache cache = ClusterStartupRule.getCache(); AsyncEventQueue queue = cache.getAsyncEventQueue("queue1"); - assertThat(queue.getBatchSize()).isEqualTo(200); - assertThat(queue.getBatchTimeInterval()).isEqualTo(300); - assertThat(queue.getMaximumQueueMemory()).isEqualTo(400); + assertThat(queue.getBatchSize()).isEqualTo(ALTERED_BATCH_SIZE); + assertThat(queue.getBatchTimeInterval()).isEqualTo(ALTERED_BATCH_TIME_INTERVAL); + assertThat(queue.getMaximumQueueMemory()).isEqualTo(ALTERED_MAXIMUM_QUEUE_MEMORY); + assertThat(queue.isDispatchingPaused()).isTrue(); assertThat(cache.getAsyncEventQueue("queue2")).isNull(); }); } diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommand.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommand.java index ebc53de..d3d60d0 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommand.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommand.java @@ -68,6 +68,9 @@ public class AlterAsyncEventQueueCommand extends SingleGfshCommand implements static final String BATCH_TIME_INTERVAL_HELP = CREATE_ASYNC_EVENT_QUEUE__BATCHTIMEINTERVAL__HELP; static final String MAXIMUM_QUEUE_MEMORY_HELP = CREATE_ASYNC_EVENT_QUEUE__MAXIMUM_QUEUE_MEMORY__HELP; + static final String PAUSE_EVENT_PROCESSING = "pause-event-processing"; + static final String PAUSE_EVENT_PROCESSING_HELP = + "Pause event processing when the async event queue is created"; @CliCommand(value = COMMAND_NAME, help = COMMAND_HELP) @CliMetaData( @@ -80,7 +83,10 @@ public class AlterAsyncEventQueueCommand extends SingleGfshCommand implements help = BATCH_TIME_INTERVAL_HELP) Integer batchTimeInterval, @CliOption(key = MAX_QUEUE_MEMORY, help = MAXIMUM_QUEUE_MEMORY_HELP) Integer maxQueueMemory, @CliOption(key = IFEXISTS, help = IFEXISTS_HELP, specifiedDefaultValue = "true", - unspecifiedDefaultValue = "false") boolean ifExists) + unspecifiedDefaultValue = "false") boolean ifExists, + @CliOption(key = PAUSE_EVENT_PROCESSING, help = PAUSE_EVENT_PROCESSING_HELP, + specifiedDefaultValue = "true", + unspecifiedDefaultValue = "false") boolean pauseEventProcessing) throws IOException, SAXException, ParserConfigurationException, TransformerException, EntityNotFoundException { @@ -98,6 +104,7 @@ public class AlterAsyncEventQueueCommand extends SingleGfshCommand implements CacheConfig.AsyncEventQueue aeqConfiguration = new CacheConfig.AsyncEventQueue(); aeqConfiguration.setId(id); + aeqConfiguration.setPauseEventProcessing(pauseEventProcessing); if (batchSize != null) { aeqConfiguration.setBatchSize(batchSize + ""); @@ -163,6 +170,9 @@ public class AlterAsyncEventQueueCommand extends SingleGfshCommand implements if (StringUtils.isNotBlank(aeqConfiguration.getMaximumQueueMemory())) { queue.setMaximumQueueMemory(aeqConfiguration.getMaximumQueueMemory()); } + if (aeqConfiguration.isPauseEventProcessing() != null) { + queue.setPauseEventProcessing(aeqConfiguration.isPauseEventProcessing()); + } aeqConfigsHaveBeenUpdated = true; }
