This is an automated email from the ASF dual-hosted git repository.
alberto pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new e2abb65 GEODE-9807: gfsh gateway sender stop in parallel (like start)
(#7108)
e2abb65 is described below
commit e2abb6579e553fafd7b48dac0fed82af27d178bf
Author: Alberto Gomez <[email protected]>
AuthorDate: Thu Jan 20 08:30:56 2022 +0100
GEODE-9807: gfsh gateway sender stop in parallel (like start) (#7108)
* GEODE-9807: gfsh Gateway sender stop in parallel (like start)
* GEODE-9807: Add unit tests
* GEODE-9807: Refactor to better test after review.
* GEODE-9807: Changes after review.
* GEODE-9807: Removed delegate class to simplify as suggested by review
* GEODE-9807: Small changes after review
* GEODE-9807: stub some class under test methods instead of passing
dependencies in constructor
---
.../geode/management/internal/i18n/CliStrings.java | 6 +-
geode-gfsh/build.gradle | 1 +
.../cli/commands/StartGatewaySenderCommand.java | 2 +-
.../cli/commands/StopGatewaySenderCommand.java | 112 ++++++++++-----
.../StopGatewaySenderOnMemberWithBeanImpl.java | 65 +++++++++
.../cli/commands/StopGatewaySenderCommandTest.java | 156 +++++++++++++++++++++
.../StopGatewaySenderOnMemberWithBeanImplTest.java | 122 ++++++++++++++++
.../StopGatewaySenderCommandDUnitTest.java | 131 +++++++++++++++--
8 files changed, 547 insertions(+), 48 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java
b/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java
index 2372aeb..ba31f14 100644
---
a/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java
+++
b/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java
@@ -3013,10 +3013,14 @@ public class CliStrings {
"GatewayReceiver is not running on member {0}";
public static final String GATEWAYS_ARE_NOT_AVAILABLE_IN_CLUSTER =
"GatewaySenders or GatewayReceivers are not available in cluster";
- public static final String GATEWAY_SENDER_0_COULD_NOT_BE_INVOKED_DUE_TO_1 =
+ public static final String
GATEWAY_SENDER_START_0_COULD_NOT_BE_INVOKED_DUE_TO_1 =
"Could not invoke start gateway sender {0} operation on members due to
{1}";
public static final String
GATEWAY_SENDER_0_COULD_NOT_BE_STARTED_ON_MEMBER_DUE_TO_1 =
"Could not start gateway sender {0} on member due to {1}";
+ public static final String
GATEWAY_SENDER_STOP_0_COULD_NOT_BE_INVOKED_DUE_TO_1 =
+ "Could not invoke stop gateway sender {0} operation on members due to
{1}";
+ public static final String
GATEWAY_SENDER_0_COULD_NOT_BE_STOPPED_ON_MEMBER_DUE_TO_1 =
+ "Could not stop gateway sender {0} on member due to {1}";
public static final String GATEWAY_SENDER_0_IS_UPDATED_ON_MEMBER_1 =
"GatewaySender {0} is updated on member {1}";
/* end gateway command messages */
diff --git a/geode-gfsh/build.gradle b/geode-gfsh/build.gradle
index f101331..fb9bfc5 100644
--- a/geode-gfsh/build.gradle
+++ b/geode-gfsh/build.gradle
@@ -43,6 +43,7 @@ dependencies {
testImplementation('com.github.stephenc.findbugs:findbugs-annotations')
testImplementation('org.springframework:spring-test')
testImplementation(project(':geode-junit'))
+ testImplementation('pl.pragmatists:JUnitParams')
integrationTestImplementation(project(':geode-dunit'))
integrationTestImplementation('pl.pragmatists:JUnitParams')
diff --git
a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/StartGatewaySenderCommand.java
b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/StartGatewaySenderCommand.java
index 5bdbb8c..996e85b 100644
---
a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/StartGatewaySenderCommand.java
+++
b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/StartGatewaySenderCommand.java
@@ -129,7 +129,7 @@ public class StartGatewaySenderCommand extends GfshCommand {
futures = execService.invokeAll(callables);
} catch (InterruptedException ite) {
return ResultModel.createError(
-
CliStrings.format(CliStrings.GATEWAY_SENDER_0_COULD_NOT_BE_INVOKED_DUE_TO_1, id,
+
CliStrings.format(CliStrings.GATEWAY_SENDER_START_0_COULD_NOT_BE_INVOKED_DUE_TO_1,
id,
ite.getMessage()));
} finally {
execService.shutdown();
diff --git
a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/StopGatewaySenderCommand.java
b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/StopGatewaySenderCommand.java
index 035c419..503f30d3 100644
---
a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/StopGatewaySenderCommand.java
+++
b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/StopGatewaySenderCommand.java
@@ -15,16 +15,23 @@
package org.apache.geode.management.internal.cli.commands;
-import java.util.Set;
+import static
org.apache.geode.logging.internal.executors.LoggingExecutors.newCachedThreadPool;
-import javax.management.ObjectName;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import org.springframework.shell.core.annotation.CliCommand;
import org.springframework.shell.core.annotation.CliOption;
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.cache.Cache;
import org.apache.geode.distributed.DistributedMember;
-import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.management.GatewaySenderMXBean;
import org.apache.geode.management.cli.CliMetaData;
import org.apache.geode.management.cli.ConverterHint;
import org.apache.geode.management.cli.GfshCommand;
@@ -36,7 +43,24 @@ import
org.apache.geode.management.internal.security.ResourceOperation;
import org.apache.geode.security.ResourcePermission;
public class StopGatewaySenderCommand extends GfshCommand {
+ private final ExecutorService executorService;
+ private final StopGatewaySenderOnMember stopperOnMember;
+
+ @SuppressWarnings("unused") // invoked by spring shell
+ public StopGatewaySenderCommand() {
+ this(newCachedThreadPool("Stop Sender Command Thread ", true),
+ new StopGatewaySenderOnMemberWithBeanImpl());
+ }
+
+ @VisibleForTesting
+ StopGatewaySenderCommand(
+ ExecutorService executorService,
+ StopGatewaySenderOnMember stopperOnMember) {
+ this.executorService = executorService;
+ this.stopperOnMember = stopperOnMember;
+ }
+ @SuppressWarnings("unused") // invoked by spring shell
@CliCommand(value = CliStrings.STOP_GATEWAYSENDER, help =
CliStrings.STOP_GATEWAYSENDER__HELP)
@CliMetaData(relatedTopic = CliStrings.TOPIC_GEODE_WAN)
@ResourceOperation(resource = ResourcePermission.Resource.CLUSTER,
@@ -53,50 +77,66 @@ public class StopGatewaySenderCommand extends GfshCommand {
optionContext = ConverterHint.MEMBERIDNAME,
help = CliStrings.STOP_GATEWAYSENDER__MEMBER__HELP) String[]
onMember) {
- if (senderId != null) {
- senderId = senderId.trim();
+ Set<DistributedMember> dsMembers = findMembers(onGroup, onMember);
+
+ if (dsMembers.isEmpty()) {
+ return ResultModel.createError(CliStrings.NO_MEMBERS_FOUND_MESSAGE);
}
- InternalCache cache = (InternalCache) getCache();
- SystemManagementService service = getManagementService();
+ return executeStopGatewaySender(senderId.trim(), getCache(), dsMembers);
+ }
+
+ public ResultModel executeStopGatewaySender(String id, Cache cache,
+ Set<DistributedMember> dsMembers) {
+ List<DistributedMember> dsMembersList = new ArrayList<>(dsMembers);
+ List<Callable<List<String>>> callables = new ArrayList<>();
- GatewaySenderMXBean bean;
+ for (final DistributedMember member : dsMembersList) {
+ callables.add(() -> stopperOnMember
+ .executeStopGatewaySenderOnMember(id,
+ cache, getManagementService(), member));
+ }
- Set<DistributedMember> dsMembers = findMembers(onGroup, onMember);
- if (dsMembers.isEmpty()) {
- return ResultModel.createError(CliStrings.NO_MEMBERS_FOUND_MESSAGE);
+ List<Future<List<String>>> futures;
+ try {
+ futures = executorService.invokeAll(callables);
+ } catch (InterruptedException ite) {
+ Thread.currentThread().interrupt();
+ return ResultModel.createError(
+
CliStrings.format(CliStrings.GATEWAY_SENDER_STOP_0_COULD_NOT_BE_INVOKED_DUE_TO_1,
id,
+ ite.getMessage()));
+ } finally {
+ executorService.shutdown();
}
+ return buildResultModelFromMembersResponses(id, dsMembersList, futures);
+ }
+
+ private ResultModel buildResultModelFromMembersResponses(String id,
+ List<DistributedMember> dsMembers, List<Future<List<String>>> futures) {
ResultModel resultModel = new ResultModel();
TabularResultModel resultData =
resultModel.addTable(CliStrings.STOP_GATEWAYSENDER);
- for (DistributedMember member : dsMembers) {
- if
(cache.getDistributedSystem().getDistributedMember().getId().equals(member.getId()))
{
- bean = service.getLocalGatewaySenderMXBean(senderId);
- } else {
- ObjectName objectName = service.getGatewaySenderMBeanName(member,
senderId);
- bean = service.getMBeanProxy(objectName, GatewaySenderMXBean.class);
- }
- if (bean != null) {
- if (bean.isRunning()) {
- bean.stop();
- resultData.addMemberStatusResultRow(member.getId(),
- CliStrings.GATEWAY_OK, CliStrings.format(
- CliStrings.GATEWAY_SENDER_0_IS_STOPPED_ON_MEMBER_1,
senderId, member.getId()));
-
- } else {
- resultData.addMemberStatusResultRow(member.getId(),
- CliStrings.GATEWAY_ERROR,
-
CliStrings.format(CliStrings.GATEWAY_SENDER_0_IS_NOT_RUNNING_ON_MEMBER_1,
senderId,
- member.getId()));
- }
- } else {
+ Iterator<DistributedMember> memberIterator = dsMembers.iterator();
+ for (Future<List<String>> future : futures) {
+ DistributedMember member = memberIterator.next();
+ List<String> memberStatus;
+ try {
+ memberStatus = future.get();
+ resultData.addMemberStatusResultRow(memberStatus.get(0),
+ memberStatus.get(1), memberStatus.get(2));
+ } catch (InterruptedException | ExecutionException ite) {
resultData.addMemberStatusResultRow(member.getId(),
CliStrings.GATEWAY_ERROR,
-
CliStrings.format(CliStrings.GATEWAY_SENDER_0_IS_NOT_AVAILABLE_ON_MEMBER_1,
senderId,
- member.getId()));
+
CliStrings.format(CliStrings.GATEWAY_SENDER_0_COULD_NOT_BE_STOPPED_ON_MEMBER_DUE_TO_1,
+ id, ite.getMessage()));
}
}
-
return resultModel;
}
+
+ @FunctionalInterface
+ interface StopGatewaySenderOnMember {
+ List<String> executeStopGatewaySenderOnMember(String id, Cache cache,
+ SystemManagementService managementService, DistributedMember member);
+ }
}
diff --git
a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/StopGatewaySenderOnMemberWithBeanImpl.java
b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/StopGatewaySenderOnMemberWithBeanImpl.java
new file mode 100644
index 0000000..2582545
--- /dev/null
+++
b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/StopGatewaySenderOnMemberWithBeanImpl.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.cli.commands;
+
+import java.util.ArrayList;
+
+import javax.management.ObjectName;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.management.GatewaySenderMXBean;
+import org.apache.geode.management.internal.SystemManagementService;
+import org.apache.geode.management.internal.i18n.CliStrings;
+
+class StopGatewaySenderOnMemberWithBeanImpl
+ implements StopGatewaySenderCommand.StopGatewaySenderOnMember {
+
+ public ArrayList<String> executeStopGatewaySenderOnMember(String id, Cache
cache,
+ SystemManagementService managementService, DistributedMember member) {
+ GatewaySenderMXBean bean;
+ ArrayList<String> statusList = new ArrayList<>();
+ if
(cache.getDistributedSystem().getDistributedMember().getId().equals(member.getId()))
{
+ bean = managementService.getLocalGatewaySenderMXBean(id);
+ } else {
+ ObjectName objectName =
managementService.getGatewaySenderMBeanName(member, id);
+ bean = managementService.getMBeanProxy(objectName,
GatewaySenderMXBean.class);
+ }
+
+ if (bean == null) {
+ statusList.add(member.getId());
+ statusList.add(CliStrings.GATEWAY_ERROR);
+
statusList.add(CliStrings.format(CliStrings.GATEWAY_SENDER_0_IS_NOT_AVAILABLE_ON_MEMBER_1,
+ id, member.getId()));
+ return statusList;
+ }
+
+ if (!bean.isRunning()) {
+ statusList.add(member.getId());
+ statusList.add(CliStrings.GATEWAY_ERROR);
+ statusList.add(CliStrings.format(
+ CliStrings.GATEWAY_SENDER_0_IS_NOT_RUNNING_ON_MEMBER_1, id,
member.getId()));
+ return statusList;
+ }
+
+ bean.stop();
+ statusList.add(member.getId());
+ statusList.add(CliStrings.GATEWAY_OK);
+
statusList.add(CliStrings.format(CliStrings.GATEWAY_SENDER_0_IS_STOPPED_ON_MEMBER_1,
id,
+ member.getId()));
+ return statusList;
+ }
+}
diff --git
a/geode-gfsh/src/test/java/org/apache/geode/management/internal/cli/commands/StopGatewaySenderCommandTest.java
b/geode-gfsh/src/test/java/org/apache/geode/management/internal/cli/commands/StopGatewaySenderCommandTest.java
new file mode 100644
index 0000000..e7de7f2
--- /dev/null
+++
b/geode-gfsh/src/test/java/org/apache/geode/management/internal/cli/commands/StopGatewaySenderCommandTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.cli.commands;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.management.internal.SystemManagementService;
+import org.apache.geode.management.internal.cli.result.model.ResultModel;
+import
org.apache.geode.management.internal.cli.result.model.TabularResultModel;
+import org.apache.geode.management.internal.i18n.CliStrings;
+import org.apache.geode.test.junit.rules.GfshParserRule;
+
+public class StopGatewaySenderCommandTest {
+ @ClassRule
+ public static GfshParserRule gfsh = new GfshParserRule();
+
+ private final String senderId = "sender1";
+ private Cache cache;
+ private Set<DistributedMember> members;
+ private ExecutorService executorService;
+ private SystemManagementService managementService;
+ private StopGatewaySenderCommand.StopGatewaySenderOnMember stopperOnMember;
+
+ @Before
+ public void setUp() {
+ cache = mock(Cache.class);
+ members =
+ Stream.generate(() -> mock(DistributedMember.class)).limit(3)
+ .collect(Collectors.toSet());
+
+ executorService = mock(ExecutorService.class);
+ managementService = mock(SystemManagementService.class);
+ stopperOnMember =
mock(StopGatewaySenderCommand.StopGatewaySenderOnMember.class);
+ }
+
+ @Test
+ public void whenMissingSenderIdCommandReturnsInvalidCommandError() {
+ StopGatewaySenderCommand command = new StopGatewaySenderCommand();
+ gfsh.executeAndAssertThat(command, "stop gateway-sender")
+ .statusIsError().containsOutput("Invalid command");
+ }
+
+ @Test
+ public void whenNoMembersCommandReturnsNoMembersError() {
+ // arrange
+ Set<DistributedMember> emptySet = new HashSet<>();
+ StopGatewaySenderCommand command =
+ spy(new StopGatewaySenderCommand(executorService, stopperOnMember));
+
+ doReturn(emptySet).when(command).findMembers(any(), any());
+
+ // act and assert
+ gfsh.executeAndAssertThat(command, "stop gateway-sender --id=sender1")
+ .statusIsError().containsOutput("No Members Found");
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void
stopGatewaySenderStartsOneThreadPerMemberAndBuildsOutputAccordingToFuturesOutput()
+ throws InterruptedException, ExecutionException {
+ // arrange
+ String gatewaySenderIsStoppedMsg = "GatewaySender ln is stopped on member";
+ List<Future<List<String>>> futures = new ArrayList<>();
+ for (int memberIndex = 0; memberIndex < members.size(); memberIndex++) {
+ Future<List<String>> future = mock(Future.class);
+ List<String> list = Arrays.asList("member" + memberIndex, "OK",
gatewaySenderIsStoppedMsg);
+ doReturn(list).when(future).get();
+ futures.add(future);
+ }
+ doReturn(futures).when(executorService).invokeAll(any());
+ StopGatewaySenderCommand command =
+ spy(new StopGatewaySenderCommand(executorService, stopperOnMember));
+ doReturn(members).when(command).findMembers(any(), any());
+ doReturn(managementService).when(command).getManagementService();
+
+ // act
+ ResultModel result = command.executeStopGatewaySender(senderId, cache,
members);
+
+ // assert
+ assertThat(result.isSuccessful()).isTrue();
+
+ TabularResultModel resultData =
result.getTableSection(CliStrings.STOP_GATEWAYSENDER);
+ List<String> member = resultData.getValuesInColumn("Member");
+ assertThat(member).containsExactlyInAnyOrder("member0", "member1",
"member2");
+ List<String> status = resultData.getValuesInColumn("Result");
+ assertThat(status).containsExactlyInAnyOrder("OK", "OK", "OK");
+ List<String> message = resultData.getValuesInColumn("Message");
+ assertThat(message).containsExactlyInAnyOrder(gatewaySenderIsStoppedMsg,
+ gatewaySenderIsStoppedMsg, gatewaySenderIsStoppedMsg);
+
+ ArgumentCaptor<Collection> callablesCaptor =
+ ArgumentCaptor.forClass(Collection.class);
+ verify(executorService, times(1)).invokeAll((callablesCaptor.capture()));
+ assertThat(callablesCaptor.getValue().size()).isEqualTo(members.size());
+ verify(executorService, times(1)).shutdown();
+ }
+
+ @Test
+ public void stopGatewaySenderInterruptedReturnsError() throws
InterruptedException {
+ // arrange
+ InterruptedException exception = new InterruptedException("interruption2");
+ doThrow(exception).when(executorService).invokeAll(any());
+ StopGatewaySenderCommand command =
+ spy(new StopGatewaySenderCommand(executorService, stopperOnMember));
+ doReturn(members).when(command).findMembers(any(), any());
+ doReturn(managementService).when(command).getManagementService();
+
+ // act
+ ResultModel result = command.executeStopGatewaySender(senderId, cache,
members);
+
+ // assert
+ assertThat(result.isSuccessful()).isFalse();
+ assertThat(result.getInfoSection("info").getContent().get(0)).isEqualTo(
+ "Could not invoke stop gateway sender sender1 operation on members due
to interruption2");
+ }
+
+}
diff --git
a/geode-gfsh/src/test/java/org/apache/geode/management/internal/cli/commands/StopGatewaySenderOnMemberWithBeanImplTest.java
b/geode-gfsh/src/test/java/org/apache/geode/management/internal/cli/commands/StopGatewaySenderOnMemberWithBeanImplTest.java
new file mode 100644
index 0000000..3c3dfa1
--- /dev/null
+++
b/geode-gfsh/src/test/java/org/apache/geode/management/internal/cli/commands/StopGatewaySenderOnMemberWithBeanImplTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.cli.commands;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+
+import junitparams.Parameters;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.DistributedSystem;
+import org.apache.geode.management.GatewaySenderMXBean;
+import org.apache.geode.management.internal.SystemManagementService;
+import org.apache.geode.test.junit.runners.GeodeParamsRunner;
+
+@RunWith(GeodeParamsRunner.class)
+public class StopGatewaySenderOnMemberWithBeanImplTest {
+ String senderId = "sender1";
+ String memberId = "member1";
+ String remoteMemberId = "remoteMember1";
+ Cache cache;
+ SystemManagementService managementService;
+ DistributedMember distributedMember;
+ DistributedMember remoteDistributedMember;
+ DistributedSystem distributedSystem;
+
+ @Before
+ public void setUp() {
+ cache = mock(Cache.class);
+ managementService = mock(SystemManagementService.class);
+ distributedMember = mock(DistributedMember.class);
+ remoteDistributedMember = mock(DistributedMember.class);
+ when(distributedMember.getId()).thenReturn(memberId);
+ when(remoteDistributedMember.getId()).thenReturn(remoteMemberId);
+ distributedSystem = mock(DistributedSystem.class);
+ doReturn(distributedSystem).when(cache).getDistributedSystem();
+ }
+
+ @Test
+ @Parameters({"true", "false"})
+ public void executeStopGatewaySenderOnMemberNotRunningReturnsNotRunningError(
+ boolean isLocalMember) {
+ GatewaySenderMXBean gatewaySenderMXBean =
gatewaySenderMXBean(isLocalMember, false);
+ when(gatewaySenderMXBean.isRunning()).thenReturn(false);
+
+ StopGatewaySenderCommand.StopGatewaySenderOnMember stopperWithBean =
+ new StopGatewaySenderOnMemberWithBeanImpl();
+
+ List<String> result =
stopperWithBean.executeStopGatewaySenderOnMember(senderId, cache,
+ managementService, distributedMember);
+ assertThat(result).containsExactly(memberId, "Error",
+ "GatewaySender sender1 is not running on member " + memberId + ".");
+ }
+
+ @Test
+ @Parameters({"true", "false"})
+ public void
executeStopGatewaySenderOnMemberNotAvailableReturnsNotAvailableError(
+ boolean isLocalMember) {
+ gatewaySenderMXBean(isLocalMember, true);
+
+ StopGatewaySenderCommand.StopGatewaySenderOnMember stopperWithBean =
+ new StopGatewaySenderOnMemberWithBeanImpl();
+
+ List<String> result =
stopperWithBean.executeStopGatewaySenderOnMember(senderId, cache,
+ managementService, distributedMember);
+
+ assertThat(result).containsExactly(memberId, "Error",
+ "GatewaySender sender1 is not available on member " + memberId);
+ }
+
+ @Test
+ @Parameters({"true", "false"})
+ public void executeStopGatewaySenderOnMemberRunningReturnsOk(boolean
isLocalMember) {
+ GatewaySenderMXBean gatewaySenderMXBean =
gatewaySenderMXBean(isLocalMember, false);
+ when(gatewaySenderMXBean.isRunning()).thenReturn(true);
+
+ StopGatewaySenderCommand.StopGatewaySenderOnMember stopperWithBean =
+ new StopGatewaySenderOnMemberWithBeanImpl();
+
+ List<String> result =
stopperWithBean.executeStopGatewaySenderOnMember(senderId, cache,
+ managementService, distributedMember);
+
+ assertThat(result).containsExactly(memberId, "OK",
+ "GatewaySender sender1 is stopped on member " + memberId);
+ }
+
+ private GatewaySenderMXBean gatewaySenderMXBean(boolean isLocalMember,
boolean mustBeNull) {
+ GatewaySenderMXBean gatewaySenderMXBean = mustBeNull
+ ? null
+ : mock(GatewaySenderMXBean.class);
+ if (isLocalMember) {
+
doReturn(distributedMember).when(distributedSystem).getDistributedMember();
+
doReturn(gatewaySenderMXBean).when(managementService).getLocalGatewaySenderMXBean(senderId);
+ } else {
+
doReturn(remoteDistributedMember).when(distributedSystem).getDistributedMember();
+
doReturn(gatewaySenderMXBean).when(managementService).getMBeanProxy(any(),
any());
+ }
+ return gatewaySenderMXBean;
+ }
+}
diff --git
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/StopGatewaySenderCommandDUnitTest.java
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/StopGatewaySenderCommandDUnitTest.java
index 2d02065..dce42d3 100644
---
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/StopGatewaySenderCommandDUnitTest.java
+++
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/StopGatewaySenderCommandDUnitTest.java
@@ -28,6 +28,7 @@ import java.io.Serializable;
import java.util.List;
import java.util.Properties;
+import org.assertj.core.api.Condition;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -45,7 +46,6 @@ import org.apache.geode.test.junit.categories.WanTest;
import org.apache.geode.test.junit.rules.GfshCommandRule;
@Category({WanTest.class})
-@SuppressWarnings("serial")
public class StopGatewaySenderCommandDUnitTest implements Serializable {
@Rule
@@ -77,7 +77,7 @@ public class StopGatewaySenderCommandDUnitTest implements
Serializable {
}
@Test
- public void testStopGatewaySender_ErrorConditions() throws Exception {
+ public void testStopGatewaySender_ErrorConditions() {
Integer locator1Port = locatorSite1.getPort();
// setup servers in Site #1 (London)
@@ -94,7 +94,7 @@ public class StopGatewaySenderCommandDUnitTest implements
Serializable {
}
@Test
- public void testStopGatewaySender() throws Exception {
+ public void testStopGatewaySender() {
Integer locator1Port = locatorSite1.getPort();
// setup servers in Site #1 (London)
@@ -148,7 +148,7 @@ public class StopGatewaySenderCommandDUnitTest implements
Serializable {
* test to validate that the start gateway sender starts the gateway sender
on a member
*/
@Test
- public void testStopGatewaySender_onMember() throws Exception {
+ public void testStopGatewaySender_onMember() {
Integer locator1Port = locatorSite1.getPort();
// setup servers in Site #1 (London)
@@ -184,8 +184,8 @@ public class StopGatewaySenderCommandDUnitTest implements
Serializable {
* test to validate that the start gateway sender starts the gateway sender
on a group of members
*/
@Test
- public void testStopGatewaySender_Group() throws Exception {
- Integer locator1Port = locatorSite1.getPort();
+ public void testStopGatewaySender_Group() {
+ int locator1Port = locatorSite1.getPort();
// setup servers in Site #1 (London)
server1 = startServerWithGroups(3, "SenderGroup1", locator1Port);
@@ -239,8 +239,8 @@ public class StopGatewaySenderCommandDUnitTest implements
Serializable {
* to multiple groups
*/
@Test
- public void testStopGatewaySender_MultipleGroup() throws Exception {
- Integer locator1Port = locatorSite1.getPort();
+ public void testStopGatewaySender_MultipleGroup() {
+ int locator1Port = locatorSite1.getPort();
// setup servers in Site #1 (London)
server1 = startServerWithGroups(3, "SenderGroup1", locator1Port);
@@ -307,13 +307,124 @@ public class StopGatewaySenderCommandDUnitTest
implements Serializable {
server5.invoke(() -> verifySenderState("ln", true, false));
}
- private CommandResult executeCommandWithIgnoredExceptions(String command)
throws Exception {
+ @Test
+ public void testStopGatewaySender_OneSenderNotRunning() {
+ Integer locator1Port = locatorSite1.getPort();
+
+ // setup servers in Site #1 (London)
+ server1 = clusterStartupRule.startServerVM(3, locator1Port);
+ server2 = clusterStartupRule.startServerVM(4, locator1Port);
+ server3 = clusterStartupRule.startServerVM(5, locator1Port);
+
+ server1.invoke(() -> createSender("ln", 2, false, 100, 400, false, false,
null, true));
+ server2.invoke(() -> createSender("ln", 2, false, 100, 400, false, false,
null, true));
+ server3.invoke(() -> createSender("ln", 2, false, 100, 400, false, false,
null, true));
+
+ server1.invoke(() -> startSender("ln"));
+ server2.invoke(() -> startSender("ln"));
+
+ server1.invoke(() -> verifySenderState("ln", true, false));
+ server2.invoke(() -> verifySenderState("ln", true, false));
+ server3.invoke(() -> verifySenderState("ln", false, false));
+
+ locatorSite1.invoke(
+ () -> validateGatewaySenderMXBeanProxy(getMember(server1.getVM()),
"ln", true, false));
+ locatorSite1.invoke(
+ () -> validateGatewaySenderMXBeanProxy(getMember(server2.getVM()),
"ln", true, false));
+ locatorSite1.invoke(
+ () -> validateGatewaySenderMXBeanProxy(getMember(server3.getVM()),
"ln", false, false));
+
+ String command =
+ CliStrings.STOP_GATEWAYSENDER + " --" +
CliStrings.STOP_GATEWAYSENDER__ID + "=ln";
+ CommandResult cmdResult = executeCommandWithIgnoredExceptions(command);
+ assertThat(cmdResult).isNotNull();
+ assertThat(cmdResult.getStatus()).isSameAs(Result.Status.OK);
+
+ TabularResultModel resultData = cmdResult.getResultData()
+ .getTableSection(CliStrings.STOP_GATEWAYSENDER);
+ List<String> status = resultData.getValuesInColumn("Result");
+ assertThat(status).containsExactlyInAnyOrder("OK", "OK", "Error");
+
+ List<String> message = resultData.getValuesInColumn("Message");
+ Condition<String> okMsg =
+ new Condition<>(s -> s.startsWith("GatewaySender ln is stopped on
member "), "ok");
+ Condition<String> errorMsg =
+ new Condition<>(s -> s.startsWith("GatewaySender ln is not running on
member "), "ok");
+
+ assertThat(message).haveExactly(2, okMsg);
+ assertThat(message).haveExactly(1, errorMsg);
+
+ locatorSite1.invoke(
+ () -> validateGatewaySenderMXBeanProxy(getMember(server1.getVM()),
"ln", false, false));
+ locatorSite1.invoke(
+ () -> validateGatewaySenderMXBeanProxy(getMember(server2.getVM()),
"ln", false, false));
+ locatorSite1.invoke(
+ () -> validateGatewaySenderMXBeanProxy(getMember(server3.getVM()),
"ln", false, false));
+
+ server1.invoke(() -> verifySenderState("ln", false, false));
+ server2.invoke(() -> verifySenderState("ln", false, false));
+ server3.invoke(() -> verifySenderState("ln", false, false));
+ }
+
+ @Test
+ public void testStopGatewaySender_OneSenderNotAvailable() {
+ Integer locator1Port = locatorSite1.getPort();
+
+ // setup servers in Site #1 (London)
+ server1 = clusterStartupRule.startServerVM(3, locator1Port);
+ server2 = clusterStartupRule.startServerVM(4, locator1Port);
+ server3 = clusterStartupRule.startServerVM(5, locator1Port);
+
+ server1.invoke(() -> createSender("ln", 2, false, 100, 400, false, false,
null, true));
+ server2.invoke(() -> createSender("ln", 2, false, 100, 400, false, false,
null, true));
+
+ server1.invoke(() -> startSender("ln"));
+ server2.invoke(() -> startSender("ln"));
+
+ server1.invoke(() -> verifySenderState("ln", true, false));
+ server2.invoke(() -> verifySenderState("ln", true, false));
+
+ locatorSite1.invoke(
+ () -> validateGatewaySenderMXBeanProxy(getMember(server1.getVM()),
"ln", true, false));
+ locatorSite1.invoke(
+ () -> validateGatewaySenderMXBeanProxy(getMember(server2.getVM()),
"ln", true, false));
+
+ String command =
+ CliStrings.STOP_GATEWAYSENDER + " --" +
CliStrings.STOP_GATEWAYSENDER__ID + "=ln";
+ CommandResult cmdResult = executeCommandWithIgnoredExceptions(command);
+ assertThat(cmdResult).isNotNull();
+ assertThat(cmdResult.getStatus()).isSameAs(Result.Status.OK);
+
+ TabularResultModel resultData = cmdResult.getResultData()
+ .getTableSection(CliStrings.STOP_GATEWAYSENDER);
+ List<String> status = resultData.getValuesInColumn("Result");
+ assertThat(status).containsExactlyInAnyOrder("OK", "OK", "Error");
+
+ List<String> message = resultData.getValuesInColumn("Message");
+ Condition<String> okMsg =
+ new Condition<>(s -> s.startsWith("GatewaySender ln is stopped on
member "), "ok");
+ Condition<String> errorMsg =
+ new Condition<>(s -> s.startsWith("GatewaySender ln is not available
on member "), "ok");
+
+ assertThat(message).haveExactly(2, okMsg);
+ assertThat(message).haveExactly(1, errorMsg);
+
+ locatorSite1.invoke(
+ () -> validateGatewaySenderMXBeanProxy(getMember(server1.getVM()),
"ln", false, false));
+ locatorSite1.invoke(
+ () -> validateGatewaySenderMXBeanProxy(getMember(server2.getVM()),
"ln", false, false));
+
+ server1.invoke(() -> verifySenderState("ln", false, false));
+ server2.invoke(() -> verifySenderState("ln", false, false));
+ }
+
+ private CommandResult executeCommandWithIgnoredExceptions(String command) {
try (IgnoredException ie = IgnoredException.addIgnoredException("Could not
connect")) {
return gfsh.executeCommand(command);
}
}
- private MemberVM startServerWithGroups(int index, String groups, int
locPort) throws Exception {
+ private MemberVM startServerWithGroups(int index, String groups, int
locPort) {
Properties props = new Properties();
props.setProperty(GROUPS, groups);
return clusterStartupRule.startServerVM(index, props, locPort);