This is an automated email from the ASF dual-hosted git repository.

jensdeppe pushed a commit to branch release/1.9.2
in repository https://gitbox.apache.org/repos/asf/geode.git

commit f11cae9de43362dace7ef0a95b5ab128bd090ef2
Author: Nabarun Nag <[email protected]>
AuthorDate: Fri Sep 13 14:34:52 2019 -0700

    GEODE-7128: APIs and GFSH commands to resume AEQ processing. (#4056)
    
    Co-authored-by: Benjamin Ross <[email protected]>
    Co-authored-by: Eric Shu <[email protected]>
    Co-authored-by: Nabarun Nag <[email protected]>
---
 .../ResumeAsyncEventQueueDispatcherDUnitTest.java  | 91 ++++++++++++++++++++++
 .../cli/commands/CommandAvailabilityIndicator.java |  3 +-
 .../ResumeAsyncEventQueueDispatcherCommand.java    | 67 ++++++++++++++++
 .../ResumeAsyncEventQueueDispatcherFunction.java   | 54 +++++++++++++
 .../management/internal/cli/i18n/CliStrings.java   | 11 +++
 .../sanctioned-geode-core-serializables.txt        |  1 +
 ...ResumeAsyncEventQueueDispatcherCommandTest.java | 71 +++++++++++++++++
 7 files changed, 297 insertions(+), 1 deletion(-)

diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/ResumeAsyncEventQueueDispatcherDUnitTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/ResumeAsyncEventQueueDispatcherDUnitTest.java
new file mode 100644
index 0000000..03020c2
--- /dev/null
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/ResumeAsyncEventQueueDispatcherDUnitTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.cli.commands;
+
+import static 
org.apache.geode.management.internal.cli.i18n.CliStrings.RESUME_ASYNCEVENTQUEUE;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.internal.cache.wan.MyAsyncEventListener;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+
+public class ResumeAsyncEventQueueDispatcherDUnitTest {
+
+  public static final String CREATE_COMMAND =
+      "create async-event-queue --listener=" + 
MyAsyncEventListener.class.getName();
+
+  public static final String RESUME_COMMAND = RESUME_ASYNCEVENTQUEUE;
+
+  public static final String LIST_COMMAND = "list async-event-queue";
+
+  @Rule
+  public ClusterStartupRule lsRule = new ClusterStartupRule();
+
+  @Rule
+  public GfshCommandRule gfsh = new GfshCommandRule();
+
+  private static MemberVM locator;
+
+  @Test
+  public void create_sync_event_queue() throws Exception {
+    locator = lsRule.startLocatorVM(0);
+    lsRule.startServerVM(1, locator.getPort());
+    gfsh.connectAndVerify(locator);
+
+    // create an AEQ with start paused set to false to verify proper behavior
+    gfsh.executeAndAssertThat(CREATE_COMMAND + " --id=unpausedqueue 
--pause-event-processing=false")
+        .statusIsSuccess()
+        .tableHasRowCount(1)
+        .tableHasRowWithValues("Member", "Status", "Message", "server-1", 
"OK", "Success");
+
+    // verify our AEQ was created as expected
+    gfsh.executeAndAssertThat(LIST_COMMAND).statusIsSuccess()
+        .tableHasRowCount(1).tableHasRowWithValues("Member", "ID",
+            "Created with paused event processing", "Currently Paused", 
"server-1", "unpausedqueue",
+            "false",
+            "false");
+
+    // Issue the resume command and confirm it reports that the queue is 
already dispatching
+    gfsh.executeAndAssertThat(RESUME_COMMAND + " 
--id=unpausedqueue").statusIsSuccess()
+        .tableHasRowCount(1)
+        .containsOutput("Async Event Queue \"unpausedqueue\" dispatching was 
not paused.");
+
+    // create an AEQ with start paused set so we have a queue to unpause
+    gfsh.executeAndAssertThat(CREATE_COMMAND + " --id=queue 
--pause-event-processing")
+        .statusIsSuccess()
+        .tableHasRowCount(1)
+        .tableHasRowWithValues("Member", "Status", "Message", "server-1", 
"OK", "Success");
+
+    // verify our AEQ was created as expected
+    gfsh.executeAndAssertThat(LIST_COMMAND).statusIsSuccess()
+        .tableHasRowCount(2).tableHasRowWithValues("Member", "ID",
+            "Created with paused event processing", "Currently Paused", 
"server-1", "queue", "true",
+            "true");
+
+    // Issue the resume command and confirm it reports success
+    gfsh.executeAndAssertThat(RESUME_COMMAND + " --id=queue").statusIsSuccess()
+        .tableHasRowCount(1)
+        .containsOutput("Async Event Queue \"queue\" dispatching was resumed 
successfully");
+
+    // list the queue to verify the result
+    gfsh.executeAndAssertThat(LIST_COMMAND).statusIsSuccess()
+        .tableHasRowCount(2).tableHasRowWithValues("Member", "ID",
+            "Created with paused event processing", "Currently Paused", 
"server-1", "queue", "true",
+            "false");
+  }
+}
diff --git 
a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CommandAvailabilityIndicator.java
 
b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CommandAvailabilityIndicator.java
index a2d6049..b0d0a29 100644
--- 
a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CommandAvailabilityIndicator.java
+++ 
b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CommandAvailabilityIndicator.java
@@ -44,7 +44,8 @@ public class CommandAvailabilityIndicator extends GfshCommand 
{
       CliStrings.LIST_ASYNC_EVENT_QUEUES, CliStrings.LIST_REGION, 
CliStrings.DESCRIBE_REGION,
       CliStrings.STATUS_SHARED_CONFIG, CliStrings.CREATE_GATEWAYSENDER,
       CliStrings.START_GATEWAYSENDER, CliStrings.PAUSE_GATEWAYSENDER,
-      CliStrings.RESUME_GATEWAYSENDER, CliStrings.STOP_GATEWAYSENDER,
+      CliStrings.RESUME_GATEWAYSENDER, CliStrings.RESUME_ASYNCEVENTQUEUE,
+      CliStrings.STOP_GATEWAYSENDER,
       CliStrings.CREATE_GATEWAYRECEIVER, CliStrings.START_GATEWAYRECEIVER,
       CliStrings.STOP_GATEWAYRECEIVER, CliStrings.LIST_GATEWAY, 
CliStrings.STATUS_GATEWAYSENDER,
       CliStrings.STATUS_GATEWAYRECEIVER, CliStrings.LOAD_BALANCE_GATEWAYSENDER,
diff --git 
a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/ResumeAsyncEventQueueDispatcherCommand.java
 
b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/ResumeAsyncEventQueueDispatcherCommand.java
new file mode 100644
index 0000000..a6c45d4
--- /dev/null
+++ 
b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/ResumeAsyncEventQueueDispatcherCommand.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.cli.commands;
+
+import java.util.List;
+import java.util.Set;
+
+import org.springframework.shell.core.annotation.CliCommand;
+import org.springframework.shell.core.annotation.CliOption;
+
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.management.cli.ConverterHint;
+import org.apache.geode.management.cli.SingleGfshCommand;
+import org.apache.geode.management.internal.cli.functions.CliFunctionResult;
+import 
org.apache.geode.management.internal.cli.functions.ResumeAsyncEventQueueDispatcherFunction;
+import org.apache.geode.management.internal.cli.i18n.CliStrings;
+import org.apache.geode.management.internal.cli.result.model.ResultModel;
+import org.apache.geode.management.internal.security.ResourceOperation;
+import org.apache.geode.security.ResourcePermission;
+
+
+public class ResumeAsyncEventQueueDispatcherCommand extends SingleGfshCommand {
+
+  @CliCommand(value = CliStrings.RESUME_ASYNCEVENTQUEUE,
+      help = CliStrings.RESUME_ASYNCEVENTQUEUE__HELP)
+  @ResourceOperation(resource = ResourcePermission.Resource.CLUSTER,
+      operation = ResourcePermission.Operation.MANAGE)
+  public ResultModel resumeAsyncEventQueueDispatcher(
+      @CliOption(key = CliStrings.RESUME_ASYNCEVENTQUEUE__ID,
+          mandatory = true, help = 
CliStrings.RESUME_ASYNCEVENTQUEUE__ID__HELP) String queueId,
+
+      @CliOption(key = {CliStrings.GROUP, CliStrings.GROUPS},
+          optionContext = ConverterHint.MEMBERGROUP,
+          help = CliStrings.RESUME_ASYNCEVENTQUEUE__GROUP__HELP) String[] 
onGroup,
+      @CliOption(key = {CliStrings.MEMBER, CliStrings.MEMBERS},
+          optionContext = ConverterHint.MEMBERIDNAME,
+          help = CliStrings.RESUME_ASYNCEVENTQUEUE__MEMBER__HELP) String[] 
onMember) {
+
+    if (queueId != null) {
+      queueId = queueId.trim();
+    }
+
+    Set<DistributedMember> targetMembers = findMembers(onGroup, onMember);
+
+    List<CliFunctionResult> results =
+        executeAndGetFunctionResult(new 
ResumeAsyncEventQueueDispatcherFunction(), queueId,
+            targetMembers);
+
+    return constructResultModel(results);
+  }
+
+  ResultModel constructResultModel(List<CliFunctionResult> results) {
+    return ResultModel.createMemberStatusResult(results);
+  }
+}
diff --git 
a/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/ResumeAsyncEventQueueDispatcherFunction.java
 
b/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/ResumeAsyncEventQueueDispatcherFunction.java
new file mode 100644
index 0000000..1b8a294
--- /dev/null
+++ 
b/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/ResumeAsyncEventQueueDispatcherFunction.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.cli.functions;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.management.cli.CliFunction;
+
+public class ResumeAsyncEventQueueDispatcherFunction extends CliFunction {
+
+  @Override
+  public CliFunctionResult executeFunction(FunctionContext context) {
+    String AEQId = (String) context.getArguments();
+
+    Cache cache = context.getCache();
+    DistributedMember member = 
cache.getDistributedSystem().getDistributedMember();
+
+    AsyncEventQueue queue = cache.getAsyncEventQueue(AEQId);
+
+    if (queue == null) {
+      return new CliFunctionResult(member.getId(), 
CliFunctionResult.StatusState.ERROR,
+          "Async Event Queue \"" + AEQId +
+              " cannot be found");
+    }
+
+    if (queue.isDispatchingPaused()) {
+      queue.resumeEventDispatching();
+
+      return new CliFunctionResult(member.getId(), 
CliFunctionResult.StatusState.OK,
+          "Async Event Queue \"" + AEQId
+              + "\" dispatching was resumed successfully");
+    } else {
+      return new CliFunctionResult(member.getId(), 
CliFunctionResult.StatusState.OK,
+          "Async Event Queue \"" + AEQId
+              + "\" dispatching was not paused.");
+    }
+
+
+  }
+}
diff --git 
a/geode-core/src/main/java/org/apache/geode/management/internal/cli/i18n/CliStrings.java
 
b/geode-core/src/main/java/org/apache/geode/management/internal/cli/i18n/CliStrings.java
index 9c20491..9497da7 100644
--- 
a/geode-core/src/main/java/org/apache/geode/management/internal/cli/i18n/CliStrings.java
+++ 
b/geode-core/src/main/java/org/apache/geode/management/internal/cli/i18n/CliStrings.java
@@ -1917,6 +1917,17 @@ public class CliStrings {
   public static final String RESUME_GATEWAYSENDER__MEMBER__HELP =
       "Name/Id of the member on which to resume the Gateway Sender.";
 
+  /* resume async-event-queue */
+  public static final String RESUME_ASYNCEVENTQUEUE = "resume 
async-event-queue-dispatcher";
+  public static final String RESUME_ASYNCEVENTQUEUE__ID = "id";
+  public static final String RESUME_ASYNCEVENTQUEUE__HELP =
+      "Resume the processing of the events in the Async Event Queue on a 
member or members.";
+  public static final String RESUME_ASYNCEVENTQUEUE__ID__HELP = "ID of the 
Async Event Queue.";
+  public static final String RESUME_ASYNCEVENTQUEUE__GROUP__HELP =
+      "Group(s) of members on which to resume processing of the events.";
+  public static final String RESUME_ASYNCEVENTQUEUE__MEMBER__HELP =
+      "Name/Id of the member on which to resume processing of the events.";
+
   /* 'revoke missing-disk-store' command */
   public static final String REVOKE_MISSING_DISK_STORE = "revoke 
missing-disk-store";
   public static final String REVOKE_MISSING_DISK_STORE__HELP =
diff --git 
a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
 
b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
index 5e6695d..ec5b777 100644
--- 
a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
+++ 
b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
@@ -593,6 +593,7 @@ 
org/apache/geode/management/internal/cli/functions/RegionFunctionArgs,true,22049
 
org/apache/geode/management/internal/cli/functions/RegionFunctionArgs$EvictionAttrs,true,9015454906371076014,evictionAction:java/lang/String,maxEntryCount:java/lang/Integer,maxMemory:java/lang/Integer,objectSizer:java/lang/String
 
org/apache/geode/management/internal/cli/functions/RegionFunctionArgs$ExpirationAttrs,true,1474255033398008063,action:org/apache/geode/cache/ExpirationAction,time:java/lang/Integer
 
org/apache/geode/management/internal/cli/functions/RegionFunctionArgs$PartitionArgs,true,5907052187323280919,partitionResolver:java/lang/String,prColocatedWith:java/lang/String,prLocalMaxMemory:java/lang/Integer,prRecoveryDelay:java/lang/Long,prRedundantCopies:java/lang/Integer,prStartupRecoveryDelay:java/lang/Long,prTotalMaxMemory:java/lang/Long,prTotalNumBuckets:java/lang/Integer
+org/apache/geode/management/internal/cli/functions/ResumeAsyncEventQueueDispatcherFunction,false
 
org/apache/geode/management/internal/cli/functions/ShowMissingDiskStoresFunction,false
 org/apache/geode/management/internal/cli/functions/ShutDownFunction,true,1
 
org/apache/geode/management/internal/cli/functions/SizeExportLogsFunction,true,1
diff --git 
a/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/ResumeAsyncEventQueueDispatcherCommandTest.java
 
b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/ResumeAsyncEventQueueDispatcherCommandTest.java
new file mode 100644
index 0000000..c52fd2a
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/ResumeAsyncEventQueueDispatcherCommandTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.cli.commands;
+
+import static 
org.apache.geode.management.internal.cli.i18n.CliStrings.RESUME_ASYNCEVENTQUEUE;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.management.internal.cli.functions.CliFunctionResult;
+import org.apache.geode.test.junit.rules.GfshParserRule;
+
+public class ResumeAsyncEventQueueDispatcherCommandTest {
+
+  public static final String COMMAND = RESUME_ASYNCEVENTQUEUE;
+  @ClassRule
+  public static GfshParserRule gfsh = new GfshParserRule();
+
+  private ResumeAsyncEventQueueDispatcherCommand command;
+
+  @Before
+  public void before() throws Exception {
+    command = spy(ResumeAsyncEventQueueDispatcherCommand.class);
+  }
+
+  @Test
+  public void resumeAsyncEventQueueSuccessful() {
+    String queueId = "queueId";
+    List<CliFunctionResult> functionResults = new ArrayList<>();
+    functionResults
+        .add(new CliFunctionResult("member1", 
CliFunctionResult.StatusState.OK, "SUCCESS"));
+    functionResults
+        .add(new CliFunctionResult("member2", 
CliFunctionResult.StatusState.ERROR, "FAILURE"));
+
+    
doReturn(functionResults).when(command).executeAndGetFunctionResult(isA(Function.class),
+        isA(Object.class), isA(Set.class));
+    doReturn(Collections.emptySet()).when(command).findMembers(any(), any());
+
+    gfsh.executeAndAssertThat(command, COMMAND + " --id=queueId");
+
+    verify(command).executeAndGetFunctionResult(isA(Function.class),
+        isA(String.class), isA(Set.class));
+
+    verify(command).constructResultModel(functionResults);
+  }
+}

Reply via email to