This is an automated email from the ASF dual-hosted git repository. nnag pushed a commit to branch revert-4046-feature/GEODE-7179 in repository https://gitbox.apache.org/repos/asf/geode.git
commit f70ca5ed3652459f6ca54b40cc2a8ba450c54a27 Author: Nabarun Nag <[email protected]> AuthorDate: Fri Sep 13 11:05:17 2019 -0700 Revert "GEODE-7179: Add option to AlterAsyncEventQueue gfsh command for pause-event processing" --- .../AlterAsyncEventQueueCommandDUnitTest.java | 147 ++------------------- .../cli/commands/AlterAsyncEventQueueCommand.java | 12 +- 2 files changed, 12 insertions(+), 147 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommandDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommandDUnitTest.java index 0a3c575..ba42d27 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommandDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommandDUnitTest.java @@ -15,7 +15,6 @@ package org.apache.geode.management.internal.cli.commands; -import static org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl.DEFAULT_BATCH_TIME_INTERVAL; import static org.assertj.core.api.Assertions.assertThat; import org.junit.Before; @@ -24,7 +23,6 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.apache.geode.cache.asyncqueue.AsyncEventQueue; -import org.apache.geode.cache.wan.GatewaySender; import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.wan.MyAsyncEventListener; import org.apache.geode.test.dunit.rules.ClusterStartupRule; @@ -36,10 +34,6 @@ import org.apache.geode.test.junit.rules.GfshCommandRule; @Category({AEQTest.class}) public class AlterAsyncEventQueueCommandDUnitTest { - private static final int ALTERED_BATCH_SIZE = 200; - private static final int ALTERED_BATCH_TIME_INTERVAL = 300; - private static final int ALTERED_MAXIMUM_QUEUE_MEMORY = 400; - @Rule public ClusterStartupRule lsRule = new ClusterStartupRule(); @@ -66,140 +60,22 @@ public class AlterAsyncEventQueueCommandDUnitTest { server1.invoke(() -> { InternalCache cache = ClusterStartupRule.getCache(); AsyncEventQueue queue = cache.getAsyncEventQueue("queue1"); - assertThat(queue.getBatchSize()).isEqualTo(GatewaySender.DEFAULT_BATCH_SIZE); - assertThat(queue.getBatchTimeInterval()).isEqualTo(DEFAULT_BATCH_TIME_INTERVAL); - assertThat(queue.getMaximumQueueMemory()) - .isEqualTo(GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY); - }); - - gfsh.executeAndAssertThat("alter async-event-queue --id=queue1 --batch-size=" - + ALTERED_BATCH_SIZE + " --batch-time-interval=" + ALTERED_BATCH_TIME_INTERVAL - + " --max-queue-memory=" + ALTERED_MAXIMUM_QUEUE_MEMORY).statusIsSuccess(); - - // verify that server1's event queue still has the default value - // without restart - server1.invoke(() -> { - InternalCache cache = ClusterStartupRule.getCache(); - AsyncEventQueue queue = cache.getAsyncEventQueue("queue1"); - assertThat(queue.getBatchSize()).isEqualTo(GatewaySender.DEFAULT_BATCH_SIZE); - assertThat(queue.getBatchTimeInterval()).isEqualTo(DEFAULT_BATCH_TIME_INTERVAL); - assertThat(queue.getMaximumQueueMemory()) - .isEqualTo(GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY); - assertThat(cache.getAsyncEventQueue("queue2")).isNull(); - }); - - // restart locator and server without clearing the file system - server1.stop(false); - locator.stop(false); - - locator = lsRule.startLocatorVM(0); - server1 = lsRule.startServerVM(1, "group1", locator.getPort()); - // verify that server1's queue is updated - server1.invoke(() -> { - InternalCache cache = ClusterStartupRule.getCache(); - AsyncEventQueue queue = cache.getAsyncEventQueue("queue1"); - assertThat(queue.getBatchSize()).isEqualTo(ALTERED_BATCH_SIZE); - assertThat(queue.getBatchTimeInterval()).isEqualTo(ALTERED_BATCH_TIME_INTERVAL); - assertThat(queue.getMaximumQueueMemory()).isEqualTo(ALTERED_MAXIMUM_QUEUE_MEMORY); - assertThat(cache.getAsyncEventQueue("queue2")).isNull(); - }); - } - - @Test - public void whenAlterCommandUsedToChangeFromPauseToResumeThenAEQBehaviorMustChange() - throws Exception { - gfsh.executeAndAssertThat( - "create async-event-queue --pause-event-processing=true --id=queue1 --group=group1 --listener=" - + MyAsyncEventListener.class.getName()) - .statusIsSuccess(); - - locator.waitUntilAsyncEventQueuesAreReadyOnExactlyThisManyServers("queue1", 1); - - // verify that server1's event queue has the default value - server1.invoke(() -> { - InternalCache cache = ClusterStartupRule.getCache(); - AsyncEventQueue queue = cache.getAsyncEventQueue("queue1"); - assertThat(queue.getBatchSize()).isEqualTo(GatewaySender.DEFAULT_BATCH_SIZE); - assertThat(queue.getBatchTimeInterval()).isEqualTo(DEFAULT_BATCH_TIME_INTERVAL); - assertThat(queue.isDispatchingPaused()).isTrue(); - assertThat(queue.getMaximumQueueMemory()) - .isEqualTo(GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY); - }); - - gfsh.executeAndAssertThat( - "alter async-event-queue --id=queue1 --pause-event-processing=false --batch-size=" - + ALTERED_BATCH_SIZE + " --batch-time-interval=" + ALTERED_BATCH_TIME_INTERVAL - + " --max-queue-memory=" + ALTERED_MAXIMUM_QUEUE_MEMORY) - .statusIsSuccess(); - - // verify that server1's event queue still has the default value - // without restart - server1.invoke(() -> { - InternalCache cache = ClusterStartupRule.getCache(); - AsyncEventQueue queue = cache.getAsyncEventQueue("queue1"); - assertThat(queue.getBatchSize()).isEqualTo(GatewaySender.DEFAULT_BATCH_SIZE); - assertThat(queue.getBatchTimeInterval()).isEqualTo(DEFAULT_BATCH_TIME_INTERVAL); - assertThat(queue.getMaximumQueueMemory()) - .isEqualTo(GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY); - assertThat(queue.isDispatchingPaused()).isTrue(); - assertThat(cache.getAsyncEventQueue("queue2")).isNull(); - }); - - // restart locator and server without clearing the file system - server1.stop(false); - locator.stop(false); - - locator = lsRule.startLocatorVM(0); - server1 = lsRule.startServerVM(1, "group1", locator.getPort()); - // verify that server1's queue is updated - server1.invoke(() -> { - InternalCache cache = ClusterStartupRule.getCache(); - AsyncEventQueue queue = cache.getAsyncEventQueue("queue1"); - assertThat(queue.getBatchSize()).isEqualTo(ALTERED_BATCH_SIZE); - assertThat(queue.getBatchTimeInterval()).isEqualTo(ALTERED_BATCH_TIME_INTERVAL); - assertThat(queue.getMaximumQueueMemory()).isEqualTo(ALTERED_MAXIMUM_QUEUE_MEMORY); - assertThat(queue.isDispatchingPaused()).isFalse(); - assertThat(cache.getAsyncEventQueue("queue2")).isNull(); - }); - } - - @Test - public void whenAlterCommandUsedToChangeFromResumeStateToPausedThenAEQBehaviorMustChange() - throws Exception { - gfsh.executeAndAssertThat( - "create async-event-queue --pause-event-processing=false --id=queue1 --group=group1 --listener=" - + MyAsyncEventListener.class.getName()) - .statusIsSuccess(); - - locator.waitUntilAsyncEventQueuesAreReadyOnExactlyThisManyServers("queue1", 1); - - // verify that server1's event queue has the default value - server1.invoke(() -> { - InternalCache cache = ClusterStartupRule.getCache(); - AsyncEventQueue queue = cache.getAsyncEventQueue("queue1"); - assertThat(queue.getBatchSize()).isEqualTo(GatewaySender.DEFAULT_BATCH_SIZE); - assertThat(queue.getBatchTimeInterval()).isEqualTo(DEFAULT_BATCH_TIME_INTERVAL); - assertThat(queue.isDispatchingPaused()).isFalse(); - assertThat(queue.getMaximumQueueMemory()) - .isEqualTo(GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY); + assertThat(queue.getBatchSize()).isEqualTo(100); + assertThat(queue.getBatchTimeInterval()).isEqualTo(5); + assertThat(queue.getMaximumQueueMemory()).isEqualTo(100); }); - gfsh.executeAndAssertThat( - "alter async-event-queue --id=queue1 --pause-event-processing=true --batch-size=" - + ALTERED_BATCH_SIZE + " --batch-time-interval=" + ALTERED_BATCH_TIME_INTERVAL - + " --max-queue-memory=" + ALTERED_MAXIMUM_QUEUE_MEMORY) - .statusIsSuccess(); + gfsh.executeAndAssertThat("alter async-event-queue --id=queue1 " + "--batch-size=200 " + + "--batch-time-interval=300 " + "--max-queue-memory=400").statusIsSuccess(); // verify that server1's event queue still has the default value // without restart server1.invoke(() -> { InternalCache cache = ClusterStartupRule.getCache(); AsyncEventQueue queue = cache.getAsyncEventQueue("queue1"); - assertThat(queue.getBatchSize()).isEqualTo(GatewaySender.DEFAULT_BATCH_SIZE); - assertThat(queue.getBatchTimeInterval()).isEqualTo(DEFAULT_BATCH_TIME_INTERVAL); - assertThat(queue.getMaximumQueueMemory()) - .isEqualTo(GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY); - assertThat(queue.isDispatchingPaused()).isFalse(); + assertThat(queue.getBatchSize()).isEqualTo(100); + assertThat(queue.getBatchTimeInterval()).isEqualTo(5); + assertThat(queue.getMaximumQueueMemory()).isEqualTo(100); assertThat(cache.getAsyncEventQueue("queue2")).isNull(); }); @@ -213,10 +89,9 @@ public class AlterAsyncEventQueueCommandDUnitTest { server1.invoke(() -> { InternalCache cache = ClusterStartupRule.getCache(); AsyncEventQueue queue = cache.getAsyncEventQueue("queue1"); - assertThat(queue.getBatchSize()).isEqualTo(ALTERED_BATCH_SIZE); - assertThat(queue.getBatchTimeInterval()).isEqualTo(ALTERED_BATCH_TIME_INTERVAL); - assertThat(queue.getMaximumQueueMemory()).isEqualTo(ALTERED_MAXIMUM_QUEUE_MEMORY); - assertThat(queue.isDispatchingPaused()).isTrue(); + assertThat(queue.getBatchSize()).isEqualTo(200); + assertThat(queue.getBatchTimeInterval()).isEqualTo(300); + assertThat(queue.getMaximumQueueMemory()).isEqualTo(400); assertThat(cache.getAsyncEventQueue("queue2")).isNull(); }); } diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommand.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommand.java index 9c14a45..2f33da1 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommand.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommand.java @@ -68,9 +68,6 @@ public class AlterAsyncEventQueueCommand extends SingleGfshCommand implements static final String BATCH_TIME_INTERVAL_HELP = CREATE_ASYNC_EVENT_QUEUE__BATCHTIMEINTERVAL__HELP; static final String MAXIMUM_QUEUE_MEMORY_HELP = CREATE_ASYNC_EVENT_QUEUE__MAXIMUM_QUEUE_MEMORY__HELP; - static final String PAUSE_EVENT_PROCESSING = "pause-event-processing"; - static final String PAUSE_EVENT_PROCESSING_HELP = - "Pause event processing when the async event queue is created"; @CliCommand(value = COMMAND_NAME, help = COMMAND_HELP) @CliMetaData( @@ -83,10 +80,7 @@ public class AlterAsyncEventQueueCommand extends SingleGfshCommand implements help = BATCH_TIME_INTERVAL_HELP) Integer batchTimeInterval, @CliOption(key = MAX_QUEUE_MEMORY, help = MAXIMUM_QUEUE_MEMORY_HELP) Integer maxQueueMemory, @CliOption(key = IFEXISTS, help = IFEXISTS_HELP, specifiedDefaultValue = "true", - unspecifiedDefaultValue = "false") boolean ifExists, - @CliOption(key = PAUSE_EVENT_PROCESSING, help = PAUSE_EVENT_PROCESSING_HELP, - specifiedDefaultValue = "true", - unspecifiedDefaultValue = "false") boolean pauseEventProcessing) + unspecifiedDefaultValue = "false") boolean ifExists) throws IOException, SAXException, ParserConfigurationException, TransformerException, EntityNotFoundException { @@ -104,7 +98,6 @@ public class AlterAsyncEventQueueCommand extends SingleGfshCommand implements CacheConfig.AsyncEventQueue aeqConfiguration = new CacheConfig.AsyncEventQueue(); aeqConfiguration.setId(id); - aeqConfiguration.setPauseEventProcessing(pauseEventProcessing); if (batchSize != null) { aeqConfiguration.setBatchSize(batchSize + ""); @@ -170,9 +163,6 @@ public class AlterAsyncEventQueueCommand extends SingleGfshCommand implements if (StringUtils.isNotBlank(aeqConfiguration.getMaximumQueueMemory())) { queue.setMaximumQueueMemory(aeqConfiguration.getMaximumQueueMemory()); } - if (aeqConfiguration.isPauseEventProcessing() != null) { - queue.setPauseEventProcessing(aeqConfiguration.isPauseEventProcessing()); - } aeqConfigsHaveBeenUpdated = true; }
