This is an automated email from the ASF dual-hosted git repository. jensdeppe pushed a commit to branch release/1.9.2 in repository https://gitbox.apache.org/repos/asf/geode.git
commit f11cae9de43362dace7ef0a95b5ab128bd090ef2 Author: Nabarun Nag <[email protected]> AuthorDate: Fri Sep 13 14:34:52 2019 -0700 GEODE-7128: APIs and GFSH commands to resume AEQ processing. (#4056) Co-authored-by: Benjamin Ross <[email protected]> Co-authored-by: Eric Shu <[email protected]> Co-authored-by: Nabarun Nag <[email protected]> --- .../ResumeAsyncEventQueueDispatcherDUnitTest.java | 91 ++++++++++++++++++++++ .../cli/commands/CommandAvailabilityIndicator.java | 3 +- .../ResumeAsyncEventQueueDispatcherCommand.java | 67 ++++++++++++++++ .../ResumeAsyncEventQueueDispatcherFunction.java | 54 +++++++++++++ .../management/internal/cli/i18n/CliStrings.java | 11 +++ .../sanctioned-geode-core-serializables.txt | 1 + ...ResumeAsyncEventQueueDispatcherCommandTest.java | 71 +++++++++++++++++ 7 files changed, 297 insertions(+), 1 deletion(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/ResumeAsyncEventQueueDispatcherDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/ResumeAsyncEventQueueDispatcherDUnitTest.java new file mode 100644 index 0000000..03020c2 --- /dev/null +++ b/geode-core/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/ResumeAsyncEventQueueDispatcherDUnitTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.management.internal.cli.commands; + +import static org.apache.geode.management.internal.cli.i18n.CliStrings.RESUME_ASYNCEVENTQUEUE; + +import org.junit.Rule; +import org.junit.Test; + +import org.apache.geode.internal.cache.wan.MyAsyncEventListener; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; +import org.apache.geode.test.junit.rules.GfshCommandRule; + +public class ResumeAsyncEventQueueDispatcherDUnitTest { + + public static final String CREATE_COMMAND = + "create async-event-queue --listener=" + MyAsyncEventListener.class.getName(); + + public static final String RESUME_COMMAND = RESUME_ASYNCEVENTQUEUE; + + public static final String LIST_COMMAND = "list async-event-queue"; + + @Rule + public ClusterStartupRule lsRule = new ClusterStartupRule(); + + @Rule + public GfshCommandRule gfsh = new GfshCommandRule(); + + private static MemberVM locator; + + @Test + public void create_sync_event_queue() throws Exception { + locator = lsRule.startLocatorVM(0); + lsRule.startServerVM(1, locator.getPort()); + gfsh.connectAndVerify(locator); + + // create an AEQ with start paused set to false to verify proper behavior + gfsh.executeAndAssertThat(CREATE_COMMAND + " --id=unpausedqueue --pause-event-processing=false") + .statusIsSuccess() + .tableHasRowCount(1) + .tableHasRowWithValues("Member", "Status", "Message", "server-1", "OK", "Success"); + + // verify our AEQ was created as expected + gfsh.executeAndAssertThat(LIST_COMMAND).statusIsSuccess() + 
.tableHasRowCount(1).tableHasRowWithValues("Member", "ID", + "Created with paused event processing", "Currently Paused", "server-1", "unpausedqueue", + "false", + "false"); + + // Issue the resume command and confirm it reports that the queue is already dispatching + gfsh.executeAndAssertThat(RESUME_COMMAND + " --id=unpausedqueue").statusIsSuccess() + .tableHasRowCount(1) + .containsOutput("Async Event Queue \"unpausedqueue\" dispatching was not paused."); + + // create an AEQ with start paused set so we have a queue to unpause + gfsh.executeAndAssertThat(CREATE_COMMAND + " --id=queue --pause-event-processing") + .statusIsSuccess() + .tableHasRowCount(1) + .tableHasRowWithValues("Member", "Status", "Message", "server-1", "OK", "Success"); + + // verify our AEQ was created as expected + gfsh.executeAndAssertThat(LIST_COMMAND).statusIsSuccess() + .tableHasRowCount(2).tableHasRowWithValues("Member", "ID", + "Created with paused event processing", "Currently Paused", "server-1", "queue", "true", + "true"); + + // Issue the resume command and confirm it reports success + gfsh.executeAndAssertThat(RESUME_COMMAND + " --id=queue").statusIsSuccess() + .tableHasRowCount(1) + .containsOutput("Async Event Queue \"queue\" dispatching was resumed successfully"); + + // list the queue to verify the result + gfsh.executeAndAssertThat(LIST_COMMAND).statusIsSuccess() + .tableHasRowCount(2).tableHasRowWithValues("Member", "ID", + "Created with paused event processing", "Currently Paused", "server-1", "queue", "true", + "false"); + } +} diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CommandAvailabilityIndicator.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CommandAvailabilityIndicator.java index a2d6049..b0d0a29 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CommandAvailabilityIndicator.java +++ 
b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CommandAvailabilityIndicator.java @@ -44,7 +44,8 @@ public class CommandAvailabilityIndicator extends GfshCommand { CliStrings.LIST_ASYNC_EVENT_QUEUES, CliStrings.LIST_REGION, CliStrings.DESCRIBE_REGION, CliStrings.STATUS_SHARED_CONFIG, CliStrings.CREATE_GATEWAYSENDER, CliStrings.START_GATEWAYSENDER, CliStrings.PAUSE_GATEWAYSENDER, - CliStrings.RESUME_GATEWAYSENDER, CliStrings.STOP_GATEWAYSENDER, + CliStrings.RESUME_GATEWAYSENDER, CliStrings.RESUME_ASYNCEVENTQUEUE, + CliStrings.STOP_GATEWAYSENDER, CliStrings.CREATE_GATEWAYRECEIVER, CliStrings.START_GATEWAYRECEIVER, CliStrings.STOP_GATEWAYRECEIVER, CliStrings.LIST_GATEWAY, CliStrings.STATUS_GATEWAYSENDER, CliStrings.STATUS_GATEWAYRECEIVER, CliStrings.LOAD_BALANCE_GATEWAYSENDER, diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/ResumeAsyncEventQueueDispatcherCommand.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/ResumeAsyncEventQueueDispatcherCommand.java new file mode 100644 index 0000000..a6c45d4 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/ResumeAsyncEventQueueDispatcherCommand.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. 
See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.management.internal.cli.commands; + +import java.util.List; +import java.util.Set; + +import org.springframework.shell.core.annotation.CliCommand; +import org.springframework.shell.core.annotation.CliOption; + +import org.apache.geode.distributed.DistributedMember; +import org.apache.geode.management.cli.ConverterHint; +import org.apache.geode.management.cli.SingleGfshCommand; +import org.apache.geode.management.internal.cli.functions.CliFunctionResult; +import org.apache.geode.management.internal.cli.functions.ResumeAsyncEventQueueDispatcherFunction; +import org.apache.geode.management.internal.cli.i18n.CliStrings; +import org.apache.geode.management.internal.cli.result.model.ResultModel; +import org.apache.geode.management.internal.security.ResourceOperation; +import org.apache.geode.security.ResourcePermission; + + +public class ResumeAsyncEventQueueDispatcherCommand extends SingleGfshCommand { + + @CliCommand(value = CliStrings.RESUME_ASYNCEVENTQUEUE, + help = CliStrings.RESUME_ASYNCEVENTQUEUE__HELP) + @ResourceOperation(resource = ResourcePermission.Resource.CLUSTER, + operation = ResourcePermission.Operation.MANAGE) + public ResultModel resumeAsyncEventQueueDispatcher( + @CliOption(key = CliStrings.RESUME_ASYNCEVENTQUEUE__ID, + mandatory = true, help = CliStrings.RESUME_ASYNCEVENTQUEUE__ID__HELP) String queueId, + + @CliOption(key = {CliStrings.GROUP, CliStrings.GROUPS}, + optionContext = ConverterHint.MEMBERGROUP, + help = CliStrings.RESUME_ASYNCEVENTQUEUE__GROUP__HELP) String[] onGroup, + @CliOption(key = {CliStrings.MEMBER, CliStrings.MEMBERS}, + optionContext = ConverterHint.MEMBERIDNAME, + help = CliStrings.RESUME_ASYNCEVENTQUEUE__MEMBER__HELP) String[] onMember) { + + if (queueId != null) { + queueId = queueId.trim(); + } + + Set<DistributedMember> targetMembers = findMembers(onGroup, onMember); + + 
List<CliFunctionResult> results = + executeAndGetFunctionResult(new ResumeAsyncEventQueueDispatcherFunction(), queueId, + targetMembers); + + return constructResultModel(results); + } + + ResultModel constructResultModel(List<CliFunctionResult> results) { + return ResultModel.createMemberStatusResult(results); + } +} diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/ResumeAsyncEventQueueDispatcherFunction.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/ResumeAsyncEventQueueDispatcherFunction.java new file mode 100644 index 0000000..1b8a294 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/ResumeAsyncEventQueueDispatcherFunction.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.management.internal.cli.functions; + +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.asyncqueue.AsyncEventQueue; +import org.apache.geode.cache.execute.FunctionContext; +import org.apache.geode.distributed.DistributedMember; +import org.apache.geode.management.cli.CliFunction; + +public class ResumeAsyncEventQueueDispatcherFunction extends CliFunction { + + @Override + public CliFunctionResult executeFunction(FunctionContext context) { + String AEQId = (String) context.getArguments(); + + Cache cache = context.getCache(); + DistributedMember member = cache.getDistributedSystem().getDistributedMember(); + + AsyncEventQueue queue = cache.getAsyncEventQueue(AEQId); + + if (queue == null) { + return new CliFunctionResult(member.getId(), CliFunctionResult.StatusState.ERROR, + "Async Event Queue \"" + AEQId + + "\" cannot be found"); + } + + if (queue.isDispatchingPaused()) { + queue.resumeEventDispatching(); + + return new CliFunctionResult(member.getId(), CliFunctionResult.StatusState.OK, + "Async Event Queue \"" + AEQId + + "\" dispatching was resumed successfully"); + } else { + return new CliFunctionResult(member.getId(), CliFunctionResult.StatusState.OK, + "Async Event Queue \"" + AEQId + + "\" dispatching was not paused."); + } + + + } +} diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/i18n/CliStrings.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/i18n/CliStrings.java index 9c20491..9497da7 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/i18n/CliStrings.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/i18n/CliStrings.java @@ -1917,6 +1917,17 @@ public class CliStrings { public static final String RESUME_GATEWAYSENDER__MEMBER__HELP = "Name/Id of the member on which to resume the Gateway Sender."; + /* resume async-event-queue */ + public static final String RESUME_ASYNCEVENTQUEUE = "resume 
async-event-queue-dispatcher"; + public static final String RESUME_ASYNCEVENTQUEUE__ID = "id"; + public static final String RESUME_ASYNCEVENTQUEUE__HELP = + "Resume the processing of the events in the Async Event Queue on a member or members."; + public static final String RESUME_ASYNCEVENTQUEUE__ID__HELP = "ID of the Async Event Queue."; + public static final String RESUME_ASYNCEVENTQUEUE__GROUP__HELP = + "Group(s) of members on which to resume processing of the events."; + public static final String RESUME_ASYNCEVENTQUEUE__MEMBER__HELP = + "Name/Id of the member on which to resume processing of the events."; + /* 'revoke missing-disk-store' command */ public static final String REVOKE_MISSING_DISK_STORE = "revoke missing-disk-store"; public static final String REVOKE_MISSING_DISK_STORE__HELP = diff --git a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt index 5e6695d..ec5b777 100644 --- a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt +++ b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt @@ -593,6 +593,7 @@ org/apache/geode/management/internal/cli/functions/RegionFunctionArgs,true,22049 org/apache/geode/management/internal/cli/functions/RegionFunctionArgs$EvictionAttrs,true,9015454906371076014,evictionAction:java/lang/String,maxEntryCount:java/lang/Integer,maxMemory:java/lang/Integer,objectSizer:java/lang/String org/apache/geode/management/internal/cli/functions/RegionFunctionArgs$ExpirationAttrs,true,1474255033398008063,action:org/apache/geode/cache/ExpirationAction,time:java/lang/Integer 
org/apache/geode/management/internal/cli/functions/RegionFunctionArgs$PartitionArgs,true,5907052187323280919,partitionResolver:java/lang/String,prColocatedWith:java/lang/String,prLocalMaxMemory:java/lang/Integer,prRecoveryDelay:java/lang/Long,prRedundantCopies:java/lang/Integer,prStartupRecoveryDelay:java/lang/Long,prTotalMaxMemory:java/lang/Long,prTotalNumBuckets:java/lang/Integer +org/apache/geode/management/internal/cli/functions/ResumeAsyncEventQueueDispatcherFunction,false org/apache/geode/management/internal/cli/functions/ShowMissingDiskStoresFunction,false org/apache/geode/management/internal/cli/functions/ShutDownFunction,true,1 org/apache/geode/management/internal/cli/functions/SizeExportLogsFunction,true,1 diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/ResumeAsyncEventQueueDispatcherCommandTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/ResumeAsyncEventQueueDispatcherCommandTest.java new file mode 100644 index 0000000..c52fd2a --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/ResumeAsyncEventQueueDispatcherCommandTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.geode.management.internal.cli.commands; + +import static org.apache.geode.management.internal.cli.i18n.CliStrings.RESUME_ASYNCEVENTQUEUE; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; + +import org.apache.geode.cache.execute.Function; +import org.apache.geode.management.internal.cli.functions.CliFunctionResult; +import org.apache.geode.test.junit.rules.GfshParserRule; + +public class ResumeAsyncEventQueueDispatcherCommandTest { + + public static final String COMMAND = RESUME_ASYNCEVENTQUEUE; + @ClassRule + public static GfshParserRule gfsh = new GfshParserRule(); + + private ResumeAsyncEventQueueDispatcherCommand command; + + @Before + public void before() throws Exception { + command = spy(ResumeAsyncEventQueueDispatcherCommand.class); + } + + @Test + public void resumeAsyncEventQueueSuccessful() { + String queueId = "queueId"; + List<CliFunctionResult> functionResults = new ArrayList<>(); + functionResults + .add(new CliFunctionResult("member1", CliFunctionResult.StatusState.OK, "SUCCESS")); + functionResults + .add(new CliFunctionResult("member2", CliFunctionResult.StatusState.ERROR, "FAILURE")); + + doReturn(functionResults).when(command).executeAndGetFunctionResult(isA(Function.class), + isA(Object.class), isA(Set.class)); + doReturn(Collections.emptySet()).when(command).findMembers(any(), any()); + + gfsh.executeAndAssertThat(command, COMMAND + " --id=queueId"); + + verify(command).executeAndGetFunctionResult(isA(Function.class), + isA(String.class), isA(Set.class)); + + verify(command).constructResultModel(functionResults); + } +}
