// http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/WanCommandPauseResumeDUnitTest.java
// ----------------------------------------------------------------------
// diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/WanCommandPauseResumeDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/WanCommandPauseResumeDUnitTest.java
// new file mode 100644
// index 0000000..609885d
// --- /dev/null
// +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/WanCommandPauseResumeDUnitTest.java
// @@ -0,0 +1,688 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.gemstone.gemfire.internal.cache.wan.wancommand;

import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.management.cli.Result;
import com.gemstone.gemfire.management.internal.cli.i18n.CliStrings;
import com.gemstone.gemfire.management.internal.cli.result.CommandResult;
import com.gemstone.gemfire.management.internal.cli.result.TabularResultData;
import com.gemstone.gemfire.test.dunit.VM;
import com.gemstone.gemfire.test.junit.categories.DistributedTest;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import java.util.List;
import java.util.Properties;

import static com.gemstone.gemfire.distributed.ConfigurationProperties.*;
import static com.gemstone.gemfire.test.dunit.Assert.*;
import static com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter;
import static com.gemstone.gemfire.test.dunit.Wait.pause;

/**
 * Distributed tests for the {@code pause gateway-sender} and
 * {@code resume gateway-sender} CLI commands identified by
 * {@link CliStrings#PAUSE_GATEWAYSENDER} and {@link CliStrings#RESUME_GATEWAYSENDER}.
 *
 * <p>Every test uses the same basic layout: a locator is created through vm1, the
 * JMX manager is set up on vm0 and connected to, a remote locator is created
 * through vm2, and gateway sender "ln" is created on some of vm3..vm7. The command
 * under test is then executed and both its result and the resulting sender state
 * on each VM are checked.
 */
@Category(DistributedTest.class)
public class WanCommandPauseResumeDUnitTest extends WANCommandTestBase {

  private static final long serialVersionUID = 1L;

  /** Id of the gateway sender created on every sender VM. */
  private static final String SENDER_ID = "ln";

  /**
   * Milliseconds to wait before issuing a command in the non-error tests.
   * NOTE(review): presumably gives the manager time to learn about the senders - confirm.
   */
  private static final int PAUSE_BEFORE_COMMAND_MILLIS = 10000;

  /**
   * Passing both {@code --member} and {@code --group} to the pause command must
   * produce an ERROR result carrying
   * {@link CliStrings#PROVIDE_EITHER_MEMBER_OR_GROUP_MESSAGE}.
   */
  @Test
  public void testPauseGatewaySender_ErrorConditions() {
    final String testName = "testPauseGatewaySender_ErrorConditions";

    Integer punePort = startLocatorsAndConnectManager();
    createSenderMember(vm3, punePort);

    final DistributedMember senderMember = (DistributedMember) vm3.invoke(() -> getMember());

    String command = CliStrings.PAUSE_GATEWAYSENDER
        + option(CliStrings.PAUSE_GATEWAYSENDER__ID, SENDER_ID)
        + option(CliStrings.PAUSE_GATEWAYSENDER__MEMBER, senderMember.getId())
        + option(CliStrings.PAUSE_GATEWAYSENDER__GROUP, "SenderGroup1");
    CommandResult cmdResult = executeExpectingResult(testName, command);
    String output = logResult(testName, cmdResult);

    assertMemberAndGroupRejected(cmdResult, output);
  }

  /**
   * Pausing the sender with {@code --member} set to the vm3 member must return OK,
   * report "is paused on member", and leave the sender on vm3 paused.
   */
  @Test
  public void testPauseGatewaySender_onMember() {
    final String testName = "testPauseGatewaySender_onMember";

    Integer punePort = startLocatorsAndConnectManager();
    createSenderMember(vm3, punePort);

    startSenderOn(vm3);
    assertSenderPausedStateOn(false, vm3);

    final DistributedMember senderMember = (DistributedMember) vm3.invoke(() -> getMember());
    pause(PAUSE_BEFORE_COMMAND_MILLIS);

    String command = CliStrings.PAUSE_GATEWAYSENDER
        + option(CliStrings.PAUSE_GATEWAYSENDER__ID, SENDER_ID)
        + option(CliStrings.PAUSE_GATEWAYSENDER__MEMBER, senderMember.getId());
    CommandResult cmdResult = executeExpectingResult(testName, command);
    String output = logResult(testName, cmdResult);

    assertEquals(Result.Status.OK, cmdResult.getStatus());
    assertTrue(output.contains("is paused on member"));

    assertSenderPausedStateOn(true, vm3);
  }

  /**
   * Pausing the sender without {@code --member} or {@code --group} must return OK
   * with five result rows containing both "Error" and "OK", and leave the senders
   * on vm3, vm4 and vm5 paused.
   */
  @Test
  public void testPauseGatewaySender() {
    final String testName = "testPauseGatewaySender";

    Integer punePort = startLocatorsAndConnectManager();
    createSenderMember(vm3, punePort);
    createSenderMember(vm4, punePort);
    createSenderMember(vm5, punePort);

    startSenderOn(vm3, vm4, vm5);
    assertSenderPausedStateOn(false, vm3, vm4, vm5);

    pause(PAUSE_BEFORE_COMMAND_MILLIS);
    String command = CliStrings.PAUSE_GATEWAYSENDER
        + option(CliStrings.PAUSE_GATEWAYSENDER__ID, SENDER_ID);
    CommandResult cmdResult = executeExpectingResult(testName, command);
    logResult(testName, cmdResult);

    assertEquals(Result.Status.OK, cmdResult.getStatus());
    assertResultColumn(cmdResult, 5, true);

    assertSenderPausedStateOn(true, vm3, vm4, vm5);
  }

  /**
   * Pausing the sender with {@code --group=SenderGroup1} must return OK with three
   * error-free result rows and leave the senders on the three group members
   * (vm3, vm4, vm5) paused.
   */
  @Test
  public void testPauseGatewaySender_Group() {
    final String testName = "testPauseGatewaySender_Group";

    Integer punePort = startLocatorsAndConnectManager();
    createGroupedSenderMember(vm3, punePort, "SenderGroup1");
    createGroupedSenderMember(vm4, punePort, "SenderGroup1");
    createGroupedSenderMember(vm5, punePort, "SenderGroup1");

    startSenderOn(vm3, vm4, vm5);
    assertSenderPausedStateOn(false, vm3, vm4, vm5);

    pause(PAUSE_BEFORE_COMMAND_MILLIS);
    String command = CliStrings.PAUSE_GATEWAYSENDER
        + option(CliStrings.PAUSE_GATEWAYSENDER__ID, SENDER_ID)
        + option(CliStrings.PAUSE_GATEWAYSENDER__GROUP, "SenderGroup1");
    CommandResult cmdResult = executeExpectingResult(testName, command);
    logResult(testName, cmdResult);

    assertEquals(Result.Status.OK, cmdResult.getStatus());
    assertResultColumn(cmdResult, 3, false);

    assertSenderPausedStateOn(true, vm3, vm4, vm5);
  }

  /**
   * Pausing the sender with {@code --group=SenderGroup1,SenderGroup2} when one
   * member (vm5) belongs to both groups must return OK with four error-free result
   * rows, pause the senders on vm3..vm6, and leave the sender on vm7
   * (SenderGroup3) unpaused.
   */
  @Test
  public void testPauseGatewaySender_MultipleGroup() {
    final String testName = "testPauseGatewaySender_MultipleGroup";

    Integer punePort = startLocatorsAndConnectManager();
    createGroupedSenderMember(vm3, punePort, "SenderGroup1");
    createGroupedSenderMember(vm4, punePort, "SenderGroup1");
    createGroupedSenderMember(vm5, punePort, "SenderGroup1, SenderGroup2");
    createGroupedSenderMember(vm6, punePort, "SenderGroup2");
    createGroupedSenderMember(vm7, punePort, "SenderGroup3");

    startSenderOn(vm3, vm4, vm5, vm6, vm7);
    assertSenderPausedStateOn(false, vm3, vm4, vm5, vm6, vm7);

    pause(PAUSE_BEFORE_COMMAND_MILLIS);
    String command = CliStrings.PAUSE_GATEWAYSENDER
        + option(CliStrings.PAUSE_GATEWAYSENDER__ID, SENDER_ID)
        + option(CliStrings.PAUSE_GATEWAYSENDER__GROUP, "SenderGroup1,SenderGroup2");
    CommandResult cmdResult = executeExpectingResult(testName, command);
    logResult(testName, cmdResult);

    assertEquals(Result.Status.OK, cmdResult.getStatus());
    assertResultColumn(cmdResult, 4, false);

    assertSenderPausedStateOn(true, vm3, vm4, vm5, vm6);
    // vm7 is only in SenderGroup3, which the command did not target.
    assertSenderPausedStateOn(false, vm7);
  }

  /**
   * Passing both {@code --member} and {@code --group} to the resume command must
   * produce an ERROR result carrying
   * {@link CliStrings#PROVIDE_EITHER_MEMBER_OR_GROUP_MESSAGE}.
   */
  @Test
  public void testResumeGatewaySender_ErrorConditions() {
    final String testName = "testResumeGatewaySender_ErrorConditions";

    Integer punePort = startLocatorsAndConnectManager();
    createSenderMember(vm3, punePort);

    final DistributedMember senderMember = (DistributedMember) vm3.invoke(() -> getMember());

    String command = CliStrings.RESUME_GATEWAYSENDER
        + option(CliStrings.RESUME_GATEWAYSENDER__ID, SENDER_ID)
        + option(CliStrings.RESUME_GATEWAYSENDER__MEMBER, senderMember.getId())
        + option(CliStrings.RESUME_GATEWAYSENDER__GROUP, "SenderGroup1");
    CommandResult cmdResult = executeExpectingResult(testName, command);
    String output = logResult(testName, cmdResult);

    assertMemberAndGroupRejected(cmdResult, output);
  }

  /**
   * Resuming the paused sender without {@code --member} or {@code --group} must
   * return OK with five result rows containing both "Error" and "OK", and leave
   * the senders on vm3, vm4 and vm5 no longer paused.
   */
  @Test
  public void testResumeGatewaySender() {
    final String testName = "testResumeGatewaySender";

    Integer punePort = startLocatorsAndConnectManager();
    createSenderMember(vm3, punePort);
    createSenderMember(vm4, punePort);
    createSenderMember(vm5, punePort);

    startSenderOn(vm3, vm4, vm5);
    assertSenderPausedStateOn(false, vm3, vm4, vm5);

    pauseSenderOn(vm3, vm4, vm5);
    assertSenderPausedStateOn(true, vm3, vm4, vm5);

    pause(PAUSE_BEFORE_COMMAND_MILLIS);
    String command = CliStrings.RESUME_GATEWAYSENDER
        + option(CliStrings.RESUME_GATEWAYSENDER__ID, SENDER_ID);
    CommandResult cmdResult = executeExpectingResult(testName, command);
    logResult(testName, cmdResult);

    assertEquals(Result.Status.OK, cmdResult.getStatus());
    assertResultColumn(cmdResult, 5, true);

    assertSenderPausedStateOn(false, vm3, vm4, vm5);
  }

  /**
   * Resuming the paused sender with {@code --member} set to the vm3 member must
   * return OK, report "is resumed on member", and leave the sender on vm3 no
   * longer paused.
   */
  @Test
  public void testResumeGatewaySender_onMember() {
    final String testName = "testResumeGatewaySender_onMember";

    Integer punePort = startLocatorsAndConnectManager();
    createSenderMember(vm3, punePort);

    startSenderOn(vm3);
    assertSenderPausedStateOn(false, vm3);

    pauseSenderOn(vm3);
    assertSenderPausedStateOn(true, vm3);

    final DistributedMember senderMember = (DistributedMember) vm3.invoke(() -> getMember());
    pause(PAUSE_BEFORE_COMMAND_MILLIS);

    String command = CliStrings.RESUME_GATEWAYSENDER
        + option(CliStrings.RESUME_GATEWAYSENDER__ID, SENDER_ID)
        + option(CliStrings.RESUME_GATEWAYSENDER__MEMBER, senderMember.getId());
    CommandResult cmdResult = executeExpectingResult(testName, command);
    String output = logResult(testName, cmdResult);

    assertEquals(Result.Status.OK, cmdResult.getStatus());
    assertTrue(output.contains("is resumed on member"));

    assertSenderPausedStateOn(false, vm3);
  }

  /**
   * Resuming the paused sender with {@code --group=SenderGroup1} must return OK
   * with three error-free result rows and leave the senders on the three group
   * members (vm3, vm4, vm5) no longer paused.
   */
  @Test
  public void testResumeGatewaySender_Group() {
    final String testName = "testResumeGatewaySender_Group";

    Integer punePort = startLocatorsAndConnectManager();
    createGroupedSenderMember(vm3, punePort, "SenderGroup1");
    createGroupedSenderMember(vm4, punePort, "SenderGroup1");
    createGroupedSenderMember(vm5, punePort, "SenderGroup1");

    startSenderOn(vm3, vm4, vm5);
    assertSenderPausedStateOn(false, vm3, vm4, vm5);

    pauseSenderOn(vm3, vm4, vm5);
    assertSenderPausedStateOn(true, vm3, vm4, vm5);

    pause(PAUSE_BEFORE_COMMAND_MILLIS);
    String command = CliStrings.RESUME_GATEWAYSENDER
        + option(CliStrings.RESUME_GATEWAYSENDER__ID, SENDER_ID)
        + option(CliStrings.RESUME_GATEWAYSENDER__GROUP, "SenderGroup1");
    CommandResult cmdResult = executeExpectingResult(testName, command);
    logResult(testName, cmdResult);

    assertEquals(Result.Status.OK, cmdResult.getStatus());
    assertResultColumn(cmdResult, 3, false);

    assertSenderPausedStateOn(false, vm3, vm4, vm5);
  }

  /**
   * Resuming the paused sender with {@code --group=SenderGroup1,SenderGroup2} when
   * one member (vm5) belongs to both groups must return OK with four error-free
   * result rows, resume the senders on vm3..vm6, and leave the sender on vm7
   * (SenderGroup3) paused.
   */
  @Test
  public void testResumeGatewaySender_MultipleGroup() {
    final String testName = "testResumeGatewaySender_MultipleGroup";

    Integer punePort = startLocatorsAndConnectManager();
    createGroupedSenderMember(vm3, punePort, "SenderGroup1");
    createGroupedSenderMember(vm4, punePort, "SenderGroup1");
    createGroupedSenderMember(vm5, punePort, "SenderGroup1, SenderGroup2");
    createGroupedSenderMember(vm6, punePort, "SenderGroup2");
    createGroupedSenderMember(vm7, punePort, "SenderGroup3");

    startSenderOn(vm3, vm4, vm5, vm6, vm7);
    assertSenderPausedStateOn(false, vm3, vm4, vm5, vm6, vm7);

    pauseSenderOn(vm3, vm4, vm5, vm6, vm7);
    assertSenderPausedStateOn(true, vm3, vm4, vm5, vm6, vm7);

    pause(PAUSE_BEFORE_COMMAND_MILLIS);
    String command = CliStrings.RESUME_GATEWAYSENDER
        + option(CliStrings.RESUME_GATEWAYSENDER__ID, SENDER_ID)
        + option(CliStrings.RESUME_GATEWAYSENDER__GROUP, "SenderGroup1,SenderGroup2");
    CommandResult cmdResult = executeExpectingResult(testName, command);
    logResult(testName, cmdResult);

    assertEquals(Result.Status.OK, cmdResult.getStatus());
    assertResultColumn(cmdResult, 4, false);

    assertSenderPausedStateOn(false, vm3, vm4, vm5, vm6);
    // vm7 is only in SenderGroup3, which the command did not target.
    assertSenderPausedStateOn(true, vm7);
  }

  /**
   * Performs the setup shared by every test: creates the first locator through
   * vm1, sets up the JMX manager on vm0 and connects to it using that locator,
   * then creates the remote locator through vm2.
   *
   * @return the port returned by {@code createFirstLocatorWithDSId(1)} on vm1
   */
  private Integer startLocatorsAndConnectManager() {
    Integer punePort = (Integer) vm1.invoke(() -> createFirstLocatorWithDSId(1));

    Properties props = getDistributedSystemProperties();
    props.setProperty(MCAST_PORT, "0");
    props.setProperty(DISTRIBUTED_SYSTEM_ID, "1");
    props.setProperty(LOCATORS, "localhost[" + punePort + "]");
    setUpJmxManagerOnVm0ThenConnect(props);

    // The remote locator's port is not needed by any test in this class.
    vm2.invoke(() -> createFirstRemoteLocator(2, punePort));
    return punePort;
  }

  /** Creates a cache (no member groups) and the test sender on {@code vm}. */
  private void createSenderMember(VM vm, Integer locatorPort) {
    vm.invoke(() -> createCache(locatorPort));
    createTestSenderOn(vm);
  }

  /** Creates a cache belonging to {@code groups} and the test sender on {@code vm}. */
  private void createGroupedSenderMember(VM vm, Integer locatorPort, String groups) {
    vm.invoke(() -> createCacheWithGroups(locatorPort, groups));
    createTestSenderOn(vm);
  }

  /** Creates sender {@link #SENDER_ID} on {@code vm} with the settings every test uses. */
  private void createTestSenderOn(VM vm) {
    vm.invoke(() -> createSender(SENDER_ID, 2, false, 100, 400, false, false, null, true));
  }

  /** Starts sender {@link #SENDER_ID} on each of {@code vms}, in order. */
  private void startSenderOn(VM... vms) {
    for (VM vm : vms) {
      vm.invoke(() -> startSender(SENDER_ID));
    }
  }

  /** Pauses sender {@link #SENDER_ID} directly (not via the CLI) on each of {@code vms}, in order. */
  private void pauseSenderOn(VM... vms) {
    for (VM vm : vms) {
      vm.invoke(() -> pauseSender(SENDER_ID));
    }
  }

  /**
   * Verifies the state of sender {@link #SENDER_ID} on each of {@code vms} via
   * {@code verifySenderState(SENDER_ID, true, paused)}.
   * NOTE(review): the second argument is presumably "is running" - confirm against
   * WANCommandTestBase.
   */
  private void assertSenderPausedStateOn(boolean paused, VM... vms) {
    for (VM vm : vms) {
      vm.invoke(() -> verifySenderState(SENDER_ID, true, paused));
    }
  }

  /** Formats a single command-line option as {@code " --name=value"}. */
  private static String option(String name, String value) {
    return " --" + name + "=" + value;
  }

  /**
   * Executes {@code command} and fails the test, naming {@code testName}, if no
   * {@link CommandResult} comes back.
   */
  private CommandResult executeExpectingResult(String testName, String command) {
    CommandResult cmdResult = executeCommand(command);
    if (cmdResult == null) {
      fail(testName + " failed as did not get CommandResult");
    }
    return cmdResult;
  }

  /**
   * Renders {@code cmdResult} as a string, logs it under {@code testName}, and
   * returns the rendered string.
   */
  private String logResult(String testName, CommandResult cmdResult) {
    String output = commandResultToString(cmdResult);
    getLogWriter().info(testName + " stringResult : " + output + ">>>>");
    return output;
  }

  /**
   * Asserts the result is the ERROR raised when both a member and a group were
   * supplied.
   */
  private void assertMemberAndGroupRejected(CommandResult cmdResult, String output) {
    assertEquals(Result.Status.ERROR, cmdResult.getStatus());
    assertTrue(output.contains(CliStrings.PROVIDE_EITHER_MEMBER_OR_GROUP_MESSAGE));
  }

  /**
   * Asserts on the "Result" column of a tabular command result: it must have
   * {@code expectedRows} values, must contain "OK", and must contain "Error"
   * exactly when {@code errorExpected} is true.
   */
  private void assertResultColumn(CommandResult cmdResult, int expectedRows,
      boolean errorExpected) {
    TabularResultData resultData = (TabularResultData) cmdResult.getResultData();
    List<String> status = resultData.retrieveAllValues("Result");
    assertEquals(expectedRows, status.size());
    if (errorExpected) {
      assertTrue(status.contains("Error"));
    } else {
      assertFalse(status.contains("Error"));
    }
    assertTrue(status.contains("OK"));
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/WanCommandStatusDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/WanCommandStatusDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/WanCommandStatusDUnitTest.java new file mode 100644 index 0000000..9271614 --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/WanCommandStatusDUnitTest.java @@ -0,0 +1,546 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.gemstone.gemfire.internal.cache.wan.wancommand; + +import com.gemstone.gemfire.distributed.DistributedMember; +import com.gemstone.gemfire.management.cli.Result; +import com.gemstone.gemfire.management.internal.cli.i18n.CliStrings; +import com.gemstone.gemfire.management.internal.cli.result.CommandResult; +import com.gemstone.gemfire.management.internal.cli.result.CompositeResultData; +import com.gemstone.gemfire.management.internal.cli.result.TabularResultData; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.List; +import java.util.Properties; + +import static com.gemstone.gemfire.distributed.ConfigurationProperties.LOCATORS; +import static com.gemstone.gemfire.distributed.ConfigurationProperties.MCAST_PORT; +import static com.gemstone.gemfire.test.dunit.Assert.*; +import static com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter; +import static com.gemstone.gemfire.test.dunit.Wait.pause; + +@Category(DistributedTest.class) +public class WanCommandStatusDUnitTest extends WANCommandTestBase{ + + private static final long serialVersionUID = 1L; + + @Test + public void testGatewaySenderStatus(){ + + Integer lnPort = (Integer) vm1.invoke(() -> createFirstLocatorWithDSId( 1 )); + + Properties props = getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(LOCATORS, "localhost[" + lnPort + "]"); + setUpJmxManagerOnVm0ThenConnect(props); + + Integer nyPort = (Integer) vm2.invoke(() -> createFirstRemoteLocator( 2, lnPort )); + + vm6.invoke(() -> createAndStartReceiver( nyPort )); + + vm3.invoke(() -> createCache( lnPort )); + vm3.invoke(() -> createSender( + "ln_Serial", 2, false, 100, 400, false, false, null, true )); + vm3.invoke(() -> createSender( + "ln_Parallel", 2, true, 100, 400, false, false, null, true)); + + vm4.invoke(() -> createCache( lnPort )); + vm4.invoke(() -> createSender( + 
"ln_Serial", 2, false, 100, 400, false, false, null, true)); + vm4.invoke(() -> createSender( + "ln_Parallel", 2, true, 100, 400, false, false, null, true)); + + vm5.invoke(() -> createCache( lnPort )); + vm5.invoke(() -> createSender( + "ln_Serial", 2, false, 100, 400, false, false, null, true)); + vm5.invoke(() -> createSender( + "ln_Parallel", 2, true, 100, 400, false, false, null, true)); + + pause(10000); + String command = CliStrings.STATUS_GATEWAYSENDER + " --" + + CliStrings.STATUS_GATEWAYSENDER__ID + "=ln_Serial"; + CommandResult cmdResult = executeCommand(command); + if (cmdResult != null) { + TabularResultData tableResultData = + ((CompositeResultData)cmdResult.getResultData()).retrieveSection(CliStrings.SECTION_GATEWAY_SENDER_AVAILABLE).retrieveTable(CliStrings.TABLE_GATEWAY_SENDER); + List<String> result_Status = tableResultData.retrieveAllValues(CliStrings.RESULT_STATUS); + assertEquals(3, result_Status.size()); + assertFalse(result_Status.contains(CliStrings.GATEWAY_RUNNING)); + + tableResultData = + ((CompositeResultData)cmdResult.getResultData()).retrieveSection(CliStrings.SECTION_GATEWAY_SENDER_NOT_AVAILABLE).retrieveTable(CliStrings.TABLE_GATEWAY_SENDER); + List<String> result_hosts = tableResultData.retrieveAllValues(CliStrings.RESULT_HOST_MEMBER); + assertEquals(2, result_hosts.size()); + + String strCmdResult = commandResultToString(cmdResult); + getLogWriter().info( + "testGatewaySenderStatus : " + strCmdResult + ">>>>> "); + assertEquals(Result.Status.OK, cmdResult.getStatus()); + } else { + fail("testListGatewaySender failed as did not get CommandResult"); + } + + vm3.invoke(() -> startSender( "ln_Serial" )); + vm3.invoke(() -> startSender( "ln_Parallel" )); + + vm4.invoke(() -> startSender( "ln_Serial" )); + vm4.invoke(() -> startSender( "ln_Parallel" )); + + vm5.invoke(() -> startSender( "ln_Serial" )); + vm5.invoke(() -> startSender( "ln_Parallel" )); + + pause(10000); + command = CliStrings.STATUS_GATEWAYSENDER + " --" + + 
CliStrings.STATUS_GATEWAYSENDER__ID + "=ln_Serial"; + cmdResult = executeCommand(command); + if (cmdResult != null) { + TabularResultData tableResultData = + ((CompositeResultData)cmdResult.getResultData()).retrieveSection(CliStrings.SECTION_GATEWAY_SENDER_AVAILABLE).retrieveTable(CliStrings.TABLE_GATEWAY_SENDER); + List<String> result_Status = tableResultData.retrieveAllValues(CliStrings.RESULT_STATUS); + assertEquals(3, result_Status.size()); + assertFalse(result_Status.contains(CliStrings.GATEWAY_NOT_RUNNING)); + + tableResultData = + ((CompositeResultData)cmdResult.getResultData()).retrieveSection(CliStrings.SECTION_GATEWAY_SENDER_NOT_AVAILABLE).retrieveTable(CliStrings.TABLE_GATEWAY_SENDER); + List<String> result_hosts = tableResultData.retrieveAllValues(CliStrings.RESULT_HOST_MEMBER); + assertEquals(2, result_hosts.size()); + + String strCmdResult = commandResultToString(cmdResult); + getLogWriter().info( + "testGatewaySenderStatus : " + strCmdResult + ">>>>> "); + assertEquals(Result.Status.OK, cmdResult.getStatus()); + } else { + fail("testListGatewaySender failed as did not get CommandResult"); + } + } + + @Test + public void testGatewaySenderStatus_OnMember(){ + + Integer lnPort = (Integer) vm1.invoke(() -> createFirstLocatorWithDSId( 1 )); + + Properties props = getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(LOCATORS, "localhost[" + lnPort + "]"); + setUpJmxManagerOnVm0ThenConnect(props); + + Integer nyPort = (Integer) vm2.invoke(() -> createFirstRemoteLocator( 2, lnPort )); + + vm6.invoke(() -> createAndStartReceiver( nyPort )); + + vm3.invoke(() -> createCache( lnPort )); + vm3.invoke(() -> createSender( + "ln_Serial", 2, false, 100, 400, false, false, null, true )); + vm3.invoke(() -> createSender( + "ln_Parallel", 2, true, 100, 400, false, false, null, true)); + + vm4.invoke(() -> createCache( lnPort )); + vm4.invoke(() -> createSender( + "ln_Serial", 2, false, 100, 400, false, false, null, true)); + 
vm4.invoke(() -> createSender( + "ln_Parallel", 2, true, 100, 400, false, false, null, true)); + + vm5.invoke(() -> createCache( lnPort )); + + final DistributedMember vm1Member = (DistributedMember) vm3.invoke(() -> getMember()); + + pause(10000); + String command = CliStrings.STATUS_GATEWAYSENDER + " --" + + CliStrings.STATUS_GATEWAYSENDER__ID + "=ln_Serial --" + + CliStrings.STATUS_GATEWAYSENDER__MEMBER + "=" + vm1Member.getId(); + + CommandResult cmdResult = executeCommand(command); + if (cmdResult != null) { + String strCmdResult = commandResultToString(cmdResult); + getLogWriter().info("testGatewaySenderStatus_OnMember : " + strCmdResult + ">>>>> "); + TabularResultData tableResultData = + ((CompositeResultData)cmdResult.getResultData()).retrieveSection(CliStrings.SECTION_GATEWAY_SENDER_AVAILABLE).retrieveTable(CliStrings.TABLE_GATEWAY_SENDER); + List<String> result_Status = tableResultData.retrieveAllValues(CliStrings.RESULT_STATUS); + assertEquals(1, result_Status.size()); + assertFalse(result_Status.contains(CliStrings.GATEWAY_RUNNING)); + + + assertEquals(Result.Status.OK, cmdResult.getStatus()); + } else { + fail("testListGatewaySender failed as did not get CommandResult"); + } + + vm3.invoke(() -> startSender( "ln_Serial" )); + vm3.invoke(() -> startSender( "ln_Parallel" )); + + vm4.invoke(() -> startSender( "ln_Serial" )); + vm4.invoke(() -> startSender( "ln_Parallel" )); + + pause(10000); + command = CliStrings.STATUS_GATEWAYSENDER + " --" + + CliStrings.STATUS_GATEWAYSENDER__ID + "=ln_Serial --" + + CliStrings.STATUS_GATEWAYSENDER__MEMBER + "=" + vm1Member.getId(); + + cmdResult = executeCommand(command); + if (cmdResult != null) { +// TabularResultData tableResultData = +// (TabularResultData) cmdResult.getResultData(); +// List<String> result_Status = tableResultData.retrieveAllValues(CliStrings.RESULT_STATUS); +// assertIndexDetailsEquals(1, result_Status.size()); +// assertFalse(result_Status.contains(CliStrings.GATEWAY_NOT_RUNNING)); + String 
strCmdResult = commandResultToString(cmdResult); + getLogWriter().info("testGatewaySenderStatus_OnMember : " + strCmdResult + ">>>>> "); + TabularResultData tableResultData = + ((CompositeResultData)cmdResult.getResultData()).retrieveSection(CliStrings.SECTION_GATEWAY_SENDER_AVAILABLE).retrieveTable(CliStrings.TABLE_GATEWAY_SENDER); + List<String> result_Status = tableResultData.retrieveAllValues(CliStrings.RESULT_STATUS); + assertEquals(1, result_Status.size()); + assertFalse(result_Status.contains(CliStrings.GATEWAY_NOT_RUNNING)); + + assertEquals(Result.Status.OK, cmdResult.getStatus()); + } else { + fail("testListGatewaySender failed as did not get CommandResult"); + } + + final DistributedMember vm5Member = (DistributedMember) vm5.invoke(() -> getMember()); + + command = CliStrings.STATUS_GATEWAYSENDER + " --" + + CliStrings.STATUS_GATEWAYSENDER__ID + "=ln_Serial --" + + CliStrings.STATUS_GATEWAYSENDER__MEMBER + "=" + vm5Member.getId(); + cmdResult = executeCommand(command); + if (cmdResult != null) { +// ErrorResultData errorResultData = +// (ErrorResultData) cmdResult.getResultData(); + assertTrue(cmdResult != null); + + String strCmdResult = commandResultToString(cmdResult); + getLogWriter().info("testGatewaySenderStatus_OnMember : " + strCmdResult + ">>>>> "); + assertEquals(Result.Status.OK, cmdResult.getStatus()); + } else { + fail("testListGatewaySender failed as did not get CommandResult"); + } + } + + @Test + public void testGatewaySenderStatus_OnGroups(){ + + Integer lnPort = (Integer) vm1.invoke(() -> createFirstLocatorWithDSId( 1 )); + + Properties props = getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(LOCATORS, "localhost[" + lnPort + "]"); + setUpJmxManagerOnVm0ThenConnect(props); + + Integer nyPort = (Integer) vm2.invoke(() -> createFirstRemoteLocator( 2, lnPort )); + + vm7.invoke(() -> createAndStartReceiver( nyPort )); + + vm3.invoke(() -> createCacheWithGroups( lnPort, "Serial_Sender, 
Parallel_Sender")); + vm3.invoke(() -> createSender( + "ln_Serial", 2, false, 100, 400, false, false, null, true )); + vm3.invoke(() -> createSender( + "ln_Parallel", 2, true, 100, 400, false, false, null, true)); + + vm4.invoke(() -> createCacheWithGroups( lnPort,"Serial_Sender, Parallel_Sender")); + vm4.invoke(() -> createSender( + "ln_Serial", 2, false, 100, 400, false, false, null, true)); + vm4.invoke(() -> createSender( + "ln_Parallel", 2, true, 100, 400, false, false, null, true)); + + vm5.invoke(() -> createCacheWithGroups( lnPort,"Parallel_Sender")); + vm5.invoke(() -> createSender( + "ln_Serial", 2, false, 100, 400, false, false, null, true)); + vm5.invoke(() -> createSender( + "ln_Parallel", 2, true, 100, 400, false, false, null, true)); + + vm6.invoke(() -> createCacheWithGroups( lnPort,"Serial_Sender")); + + final DistributedMember vm1Member = (DistributedMember) vm3.invoke(() -> getMember()); + + pause(10000); + String command = CliStrings.STATUS_GATEWAYSENDER + " --" + + CliStrings.STATUS_GATEWAYSENDER__ID + "=ln_Serial --" + CliStrings.STATUS_GATEWAYSENDER__GROUP + "=Serial_Sender"; + + CommandResult cmdResult = executeCommand(command); + if (cmdResult != null) { + TabularResultData tableResultData = + ((CompositeResultData)cmdResult.getResultData()).retrieveSection(CliStrings.SECTION_GATEWAY_SENDER_AVAILABLE).retrieveTable(CliStrings.TABLE_GATEWAY_SENDER); + List<String> result_Status = tableResultData.retrieveAllValues(CliStrings.RESULT_STATUS); + assertEquals(2, result_Status.size()); + assertFalse(result_Status.contains(CliStrings.GATEWAY_RUNNING)); + + tableResultData = + ((CompositeResultData)cmdResult.getResultData()).retrieveSection(CliStrings.SECTION_GATEWAY_SENDER_NOT_AVAILABLE).retrieveTable(CliStrings.TABLE_GATEWAY_SENDER); + List<String> result_hosts = tableResultData.retrieveAllValues(CliStrings.RESULT_HOST_MEMBER); + assertEquals(1, result_hosts.size()); + + + String strCmdResult = commandResultToString(cmdResult); + 
getLogWriter().info( + "testGatewaySenderStatus_OnGroups : " + strCmdResult + ">>>>> "); + assertEquals(Result.Status.OK, cmdResult.getStatus()); + } else { + fail("testListGatewaySender failed as did not get CommandResult"); + } + + vm3.invoke(() -> startSender( "ln_Serial" )); + vm3.invoke(() -> startSender( "ln_Parallel" )); + + vm4.invoke(() -> startSender( "ln_Serial" )); + vm4.invoke(() -> startSender( "ln_Parallel" )); + + pause(10000); + command = CliStrings.STATUS_GATEWAYSENDER + " --" + + CliStrings.STATUS_GATEWAYSENDER__ID + "=ln_Serial --" + CliStrings.STATUS_GATEWAYSENDER__GROUP + "=Serial_Sender"; + + cmdResult = executeCommand(command); + if (cmdResult != null) { + TabularResultData tableResultData = + ((CompositeResultData)cmdResult.getResultData()).retrieveSection(CliStrings.SECTION_GATEWAY_SENDER_AVAILABLE).retrieveTable(CliStrings.TABLE_GATEWAY_SENDER); + List<String> result_Status = tableResultData.retrieveAllValues(CliStrings.RESULT_STATUS); + assertEquals(2, result_Status.size()); + assertFalse(result_Status.contains(CliStrings.GATEWAY_NOT_RUNNING)); + + tableResultData = + ((CompositeResultData)cmdResult.getResultData()).retrieveSection(CliStrings.SECTION_GATEWAY_SENDER_NOT_AVAILABLE).retrieveTable(CliStrings.TABLE_GATEWAY_SENDER); + List<String> result_hosts = tableResultData.retrieveAllValues(CliStrings.RESULT_HOST_MEMBER); + assertEquals(1, result_hosts.size()); + + String strCmdResult = commandResultToString(cmdResult); + getLogWriter().info( + "testGatewaySenderStatus_OnGroups : " + strCmdResult + ">>>>> "); + assertEquals(Result.Status.OK, cmdResult.getStatus()); + } else { + fail("testListGatewaySender failed as did not get CommandResult"); + } + } + + @Test + public void testGatewayReceiverStatus(){ + + Integer lnPort = (Integer) vm1.invoke(() -> createFirstLocatorWithDSId( 1 )); + + Properties props = getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(LOCATORS, "localhost[" + lnPort + "]"); + 
setUpJmxManagerOnVm0ThenConnect(props); + + Integer nyPort = (Integer) vm2.invoke(() -> createFirstRemoteLocator( 2, lnPort )); + + vm6.invoke(() -> createAndStartReceiver( nyPort )); + + vm3.invoke(() -> createAndStartReceiver( lnPort )); + vm4.invoke(() -> createAndStartReceiver( lnPort )); + vm5.invoke(() -> createAndStartReceiver( lnPort )); + + pause(10000); + String command = CliStrings.STATUS_GATEWAYRECEIVER; + CommandResult cmdResult = executeCommand(command); + + if (cmdResult != null) { + String strCmdResult = commandResultToString(cmdResult); + getLogWriter().info( + "testGatewayReceiverStatus : " + strCmdResult + ">>>>> "); + + assertEquals(Result.Status.OK, cmdResult.getStatus()); + + TabularResultData tableResultData = + ((CompositeResultData)cmdResult.getResultData()).retrieveSection(CliStrings.SECTION_GATEWAY_RECEIVER_AVAILABLE).retrieveTable(CliStrings.TABLE_GATEWAY_RECEIVER); + + List<String> result_Status = tableResultData.retrieveAllValues(CliStrings.RESULT_STATUS); + assertEquals(3, result_Status.size()); + assertFalse(result_Status.contains(CliStrings.GATEWAY_NOT_RUNNING)); + + tableResultData = + ((CompositeResultData)cmdResult.getResultData()).retrieveSection(CliStrings.SECTION_GATEWAY_RECEIVER_NOT_AVAILABLE).retrieveTable(CliStrings.TABLE_GATEWAY_RECEIVER); + List<String> result_hosts = tableResultData.retrieveAllValues(CliStrings.RESULT_HOST_MEMBER); + assertEquals(2, result_hosts.size()); + + } else { + fail("testGatewayReceiverStatus failed as did not get CommandResult"); + } + + vm3.invoke(() -> stopReceiver()); + vm4.invoke(() -> stopReceiver()); + vm5.invoke(() -> stopReceiver()); + + pause(10000); + + command = CliStrings.STATUS_GATEWAYRECEIVER; + cmdResult = executeCommand(command); + if (cmdResult != null) { + String strCmdResult = commandResultToString(cmdResult); + getLogWriter().info( + "testGatewayReceiverStatus : " + strCmdResult + ">>>>> "); + + assertEquals(Result.Status.OK, cmdResult.getStatus()); + + TabularResultData 
tableResultData = + ((CompositeResultData)cmdResult.getResultData()).retrieveSection(CliStrings.SECTION_GATEWAY_RECEIVER_AVAILABLE).retrieveTable(CliStrings.TABLE_GATEWAY_RECEIVER); + + List<String> result_Status = tableResultData.retrieveAllValues(CliStrings.RESULT_STATUS); + assertEquals(3, result_Status.size()); + assertFalse(result_Status.contains(CliStrings.GATEWAY_RUNNING)); + + tableResultData = + ((CompositeResultData)cmdResult.getResultData()).retrieveSection(CliStrings.SECTION_GATEWAY_RECEIVER_NOT_AVAILABLE).retrieveTable(CliStrings.TABLE_GATEWAY_RECEIVER); + List<String> result_hosts = tableResultData.retrieveAllValues(CliStrings.RESULT_HOST_MEMBER); + assertEquals(2, result_hosts.size()); + + } else { + fail("testGatewayReceiverStatus failed as did not get CommandResult"); + } + } + + @Test + public void testGatewayReceiverStatus_OnMember(){ + + Integer lnPort = (Integer) vm1.invoke(() -> createFirstLocatorWithDSId( 1 )); + + Properties props = getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(LOCATORS, "localhost[" + lnPort + "]"); + setUpJmxManagerOnVm0ThenConnect(props); + + Integer nyPort = (Integer) vm2.invoke(() -> createFirstRemoteLocator( 2, lnPort )); + + vm6.invoke(() -> createAndStartReceiver( nyPort )); + + vm3.invoke(() -> createAndStartReceiver( lnPort )); + vm4.invoke(() -> createAndStartReceiver( lnPort )); + vm5.invoke(() -> createAndStartReceiver( lnPort )); + + final DistributedMember vm3Member = (DistributedMember) vm3.invoke(() -> getMember()); + + pause(10000); + String command = CliStrings.STATUS_GATEWAYRECEIVER+ " --" + + CliStrings.STATUS_GATEWAYRECEIVER__MEMBER + "=" + vm3Member.getId(); + + CommandResult cmdResult = executeCommand(command); + + if (cmdResult != null) { + String strCmdResult = commandResultToString(cmdResult); + getLogWriter().info("testGatewayReceiverStatus : " + strCmdResult + ">>>>> "); + assertEquals(Result.Status.OK, cmdResult.getStatus()); + //TabularResultData 
tableResultData = (TabularResultData) cmdResult.getResultData(); + //List<String> result_Status = tableResultData.retrieveAllValues(CliStrings.RESULT_STATUS); + //assertIndexDetailsEquals(1, result_Status.size()); + //assertFalse(strCmdResult.contains(CliStrings.GATEWAY_NOT_RUNNING)); + TabularResultData tableResultData = + ((CompositeResultData)cmdResult.getResultData()).retrieveSection(CliStrings.SECTION_GATEWAY_RECEIVER_AVAILABLE).retrieveTable(CliStrings.TABLE_GATEWAY_RECEIVER); + List<String> result_Status = tableResultData.retrieveAllValues(CliStrings.RESULT_STATUS); + assertEquals(1, result_Status.size()); + assertFalse(result_Status.contains(CliStrings.GATEWAY_NOT_RUNNING)); + } else { + fail("testGatewayReceiverStatus failed as did not get CommandResult"); + } + + vm3.invoke(() -> stopReceiver()); + vm4.invoke(() -> stopReceiver()); + vm5.invoke(() -> stopReceiver()); + + pause(10000); + + command = CliStrings.STATUS_GATEWAYRECEIVER+ " --" + + CliStrings.STATUS_GATEWAYRECEIVER__MEMBER + "=" + vm3Member.getId(); + + cmdResult = executeCommand(command); + if (cmdResult != null) { + String strCmdResult = commandResultToString(cmdResult); + getLogWriter().info( + "testGatewayReceiverStatus : " + strCmdResult + ">>>>> "); + +// TabularResultData tableResultData = +// (TabularResultData) cmdResult.getResultData(); +// List<String> result_Status = tableResultData.retrieveAllValues(CliStrings.RESULT_STATUS); +// assertIndexDetailsEquals(1, result_Status.size()); +// assertFalse(result_Status.contains(CliStrings.GATEWAY_RUNNING)); + + TabularResultData tableResultData = + ((CompositeResultData)cmdResult.getResultData()).retrieveSection(CliStrings.SECTION_GATEWAY_RECEIVER_AVAILABLE).retrieveTable(CliStrings.TABLE_GATEWAY_RECEIVER); + List<String> result_Status = tableResultData.retrieveAllValues(CliStrings.RESULT_STATUS); + assertEquals(1, result_Status.size()); + assertFalse(result_Status.contains(CliStrings.GATEWAY_RUNNING)); + } else { + 
fail("testGatewayReceiverStatus failed as did not get CommandResult"); + } + } + + @Test + public void testGatewayReceiverStatus_OnGroups(){ + + Integer lnPort = (Integer) vm1.invoke(() -> createFirstLocatorWithDSId( 1 )); + + Properties props = getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(LOCATORS, "localhost[" + lnPort + "]"); + setUpJmxManagerOnVm0ThenConnect(props); + + Integer nyPort = (Integer) vm2.invoke(() -> createFirstRemoteLocator( 2, lnPort )); + + vm7.invoke(() -> createAndStartReceiver( nyPort )); + + vm3.invoke(() -> createAndStartReceiverWithGroup( lnPort, "RG1, RG2" )); + vm4.invoke(() -> createAndStartReceiverWithGroup( lnPort, "RG1, RG2" )); + vm5.invoke(() -> createAndStartReceiverWithGroup( lnPort, "RG1" )); + vm6.invoke(() -> createAndStartReceiverWithGroup( lnPort, "RG2" )); + + pause(10000); + String command = CliStrings.STATUS_GATEWAYRECEIVER + " --" + + CliStrings.STATUS_GATEWAYRECEIVER__GROUP + "=RG1"; + + CommandResult cmdResult = executeCommand(command); + if (cmdResult != null) { + String strCmdResult = commandResultToString(cmdResult); + getLogWriter().info( + "testGatewayReceiverStatus : " + strCmdResult + ">>>>> "); + + assertEquals(Result.Status.OK, cmdResult.getStatus()); + + TabularResultData tableResultData = + ((CompositeResultData)cmdResult.getResultData()).retrieveSection(CliStrings.SECTION_GATEWAY_RECEIVER_AVAILABLE).retrieveTable(CliStrings.TABLE_GATEWAY_RECEIVER); + + List<String> result_Status = tableResultData.retrieveAllValues(CliStrings.RESULT_STATUS); + assertEquals(3, result_Status.size()); + assertFalse(result_Status.contains(CliStrings.GATEWAY_NOT_RUNNING)); + + } else { + fail("testGatewayReceiverStatus failed as did not get CommandResult"); + } + + vm3.invoke(() -> stopReceiver()); + vm4.invoke(() -> stopReceiver()); + vm5.invoke(() -> stopReceiver()); + + pause(10000); + command = CliStrings.STATUS_GATEWAYRECEIVER + " --"+ CliStrings.STATUS_GATEWAYRECEIVER__GROUP + 
"=RG1"; + + cmdResult = executeCommand(command); + if (cmdResult != null) { + String strCmdResult = commandResultToString(cmdResult); + getLogWriter().info( + "testGatewayReceiverStatus_OnGroups : " + strCmdResult + ">>>>> "); + assertEquals(Result.Status.OK, cmdResult.getStatus()); + + TabularResultData tableResultData = + ((CompositeResultData)cmdResult.getResultData()).retrieveSection(CliStrings.SECTION_GATEWAY_RECEIVER_AVAILABLE).retrieveTable(CliStrings.TABLE_GATEWAY_RECEIVER); + + List<String> result_Status = tableResultData.retrieveAllValues(CliStrings.RESULT_STATUS); + assertEquals(3, result_Status.size()); + assertFalse(result_Status.contains(CliStrings.GATEWAY_RUNNING)); + + } else { + fail("testGatewayReceiverStatus failed as did not get CommandResult"); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/management/WANManagementDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/management/WANManagementDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/management/WANManagementDUnitTest.java new file mode 100644 index 0000000..c21cf26 --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/management/WANManagementDUnitTest.java @@ -0,0 +1,513 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.management; + +import org.junit.experimental.categories.Category; +import org.junit.Test; + +import static org.junit.Assert.*; + +import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; +import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; + +import java.util.Map; + +import javax.management.MBeanServer; +import javax.management.ObjectName; + +import com.gemstone.gemfire.cache.Cache; +import com.gemstone.gemfire.distributed.DistributedMember; +import com.gemstone.gemfire.distributed.Locator; +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; +import com.gemstone.gemfire.management.internal.MBeanJMXAdapter; +import com.gemstone.gemfire.test.dunit.Host; +import com.gemstone.gemfire.test.dunit.LogWriterUtils; +import com.gemstone.gemfire.test.dunit.SerializableRunnable; +import com.gemstone.gemfire.test.dunit.VM; + +/** + * Tests for WAN artifacts like Sender and Receiver. 
The purpose of this test is + * not to check WAN functionality , but to verify ManagementServices running + * properly and reflecting WAN behaviour and data properly + * + * + */ +@Category(DistributedTest.class) +public class WANManagementDUnitTest extends ManagementTestBase { + + private static final long serialVersionUID = 1L; + + + public static MBeanServer mbeanServer = MBeanJMXAdapter.mbeanServer; + + public WANManagementDUnitTest() throws Exception { + super(); + } + + @Test + public void testMBeanCallback() throws Exception { + + VM nyLocator = getManagedNodeList().get(0); + VM nyReceiver = getManagedNodeList().get(1); + VM puneSender = getManagedNodeList().get(2); + VM managing = getManagingNode(); + VM puneLocator = Host.getLocator(); + + int punePort = (Integer) puneLocator.invoke(() -> WANManagementDUnitTest.getLocatorPort()); + + Integer nyPort = (Integer)nyLocator.invoke(() -> WANTestBase.createFirstRemoteLocator( 12, punePort )); + + + + puneSender.invoke(() -> WANTestBase.createCache( punePort )); + managing.invoke(() -> WANTestBase.createManagementCache( punePort )); + startManagingNode(managing); + + // keep a larger batch to minimize number of exception occurrences in the + // log + puneSender.invoke(() -> WANTestBase.createSender( "pn", + 12, true, 100, 300, false, false, null, true )); + managing.invoke(() -> WANTestBase.createSender( "pn", + 12, true, 100, 300, false, false, null, true )); + + + puneSender.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", "pn", 1, 100, false )); + managing.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", "pn", 1, 100, false )); + + nyReceiver.invoke(() -> WANTestBase.createCache( nyPort )); + nyReceiver.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", null, 1, 100, false )); + nyReceiver.invoke(() -> WANTestBase.createReceiver()); + + WANTestBase.startSenderInVMs("pn", puneSender, managing); + + // make sure all the 
senders are running before doing any puts + puneSender.invoke(() -> WANTestBase.waitForSenderRunningState( "pn" )); + managing.invoke(() -> WANTestBase.waitForSenderRunningState( "pn" )); + + + + + checkSenderMBean(puneSender, getTestMethodName() + "_PR"); + checkSenderMBean(managing, getTestMethodName() + "_PR"); + + checkReceiverMBean(nyReceiver); + + stopGatewaySender(puneSender); + startGatewaySender(puneSender); + + DistributedMember puneMember = (DistributedMember) puneSender.invoke(() -> WANManagementDUnitTest.getMember()); + + checkProxySender(managing, puneMember); + checkSenderNavigationAPIS(managing, puneMember); + + } + + @Test + public void testReceiverMBean() throws Exception { + + VM nyLocator = getManagedNodeList().get(0); + VM nyReceiver = getManagedNodeList().get(1); + VM puneSender = getManagedNodeList().get(2); + VM managing = getManagingNode(); + VM puneLocator = Host.getLocator(); + + int punePort = (Integer) puneLocator.invoke(() -> WANManagementDUnitTest.getLocatorPort()); + + Integer nyPort = (Integer) nyLocator.invoke(() -> WANTestBase.createFirstRemoteLocator( 12, punePort )); + + puneSender.invoke(() -> WANTestBase.createCache( punePort )); + + nyReceiver.invoke(() -> WANTestBase.createCache( nyPort )); + nyReceiver.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", null, 1, 100, false )); + nyReceiver.invoke(() -> WANTestBase.createReceiver()); + + // keep a larger batch to minimize number of exception occurrences in the + // log + puneSender.invoke(() -> WANTestBase.createSender( "pn", + 12, true, 100, 300, false, false, null, true )); + + puneSender.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", "pn", 1, 100, false )); + + puneSender.invoke(() -> WANTestBase.startSender( "pn" )); + + // make sure all the senders are running before doing any puts + puneSender.invoke(() -> WANTestBase.waitForSenderRunningState( "pn" )); + + managing.invoke(() -> 
WANTestBase.createManagementCache( nyPort )); + startManagingNode(managing); + + + checkSenderMBean(puneSender, getTestMethodName() + "_PR"); + checkReceiverMBean(nyReceiver); + + DistributedMember nyMember = (DistributedMember) nyReceiver.invoke(() -> WANManagementDUnitTest.getMember()); + + checkProxyReceiver(managing, nyMember); + checkReceiverNavigationAPIS(managing, nyMember); + + + } + + + @Test + public void testAsyncEventQueue() throws Exception { + + VM nyLocator = getManagedNodeList().get(0); + VM nyReceiver = getManagedNodeList().get(1); + VM puneSender = getManagedNodeList().get(2); + VM managing = getManagingNode(); + VM puneLocator = Host.getLocator(); + + int punePort = (Integer) puneLocator.invoke(() -> WANManagementDUnitTest.getLocatorPort()); + + Integer nyPort = (Integer)nyLocator.invoke(() -> WANTestBase.createFirstRemoteLocator( 12, punePort )); + + + + puneSender.invoke(() -> WANTestBase.createCache( punePort )); + managing.invoke(() -> WANTestBase.createManagementCache( punePort )); + startManagingNode(managing); + + + puneSender.invoke(() -> WANTestBase.createAsyncEventQueue( + "pn", false, 100, 100, false, false, "puneSender", false )); + managing.invoke(() -> WANTestBase.createAsyncEventQueue( + "pn", false, 100, 100, false, false, "managing", false )); + + + puneSender.invoke(() -> WANTestBase.createReplicatedRegionWithAsyncEventQueue( + getTestMethodName() + "_RR", "pn", false )); + managing.invoke(() -> WANTestBase.createReplicatedRegionWithAsyncEventQueue( + getTestMethodName() + "_RR", "pn", false )); + + WANTestBase.createCacheInVMs(nyPort, nyReceiver); + nyReceiver.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", null, 1, 100, false )); + nyReceiver.invoke(() -> WANTestBase.createReceiver()); + + checkAsyncQueueMBean(puneSender); + checkAsyncQueueMBean(managing); + + + DistributedMember puneMember = (DistributedMember) puneSender.invoke(() -> WANManagementDUnitTest.getMember()); + + 
checkProxyAsyncQueue(managing, puneMember); + + } + + + @SuppressWarnings("serial") + protected void checkSenderNavigationAPIS(final VM vm, + final DistributedMember senderMember) { + SerializableRunnable checkNavigationAPIS = new SerializableRunnable( + "Check Sender Navigation APIs") { + public void run() { + Cache cache = GemFireCacheImpl.getInstance(); + ManagementService service = ManagementService + .getManagementService(cache); + DistributedSystemMXBean bean = service.getDistributedSystemMXBean(); + ObjectName expectedName = service.getGatewaySenderMBeanName( + senderMember, "pn"); + try { + ObjectName actualName = bean.fetchGatewaySenderObjectName( + senderMember.getId(), "pn"); + assertEquals(expectedName, actualName); + } catch (Exception e) { + fail("Sender Navigation Failed " + e); + } + + assertEquals(2, bean.listGatewaySenderObjectNames().length); + try { + assertEquals(1, bean.listGatewaySenderObjectNames(senderMember + .getId()).length); + } catch (Exception e) { + fail("Sender Navigation Failed " + e); + } + + } + }; + vm.invoke(checkNavigationAPIS); + } + + @SuppressWarnings("serial") + protected void checkReceiverNavigationAPIS(final VM vm, + final DistributedMember receiverMember) { + SerializableRunnable checkNavigationAPIS = new SerializableRunnable( + "Check Receiver Navigation APIs") { + public void run() { + Cache cache = GemFireCacheImpl.getInstance(); + ManagementService service = ManagementService + .getManagementService(cache); + DistributedSystemMXBean bean = service.getDistributedSystemMXBean(); + ObjectName expectedName = service + .getGatewayReceiverMBeanName(receiverMember); + try { + ObjectName actualName = bean + .fetchGatewayReceiverObjectName(receiverMember.getId()); + assertEquals(expectedName, actualName); + } catch (Exception e) { + fail("Receiver Navigation Failed " + e); + } + + assertEquals(1, bean.listGatewayReceiverObjectNames().length); + + } + }; + vm.invoke(checkNavigationAPIS); + } + + private static int 
getLocatorPort(){ + return Locator.getLocators().get(0).getPort(); + } + private static DistributedMember getMember(){ + return GemFireCacheImpl.getInstance().getMyId(); + } + + /** + * Checks Proxy GatewaySender + * + * @param vm + * reference to VM + */ + @SuppressWarnings("serial") + protected void checkProxySender(final VM vm, final DistributedMember senderMember) { + SerializableRunnable checkProxySender = new SerializableRunnable("Check Proxy Sender") { + public void run() { + Cache cache = GemFireCacheImpl.getInstance(); + ManagementService service = ManagementService + .getManagementService(cache); + GatewaySenderMXBean bean = null; + try { + bean = MBeanUtil.getGatewaySenderMbeanProxy(senderMember, "pn"); + } catch (Exception e) { + fail("Could not obtain Sender Proxy in desired time " + e); + } + assertNotNull(bean); + final ObjectName senderMBeanName = service.getGatewaySenderMBeanName( + senderMember, "pn"); + try { + MBeanUtil.printBeanDetails(senderMBeanName); + } catch (Exception e) { + fail("Error while Printing Bean Details " + e); + } + + if(service.isManager()){ + DistributedSystemMXBean dsBean = service.getDistributedSystemMXBean(); + Map<String, Boolean> dsMap = dsBean.viewRemoteClusterStatus(); + + LogWriterUtils.getLogWriter().info( + "<ExpectedString> Ds Map is: " + dsMap + + "</ExpectedString> "); + + } + + } + }; + vm.invoke(checkProxySender); + } + + /** + * Checks Proxy GatewayReceiver + * + * @param vm + * reference to VM + */ + @SuppressWarnings("serial") + protected void checkProxyReceiver(final VM vm, + final DistributedMember senderMember) { + SerializableRunnable checkProxySender = new SerializableRunnable( + "Check Proxy Receiver") { + public void run() { + Cache cache = GemFireCacheImpl.getInstance(); + ManagementService service = ManagementService + .getManagementService(cache); + GatewayReceiverMXBean bean = null; + try { + bean = MBeanUtil.getGatewayReceiverMbeanProxy(senderMember); + } catch (Exception e) { + fail("Could not 
obtain Sender Proxy in desired time " + e); + } + assertNotNull(bean); + final ObjectName receiverMBeanName = service + .getGatewayReceiverMBeanName(senderMember); + try { + MBeanUtil.printBeanDetails(receiverMBeanName); + } catch (Exception e) { + fail("Error while Printing Bean Details " + e); + } + + } + }; + vm.invoke(checkProxySender); + } + + + /** + * stops a gateway sender + * + * @param vm + * reference to VM + */ + @SuppressWarnings("serial") + protected void stopGatewaySender(final VM vm) { + SerializableRunnable stopGatewaySender = new SerializableRunnable("Stop Gateway Sender") { + public void run() { + Cache cache = GemFireCacheImpl.getInstance(); + ManagementService service = ManagementService + .getManagementService(cache); + GatewaySenderMXBean bean = service.getLocalGatewaySenderMXBean("pn"); + assertNotNull(bean); + bean.stop(); + assertFalse(bean.isRunning()); + } + }; + vm.invoke(stopGatewaySender); + } + + /** + * start a gateway sender + * + * @param vm + * reference to VM + */ + @SuppressWarnings("serial") + protected void startGatewaySender(final VM vm) { + SerializableRunnable stopGatewaySender = new SerializableRunnable("Start Gateway Sender") { + public void run() { + Cache cache = GemFireCacheImpl.getInstance(); + ManagementService service = ManagementService + .getManagementService(cache); + GatewaySenderMXBean bean = service.getLocalGatewaySenderMXBean("pn"); + assertNotNull(bean); + bean.start(); + assertTrue(bean.isRunning()); + } + }; + vm.invoke(stopGatewaySender); + } + + + + /** + * Checks whether a GatewayReceiverMBean is created or not + * + * @param vm + * reference to VM + */ + @SuppressWarnings("serial") + protected void checkReceiverMBean(final VM vm) { + SerializableRunnable checkMBean = new SerializableRunnable("Check Receiver MBean") { + public void run() { + Cache cache = GemFireCacheImpl.getInstance(); + ManagementService service = ManagementService + .getManagementService(cache); + GatewayReceiverMXBean bean = 
service.getLocalGatewayReceiverMXBean(); + assertNotNull(bean); + } + }; + vm.invoke(checkMBean); + } + + /** + * Checks whether a GatewayReceiverMBean is created or not + * + * @param vm + * reference to VM + */ + @SuppressWarnings("serial") + protected void checkSenderMBean(final VM vm, final String regionPath) { + SerializableRunnable checkMBean = new SerializableRunnable("Check Sender MBean") { + public void run() { + Cache cache = GemFireCacheImpl.getInstance(); + ManagementService service = ManagementService + .getManagementService(cache); + + GatewaySenderMXBean bean = service.getLocalGatewaySenderMXBean("pn"); + assertNotNull(bean); + assertTrue(bean.isConnected()); + + ObjectName regionBeanName = service.getRegionMBeanName(cache + .getDistributedSystem().getDistributedMember(), "/" + regionPath); + RegionMXBean rBean = service.getMBeanInstance(regionBeanName, + RegionMXBean.class); + assertTrue(rBean.isGatewayEnabled()); + + + } + }; + vm.invoke(checkMBean); + } + + /** + * Checks whether a Async Queue MBean is created or not + * + * @param vm + * reference to VM + */ + @SuppressWarnings("serial") + protected void checkAsyncQueueMBean(final VM vm) { + SerializableRunnable checkAsyncQueueMBean = new SerializableRunnable( + "Check Async Queue MBean") { + public void run() { + Cache cache = GemFireCacheImpl.getInstance(); + ManagementService service = ManagementService + .getManagementService(cache); + AsyncEventQueueMXBean bean = service.getLocalAsyncEventQueueMXBean("pn"); + assertNotNull(bean); + // Already in started State + } + }; + vm.invoke(checkAsyncQueueMBean); + } + + /** + * Checks Proxy Async Queue + * + * @param vm + * reference to VM + */ + @SuppressWarnings("serial") + protected void checkProxyAsyncQueue(final VM vm, + final DistributedMember senderMember) { + SerializableRunnable checkProxyAsyncQueue = new SerializableRunnable( + "Check Proxy Async Queue") { + public void run() { + Cache cache = GemFireCacheImpl.getInstance(); + 
ManagementService service = ManagementService + .getManagementService(cache); + AsyncEventQueueMXBean bean = null; + try { + bean = MBeanUtil.getAsyncEventQueueMBeanProxy(senderMember, "pn"); + } catch (Exception e) { + fail("Could not obtain Sender Proxy in desired time " + e); + } + assertNotNull(bean); + final ObjectName queueMBeanName = service.getAsyncEventQueueMBeanName( + senderMember, "pn"); + + try { + MBeanUtil.printBeanDetails(queueMBeanName); + } catch (Exception e) { + fail("Error while Printing Bean Details " + e); + } + + } + }; + vm.invoke(checkProxyAsyncQueue); + } +}
