This is an automated email from the ASF dual-hosted git repository.

alberto pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new e2abb65  GEODE-9807: gfsh gateway sender stop in parallel (like start) 
(#7108)
e2abb65 is described below

commit e2abb6579e553fafd7b48dac0fed82af27d178bf
Author: Alberto Gomez <[email protected]>
AuthorDate: Thu Jan 20 08:30:56 2022 +0100

    GEODE-9807: gfsh gateway sender stop in parallel (like start) (#7108)
    
    * GEODE-9807: gfsh Gateway sender stop in parallel (like start)
    
    * GEODE-9807: Add unit tests
    
    * GEODE-9807: Refactor to better test after review.
    
    * GEODE-9807: Changes after review.
    
    * GEODE-9807: Removed delegate class to simplify as suggested by review
    
    * GEODE-9807: Small changes after review
    
    * GEODE-9807: stub some class under test methods instead of passing 
dependencies in constructor
---
 .../geode/management/internal/i18n/CliStrings.java |   6 +-
 geode-gfsh/build.gradle                            |   1 +
 .../cli/commands/StartGatewaySenderCommand.java    |   2 +-
 .../cli/commands/StopGatewaySenderCommand.java     | 112 ++++++++++-----
 .../StopGatewaySenderOnMemberWithBeanImpl.java     |  65 +++++++++
 .../cli/commands/StopGatewaySenderCommandTest.java | 156 +++++++++++++++++++++
 .../StopGatewaySenderOnMemberWithBeanImplTest.java | 122 ++++++++++++++++
 .../StopGatewaySenderCommandDUnitTest.java         | 131 +++++++++++++++--
 8 files changed, 547 insertions(+), 48 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java
 
b/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java
index 2372aeb..ba31f14 100644
--- 
a/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java
+++ 
b/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java
@@ -3013,10 +3013,14 @@ public class CliStrings {
       "GatewayReceiver is not running on member {0}";
   public static final String GATEWAYS_ARE_NOT_AVAILABLE_IN_CLUSTER =
       "GatewaySenders or GatewayReceivers are not available in cluster";
-  public static final String GATEWAY_SENDER_0_COULD_NOT_BE_INVOKED_DUE_TO_1 =
+  public static final String 
GATEWAY_SENDER_START_0_COULD_NOT_BE_INVOKED_DUE_TO_1 =
       "Could not invoke start gateway sender {0} operation on members due to 
{1}";
   public static final String 
GATEWAY_SENDER_0_COULD_NOT_BE_STARTED_ON_MEMBER_DUE_TO_1 =
       "Could not start gateway sender {0} on member due to {1}";
+  public static final String 
GATEWAY_SENDER_STOP_0_COULD_NOT_BE_INVOKED_DUE_TO_1 =
+      "Could not invoke stop gateway sender {0} operation on members due to 
{1}";
+  public static final String 
GATEWAY_SENDER_0_COULD_NOT_BE_STOPPED_ON_MEMBER_DUE_TO_1 =
+      "Could not stop gateway sender {0} on member due to {1}";
   public static final String GATEWAY_SENDER_0_IS_UPDATED_ON_MEMBER_1 =
       "GatewaySender {0} is updated on member {1}";
   /* end gateway command messages */
diff --git a/geode-gfsh/build.gradle b/geode-gfsh/build.gradle
index f101331..fb9bfc5 100644
--- a/geode-gfsh/build.gradle
+++ b/geode-gfsh/build.gradle
@@ -43,6 +43,7 @@ dependencies {
   testImplementation('com.github.stephenc.findbugs:findbugs-annotations')
   testImplementation('org.springframework:spring-test')
   testImplementation(project(':geode-junit'))
+  testImplementation('pl.pragmatists:JUnitParams')
 
   integrationTestImplementation(project(':geode-dunit'))
   integrationTestImplementation('pl.pragmatists:JUnitParams')
diff --git 
a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/StartGatewaySenderCommand.java
 
b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/StartGatewaySenderCommand.java
index 5bdbb8c..996e85b 100644
--- 
a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/StartGatewaySenderCommand.java
+++ 
b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/StartGatewaySenderCommand.java
@@ -129,7 +129,7 @@ public class StartGatewaySenderCommand extends GfshCommand {
       futures = execService.invokeAll(callables);
     } catch (InterruptedException ite) {
       return ResultModel.createError(
-          
CliStrings.format(CliStrings.GATEWAY_SENDER_0_COULD_NOT_BE_INVOKED_DUE_TO_1, id,
+          
CliStrings.format(CliStrings.GATEWAY_SENDER_START_0_COULD_NOT_BE_INVOKED_DUE_TO_1,
 id,
               ite.getMessage()));
     } finally {
       execService.shutdown();
diff --git 
a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/StopGatewaySenderCommand.java
 
b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/StopGatewaySenderCommand.java
index 035c419..503f30d3 100644
--- 
a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/StopGatewaySenderCommand.java
+++ 
b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/StopGatewaySenderCommand.java
@@ -15,16 +15,23 @@
 
 package org.apache.geode.management.internal.cli.commands;
 
-import java.util.Set;
+import static 
org.apache.geode.logging.internal.executors.LoggingExecutors.newCachedThreadPool;
 
-import javax.management.ObjectName;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 import org.springframework.shell.core.annotation.CliCommand;
 import org.springframework.shell.core.annotation.CliOption;
 
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.cache.Cache;
 import org.apache.geode.distributed.DistributedMember;
-import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.management.GatewaySenderMXBean;
 import org.apache.geode.management.cli.CliMetaData;
 import org.apache.geode.management.cli.ConverterHint;
 import org.apache.geode.management.cli.GfshCommand;
@@ -36,7 +43,24 @@ import 
org.apache.geode.management.internal.security.ResourceOperation;
 import org.apache.geode.security.ResourcePermission;
 
 public class StopGatewaySenderCommand extends GfshCommand {
+  private final ExecutorService executorService;
+  private final StopGatewaySenderOnMember stopperOnMember;
+
+  @SuppressWarnings("unused") // invoked by spring shell
+  public StopGatewaySenderCommand() {
+    this(newCachedThreadPool("Stop Sender Command Thread ", true),
+        new StopGatewaySenderOnMemberWithBeanImpl());
+  }
+
+  @VisibleForTesting
+  StopGatewaySenderCommand(
+      ExecutorService executorService,
+      StopGatewaySenderOnMember stopperOnMember) {
+    this.executorService = executorService;
+    this.stopperOnMember = stopperOnMember;
+  }
 
+  @SuppressWarnings("unused") // invoked by spring shell
   @CliCommand(value = CliStrings.STOP_GATEWAYSENDER, help = 
CliStrings.STOP_GATEWAYSENDER__HELP)
   @CliMetaData(relatedTopic = CliStrings.TOPIC_GEODE_WAN)
   @ResourceOperation(resource = ResourcePermission.Resource.CLUSTER,
@@ -53,50 +77,66 @@ public class StopGatewaySenderCommand extends GfshCommand {
           optionContext = ConverterHint.MEMBERIDNAME,
           help = CliStrings.STOP_GATEWAYSENDER__MEMBER__HELP) String[] 
onMember) {
 
-    if (senderId != null) {
-      senderId = senderId.trim();
+    Set<DistributedMember> dsMembers = findMembers(onGroup, onMember);
+
+    if (dsMembers.isEmpty()) {
+      return ResultModel.createError(CliStrings.NO_MEMBERS_FOUND_MESSAGE);
     }
 
-    InternalCache cache = (InternalCache) getCache();
-    SystemManagementService service = getManagementService();
+    return executeStopGatewaySender(senderId.trim(), getCache(), dsMembers);
+  }
+
+  public ResultModel executeStopGatewaySender(String id, Cache cache,
+      Set<DistributedMember> dsMembers) {
+    List<DistributedMember> dsMembersList = new ArrayList<>(dsMembers);
+    List<Callable<List<String>>> callables = new ArrayList<>();
 
-    GatewaySenderMXBean bean;
+    for (final DistributedMember member : dsMembersList) {
+      callables.add(() -> stopperOnMember
+          .executeStopGatewaySenderOnMember(id,
+              cache, getManagementService(), member));
+    }
 
-    Set<DistributedMember> dsMembers = findMembers(onGroup, onMember);
-    if (dsMembers.isEmpty()) {
-      return ResultModel.createError(CliStrings.NO_MEMBERS_FOUND_MESSAGE);
+    List<Future<List<String>>> futures;
+    try {
+      futures = executorService.invokeAll(callables);
+    } catch (InterruptedException ite) {
+      Thread.currentThread().interrupt();
+      return ResultModel.createError(
+          
CliStrings.format(CliStrings.GATEWAY_SENDER_STOP_0_COULD_NOT_BE_INVOKED_DUE_TO_1,
 id,
+              ite.getMessage()));
+    } finally {
+      executorService.shutdown();
     }
 
+    return buildResultModelFromMembersResponses(id, dsMembersList, futures);
+  }
+
+  private ResultModel buildResultModelFromMembersResponses(String id,
+      List<DistributedMember> dsMembers, List<Future<List<String>>> futures) {
     ResultModel resultModel = new ResultModel();
     TabularResultModel resultData = 
resultModel.addTable(CliStrings.STOP_GATEWAYSENDER);
-    for (DistributedMember member : dsMembers) {
-      if 
(cache.getDistributedSystem().getDistributedMember().getId().equals(member.getId()))
 {
-        bean = service.getLocalGatewaySenderMXBean(senderId);
-      } else {
-        ObjectName objectName = service.getGatewaySenderMBeanName(member, 
senderId);
-        bean = service.getMBeanProxy(objectName, GatewaySenderMXBean.class);
-      }
-      if (bean != null) {
-        if (bean.isRunning()) {
-          bean.stop();
-          resultData.addMemberStatusResultRow(member.getId(),
-              CliStrings.GATEWAY_OK, CliStrings.format(
-                  CliStrings.GATEWAY_SENDER_0_IS_STOPPED_ON_MEMBER_1, 
senderId, member.getId()));
-
-        } else {
-          resultData.addMemberStatusResultRow(member.getId(),
-              CliStrings.GATEWAY_ERROR,
-              
CliStrings.format(CliStrings.GATEWAY_SENDER_0_IS_NOT_RUNNING_ON_MEMBER_1, 
senderId,
-                  member.getId()));
-        }
-      } else {
+    Iterator<DistributedMember> memberIterator = dsMembers.iterator();
+    for (Future<List<String>> future : futures) {
+      DistributedMember member = memberIterator.next();
+      List<String> memberStatus;
+      try {
+        memberStatus = future.get();
+        resultData.addMemberStatusResultRow(memberStatus.get(0),
+            memberStatus.get(1), memberStatus.get(2));
+      } catch (InterruptedException | ExecutionException ite) {
         resultData.addMemberStatusResultRow(member.getId(),
             CliStrings.GATEWAY_ERROR,
-            
CliStrings.format(CliStrings.GATEWAY_SENDER_0_IS_NOT_AVAILABLE_ON_MEMBER_1, 
senderId,
-                member.getId()));
+            
CliStrings.format(CliStrings.GATEWAY_SENDER_0_COULD_NOT_BE_STOPPED_ON_MEMBER_DUE_TO_1,
+                id, ite.getMessage()));
       }
     }
-
     return resultModel;
   }
+
+  @FunctionalInterface
+  interface StopGatewaySenderOnMember {
+    List<String> executeStopGatewaySenderOnMember(String id, Cache cache,
+        SystemManagementService managementService, DistributedMember member);
+  }
 }
diff --git 
a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/StopGatewaySenderOnMemberWithBeanImpl.java
 
b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/StopGatewaySenderOnMemberWithBeanImpl.java
new file mode 100644
index 0000000..2582545
--- /dev/null
+++ 
b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/StopGatewaySenderOnMemberWithBeanImpl.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.cli.commands;
+
+import java.util.ArrayList;
+
+import javax.management.ObjectName;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.management.GatewaySenderMXBean;
+import org.apache.geode.management.internal.SystemManagementService;
+import org.apache.geode.management.internal.i18n.CliStrings;
+
+class StopGatewaySenderOnMemberWithBeanImpl
+    implements StopGatewaySenderCommand.StopGatewaySenderOnMember {
+
+  public ArrayList<String> executeStopGatewaySenderOnMember(String id, Cache 
cache,
+      SystemManagementService managementService, DistributedMember member) {
+    GatewaySenderMXBean bean;
+    ArrayList<String> statusList = new ArrayList<>();
+    if 
(cache.getDistributedSystem().getDistributedMember().getId().equals(member.getId()))
 {
+      bean = managementService.getLocalGatewaySenderMXBean(id);
+    } else {
+      ObjectName objectName = 
managementService.getGatewaySenderMBeanName(member, id);
+      bean = managementService.getMBeanProxy(objectName, 
GatewaySenderMXBean.class);
+    }
+
+    if (bean == null) {
+      statusList.add(member.getId());
+      statusList.add(CliStrings.GATEWAY_ERROR);
+      
statusList.add(CliStrings.format(CliStrings.GATEWAY_SENDER_0_IS_NOT_AVAILABLE_ON_MEMBER_1,
+          id, member.getId()));
+      return statusList;
+    }
+
+    if (!bean.isRunning()) {
+      statusList.add(member.getId());
+      statusList.add(CliStrings.GATEWAY_ERROR);
+      statusList.add(CliStrings.format(
+          CliStrings.GATEWAY_SENDER_0_IS_NOT_RUNNING_ON_MEMBER_1, id, 
member.getId()));
+      return statusList;
+    }
+
+    bean.stop();
+    statusList.add(member.getId());
+    statusList.add(CliStrings.GATEWAY_OK);
+    
statusList.add(CliStrings.format(CliStrings.GATEWAY_SENDER_0_IS_STOPPED_ON_MEMBER_1,
 id,
+        member.getId()));
+    return statusList;
+  }
+}
diff --git 
a/geode-gfsh/src/test/java/org/apache/geode/management/internal/cli/commands/StopGatewaySenderCommandTest.java
 
b/geode-gfsh/src/test/java/org/apache/geode/management/internal/cli/commands/StopGatewaySenderCommandTest.java
new file mode 100644
index 0000000..e7de7f2
--- /dev/null
+++ 
b/geode-gfsh/src/test/java/org/apache/geode/management/internal/cli/commands/StopGatewaySenderCommandTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.cli.commands;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.management.internal.SystemManagementService;
+import org.apache.geode.management.internal.cli.result.model.ResultModel;
+import 
org.apache.geode.management.internal.cli.result.model.TabularResultModel;
+import org.apache.geode.management.internal.i18n.CliStrings;
+import org.apache.geode.test.junit.rules.GfshParserRule;
+
+public class StopGatewaySenderCommandTest {
+  @ClassRule
+  public static GfshParserRule gfsh = new GfshParserRule();
+
+  private final String senderId = "sender1";
+  private Cache cache;
+  private Set<DistributedMember> members;
+  private ExecutorService executorService;
+  private SystemManagementService managementService;
+  private StopGatewaySenderCommand.StopGatewaySenderOnMember stopperOnMember;
+
+  @Before
+  public void setUp() {
+    cache = mock(Cache.class);
+    members =
+        Stream.generate(() -> mock(DistributedMember.class)).limit(3)
+            .collect(Collectors.toSet());
+
+    executorService = mock(ExecutorService.class);
+    managementService = mock(SystemManagementService.class);
+    stopperOnMember = 
mock(StopGatewaySenderCommand.StopGatewaySenderOnMember.class);
+  }
+
+  @Test
+  public void whenMissingSenderIdCommandReturnsInvalidCommandError() {
+    StopGatewaySenderCommand command = new StopGatewaySenderCommand();
+    gfsh.executeAndAssertThat(command, "stop gateway-sender")
+        .statusIsError().containsOutput("Invalid command");
+  }
+
+  @Test
+  public void whenNoMembersCommandReturnsNoMembersError() {
+    // arrange
+    Set<DistributedMember> emptySet = new HashSet<>();
+    StopGatewaySenderCommand command =
+        spy(new StopGatewaySenderCommand(executorService, stopperOnMember));
+
+    doReturn(emptySet).when(command).findMembers(any(), any());
+
+    // act and assert
+    gfsh.executeAndAssertThat(command, "stop gateway-sender --id=sender1")
+        .statusIsError().containsOutput("No Members Found");
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void 
stopGatewaySenderStartsOneThreadPerMemberAndBuildsOutputAccordingToFuturesOutput()
+      throws InterruptedException, ExecutionException {
+    // arrange
+    String gatewaySenderIsStoppedMsg = "GatewaySender ln is stopped on member";
+    List<Future<List<String>>> futures = new ArrayList<>();
+    for (int memberIndex = 0; memberIndex < members.size(); memberIndex++) {
+      Future<List<String>> future = mock(Future.class);
+      List<String> list = Arrays.asList("member" + memberIndex, "OK", 
gatewaySenderIsStoppedMsg);
+      doReturn(list).when(future).get();
+      futures.add(future);
+    }
+    doReturn(futures).when(executorService).invokeAll(any());
+    StopGatewaySenderCommand command =
+        spy(new StopGatewaySenderCommand(executorService, stopperOnMember));
+    doReturn(members).when(command).findMembers(any(), any());
+    doReturn(managementService).when(command).getManagementService();
+
+    // act
+    ResultModel result = command.executeStopGatewaySender(senderId, cache, 
members);
+
+    // assert
+    assertThat(result.isSuccessful()).isTrue();
+
+    TabularResultModel resultData = 
result.getTableSection(CliStrings.STOP_GATEWAYSENDER);
+    List<String> member = resultData.getValuesInColumn("Member");
+    assertThat(member).containsExactlyInAnyOrder("member0", "member1", 
"member2");
+    List<String> status = resultData.getValuesInColumn("Result");
+    assertThat(status).containsExactlyInAnyOrder("OK", "OK", "OK");
+    List<String> message = resultData.getValuesInColumn("Message");
+    assertThat(message).containsExactlyInAnyOrder(gatewaySenderIsStoppedMsg,
+        gatewaySenderIsStoppedMsg, gatewaySenderIsStoppedMsg);
+
+    ArgumentCaptor<Collection> callablesCaptor =
+        ArgumentCaptor.forClass(Collection.class);
+    verify(executorService, times(1)).invokeAll((callablesCaptor.capture()));
+    assertThat(callablesCaptor.getValue().size()).isEqualTo(members.size());
+    verify(executorService, times(1)).shutdown();
+  }
+
+  @Test
+  public void stopGatewaySenderInterruptedReturnsError() throws 
InterruptedException {
+    // arrange
+    InterruptedException exception = new InterruptedException("interruption2");
+    doThrow(exception).when(executorService).invokeAll(any());
+    StopGatewaySenderCommand command =
+        spy(new StopGatewaySenderCommand(executorService, stopperOnMember));
+    doReturn(members).when(command).findMembers(any(), any());
+    doReturn(managementService).when(command).getManagementService();
+
+    // act
+    ResultModel result = command.executeStopGatewaySender(senderId, cache, 
members);
+
+    // assert
+    assertThat(result.isSuccessful()).isFalse();
+    assertThat(result.getInfoSection("info").getContent().get(0)).isEqualTo(
+        "Could not invoke stop gateway sender sender1 operation on members due 
to interruption2");
+  }
+
+}
diff --git 
a/geode-gfsh/src/test/java/org/apache/geode/management/internal/cli/commands/StopGatewaySenderOnMemberWithBeanImplTest.java
 
b/geode-gfsh/src/test/java/org/apache/geode/management/internal/cli/commands/StopGatewaySenderOnMemberWithBeanImplTest.java
new file mode 100644
index 0000000..3c3dfa1
--- /dev/null
+++ 
b/geode-gfsh/src/test/java/org/apache/geode/management/internal/cli/commands/StopGatewaySenderOnMemberWithBeanImplTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.cli.commands;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+
+import junitparams.Parameters;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.DistributedSystem;
+import org.apache.geode.management.GatewaySenderMXBean;
+import org.apache.geode.management.internal.SystemManagementService;
+import org.apache.geode.test.junit.runners.GeodeParamsRunner;
+
+@RunWith(GeodeParamsRunner.class)
+public class StopGatewaySenderOnMemberWithBeanImplTest {
+  String senderId = "sender1";
+  String memberId = "member1";
+  String remoteMemberId = "remoteMember1";
+  Cache cache;
+  SystemManagementService managementService;
+  DistributedMember distributedMember;
+  DistributedMember remoteDistributedMember;
+  DistributedSystem distributedSystem;
+
+  @Before
+  public void setUp() {
+    cache = mock(Cache.class);
+    managementService = mock(SystemManagementService.class);
+    distributedMember = mock(DistributedMember.class);
+    remoteDistributedMember = mock(DistributedMember.class);
+    when(distributedMember.getId()).thenReturn(memberId);
+    when(remoteDistributedMember.getId()).thenReturn(remoteMemberId);
+    distributedSystem = mock(DistributedSystem.class);
+    doReturn(distributedSystem).when(cache).getDistributedSystem();
+  }
+
+  @Test
+  @Parameters({"true", "false"})
+  public void executeStopGatewaySenderOnMemberNotRunningReturnsNotRunningError(
+      boolean isLocalMember) {
+    GatewaySenderMXBean gatewaySenderMXBean = 
gatewaySenderMXBean(isLocalMember, false);
+    when(gatewaySenderMXBean.isRunning()).thenReturn(false);
+
+    StopGatewaySenderCommand.StopGatewaySenderOnMember stopperWithBean =
+        new StopGatewaySenderOnMemberWithBeanImpl();
+
+    List<String> result = 
stopperWithBean.executeStopGatewaySenderOnMember(senderId, cache,
+        managementService, distributedMember);
+    assertThat(result).containsExactly(memberId, "Error",
+        "GatewaySender sender1 is not running on member " + memberId + ".");
+  }
+
+  @Test
+  @Parameters({"true", "false"})
+  public void 
executeStopGatewaySenderOnMemberNotAvailableReturnsNotAvailableError(
+      boolean isLocalMember) {
+    gatewaySenderMXBean(isLocalMember, true);
+
+    StopGatewaySenderCommand.StopGatewaySenderOnMember stopperWithBean =
+        new StopGatewaySenderOnMemberWithBeanImpl();
+
+    List<String> result = 
stopperWithBean.executeStopGatewaySenderOnMember(senderId, cache,
+        managementService, distributedMember);
+
+    assertThat(result).containsExactly(memberId, "Error",
+        "GatewaySender sender1 is not available on member " + memberId);
+  }
+
+  @Test
+  @Parameters({"true", "false"})
+  public void executeStopGatewaySenderOnMemberRunningReturnsOk(boolean 
isLocalMember) {
+    GatewaySenderMXBean gatewaySenderMXBean = 
gatewaySenderMXBean(isLocalMember, false);
+    when(gatewaySenderMXBean.isRunning()).thenReturn(true);
+
+    StopGatewaySenderCommand.StopGatewaySenderOnMember stopperWithBean =
+        new StopGatewaySenderOnMemberWithBeanImpl();
+
+    List<String> result = 
stopperWithBean.executeStopGatewaySenderOnMember(senderId, cache,
+        managementService, distributedMember);
+
+    assertThat(result).containsExactly(memberId, "OK",
+        "GatewaySender sender1 is stopped on member " + memberId);
+  }
+
+  private GatewaySenderMXBean gatewaySenderMXBean(boolean isLocalMember, 
boolean mustBeNull) {
+    GatewaySenderMXBean gatewaySenderMXBean = mustBeNull
+        ? null
+        : mock(GatewaySenderMXBean.class);
+    if (isLocalMember) {
+      
doReturn(distributedMember).when(distributedSystem).getDistributedMember();
+      
doReturn(gatewaySenderMXBean).when(managementService).getLocalGatewaySenderMXBean(senderId);
+    } else {
+      
doReturn(remoteDistributedMember).when(distributedSystem).getDistributedMember();
+      
doReturn(gatewaySenderMXBean).when(managementService).getMBeanProxy(any(), 
any());
+    }
+    return gatewaySenderMXBean;
+  }
+}
diff --git 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/StopGatewaySenderCommandDUnitTest.java
 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/StopGatewaySenderCommandDUnitTest.java
index 2d02065..dce42d3 100644
--- 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/StopGatewaySenderCommandDUnitTest.java
+++ 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/StopGatewaySenderCommandDUnitTest.java
@@ -28,6 +28,7 @@ import java.io.Serializable;
 import java.util.List;
 import java.util.Properties;
 
+import org.assertj.core.api.Condition;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -45,7 +46,6 @@ import org.apache.geode.test.junit.categories.WanTest;
 import org.apache.geode.test.junit.rules.GfshCommandRule;
 
 @Category({WanTest.class})
-@SuppressWarnings("serial")
 public class StopGatewaySenderCommandDUnitTest implements Serializable {
 
   @Rule
@@ -77,7 +77,7 @@ public class StopGatewaySenderCommandDUnitTest implements 
Serializable {
   }
 
   @Test
-  public void testStopGatewaySender_ErrorConditions() throws Exception {
+  public void testStopGatewaySender_ErrorConditions() {
     Integer locator1Port = locatorSite1.getPort();
 
     // setup servers in Site #1 (London)
@@ -94,7 +94,7 @@ public class StopGatewaySenderCommandDUnitTest implements 
Serializable {
   }
 
   @Test
-  public void testStopGatewaySender() throws Exception {
+  public void testStopGatewaySender() {
     Integer locator1Port = locatorSite1.getPort();
 
     // setup servers in Site #1 (London)
@@ -148,7 +148,7 @@ public class StopGatewaySenderCommandDUnitTest implements 
Serializable {
    * test to validate that the start gateway sender starts the gateway sender 
on a member
    */
   @Test
-  public void testStopGatewaySender_onMember() throws Exception {
+  public void testStopGatewaySender_onMember() {
     Integer locator1Port = locatorSite1.getPort();
 
     // setup servers in Site #1 (London)
@@ -184,8 +184,8 @@ public class StopGatewaySenderCommandDUnitTest implements 
Serializable {
    * test to validate that the start gateway sender starts the gateway sender 
on a group of members
    */
   @Test
-  public void testStopGatewaySender_Group() throws Exception {
-    Integer locator1Port = locatorSite1.getPort();
+  public void testStopGatewaySender_Group() {
+    int locator1Port = locatorSite1.getPort();
 
     // setup servers in Site #1 (London)
     server1 = startServerWithGroups(3, "SenderGroup1", locator1Port);
@@ -239,8 +239,8 @@ public class StopGatewaySenderCommandDUnitTest implements 
Serializable {
    * to multiple groups
    */
   @Test
-  public void testStopGatewaySender_MultipleGroup() throws Exception {
-    Integer locator1Port = locatorSite1.getPort();
+  public void testStopGatewaySender_MultipleGroup() {
+    int locator1Port = locatorSite1.getPort();
 
     // setup servers in Site #1 (London)
     server1 = startServerWithGroups(3, "SenderGroup1", locator1Port);
@@ -307,13 +307,124 @@ public class StopGatewaySenderCommandDUnitTest 
implements Serializable {
     server5.invoke(() -> verifySenderState("ln", true, false));
   }
 
-  private CommandResult executeCommandWithIgnoredExceptions(String command) 
throws Exception {
+  @Test
+  public void testStopGatewaySender_OneSenderNotRunning() {
+    Integer locator1Port = locatorSite1.getPort();
+
+    // setup servers in Site #1 (London)
+    server1 = clusterStartupRule.startServerVM(3, locator1Port);
+    server2 = clusterStartupRule.startServerVM(4, locator1Port);
+    server3 = clusterStartupRule.startServerVM(5, locator1Port);
+
+    server1.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, 
null, true));
+    server2.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, 
null, true));
+    server3.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, 
null, true));
+
+    server1.invoke(() -> startSender("ln"));
+    server2.invoke(() -> startSender("ln"));
+
+    server1.invoke(() -> verifySenderState("ln", true, false));
+    server2.invoke(() -> verifySenderState("ln", true, false));
+    server3.invoke(() -> verifySenderState("ln", false, false));
+
+    locatorSite1.invoke(
+        () -> validateGatewaySenderMXBeanProxy(getMember(server1.getVM()), 
"ln", true, false));
+    locatorSite1.invoke(
+        () -> validateGatewaySenderMXBeanProxy(getMember(server2.getVM()), 
"ln", true, false));
+    locatorSite1.invoke(
+        () -> validateGatewaySenderMXBeanProxy(getMember(server3.getVM()), 
"ln", false, false));
+
+    String command =
+        CliStrings.STOP_GATEWAYSENDER + " --" + 
CliStrings.STOP_GATEWAYSENDER__ID + "=ln";
+    CommandResult cmdResult = executeCommandWithIgnoredExceptions(command);
+    assertThat(cmdResult).isNotNull();
+    assertThat(cmdResult.getStatus()).isSameAs(Result.Status.OK);
+
+    TabularResultModel resultData = cmdResult.getResultData()
+        .getTableSection(CliStrings.STOP_GATEWAYSENDER);
+    List<String> status = resultData.getValuesInColumn("Result");
+    assertThat(status).containsExactlyInAnyOrder("OK", "OK", "Error");
+
+    List<String> message = resultData.getValuesInColumn("Message");
+    Condition<String> okMsg =
+        new Condition<>(s -> s.startsWith("GatewaySender ln is stopped on 
member "), "ok");
+    Condition<String> errorMsg =
+        new Condition<>(s -> s.startsWith("GatewaySender ln is not running on 
member "), "ok");
+
+    assertThat(message).haveExactly(2, okMsg);
+    assertThat(message).haveExactly(1, errorMsg);
+
+    locatorSite1.invoke(
+        () -> validateGatewaySenderMXBeanProxy(getMember(server1.getVM()), 
"ln", false, false));
+    locatorSite1.invoke(
+        () -> validateGatewaySenderMXBeanProxy(getMember(server2.getVM()), 
"ln", false, false));
+    locatorSite1.invoke(
+        () -> validateGatewaySenderMXBeanProxy(getMember(server3.getVM()), 
"ln", false, false));
+
+    server1.invoke(() -> verifySenderState("ln", false, false));
+    server2.invoke(() -> verifySenderState("ln", false, false));
+    server3.invoke(() -> verifySenderState("ln", false, false));
+  }
+
+  @Test
+  public void testStopGatewaySender_OneSenderNotAvailable() {
+    Integer locator1Port = locatorSite1.getPort();
+
+    // setup servers in Site #1 (London)
+    server1 = clusterStartupRule.startServerVM(3, locator1Port);
+    server2 = clusterStartupRule.startServerVM(4, locator1Port);
+    server3 = clusterStartupRule.startServerVM(5, locator1Port);
+
+    server1.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, 
null, true));
+    server2.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, 
null, true));
+
+    server1.invoke(() -> startSender("ln"));
+    server2.invoke(() -> startSender("ln"));
+
+    server1.invoke(() -> verifySenderState("ln", true, false));
+    server2.invoke(() -> verifySenderState("ln", true, false));
+
+    locatorSite1.invoke(
+        () -> validateGatewaySenderMXBeanProxy(getMember(server1.getVM()), 
"ln", true, false));
+    locatorSite1.invoke(
+        () -> validateGatewaySenderMXBeanProxy(getMember(server2.getVM()), 
"ln", true, false));
+
+    String command =
+        CliStrings.STOP_GATEWAYSENDER + " --" + 
CliStrings.STOP_GATEWAYSENDER__ID + "=ln";
+    CommandResult cmdResult = executeCommandWithIgnoredExceptions(command);
+    assertThat(cmdResult).isNotNull();
+    assertThat(cmdResult.getStatus()).isSameAs(Result.Status.OK);
+
+    TabularResultModel resultData = cmdResult.getResultData()
+        .getTableSection(CliStrings.STOP_GATEWAYSENDER);
+    List<String> status = resultData.getValuesInColumn("Result");
+    assertThat(status).containsExactlyInAnyOrder("OK", "OK", "Error");
+
+    List<String> message = resultData.getValuesInColumn("Message");
+    Condition<String> okMsg =
+        new Condition<>(s -> s.startsWith("GatewaySender ln is stopped on 
member "), "ok");
+    Condition<String> errorMsg =
+        new Condition<>(s -> s.startsWith("GatewaySender ln is not available 
on member "), "ok");
+
+    assertThat(message).haveExactly(2, okMsg);
+    assertThat(message).haveExactly(1, errorMsg);
+
+    locatorSite1.invoke(
+        () -> validateGatewaySenderMXBeanProxy(getMember(server1.getVM()), 
"ln", false, false));
+    locatorSite1.invoke(
+        () -> validateGatewaySenderMXBeanProxy(getMember(server2.getVM()), 
"ln", false, false));
+
+    server1.invoke(() -> verifySenderState("ln", false, false));
+    server2.invoke(() -> verifySenderState("ln", false, false));
+  }
+
+  private CommandResult executeCommandWithIgnoredExceptions(String command) {
     try (IgnoredException ie = IgnoredException.addIgnoredException("Could not 
connect")) {
       return gfsh.executeCommand(command);
     }
   }
 
-  private MemberVM startServerWithGroups(int index, String groups, int 
locPort) throws Exception {
+  private MemberVM startServerWithGroups(int index, String groups, int 
locPort) {
     Properties props = new Properties();
     props.setProperty(GROUPS, groups);
     return clusterStartupRule.startServerVM(index, props, locPort);

Reply via email to