// Source file: geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/WanCommandCreateGatewayReceiverDUnitTest.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.gemstone.gemfire.internal.cache.wan.wancommand;

import com.gemstone.gemfire.cache.wan.GatewayReceiver;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.management.cli.Result;
import com.gemstone.gemfire.management.internal.cli.i18n.CliStrings;
import com.gemstone.gemfire.management.internal.cli.result.CommandResult;
import com.gemstone.gemfire.management.internal.cli.result.TabularResultData;
import com.gemstone.gemfire.test.dunit.Host;
import com.gemstone.gemfire.test.dunit.VM;
import com.gemstone.gemfire.test.junit.categories.DistributedTest;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import static com.gemstone.gemfire.distributed.ConfigurationProperties.LOCATORS;
import static com.gemstone.gemfire.distributed.ConfigurationProperties.MCAST_PORT;
import static com.gemstone.gemfire.test.dunit.Assert.*;
import static com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter;

/**
 * DUnit tests for 'create gateway-receiver' command.
 *
 * <p>Every test follows the same three steps: bring up the topology (manager on vm0, a remote
 * locator on vm2, caches on vm3..vm5), run the gfsh command and inspect the "Status" column of
 * its tabular result, then verify the receiver attributes on the targeted members. The shared
 * steps live in the private helpers at the bottom of the class.
 */
@Category(DistributedTest.class)
public class WanCommandCreateGatewayReceiverDUnitTest extends WANCommandTestBase {

  private static final long serialVersionUID = 1L;

  /** Attribute values used by every test that configures the receiver explicitly. */
  private static final String RECEIVER_BIND_ADDRESS = "localhost";
  private static final int RECEIVER_START_PORT = 10000;
  private static final int RECEIVER_END_PORT = 11000;
  private static final int RECEIVER_MAX_TIME_BETWEEN_PINGS = 100000;
  private static final int RECEIVER_SOCKET_BUFFER_SIZE = 512000;

  private static final String TRANSPORT_FILTER_1 =
      "com.gemstone.gemfire.cache30.MyGatewayTransportFilter1";
  private static final String TRANSPORT_FILTER_2 =
      "com.gemstone.gemfire.cache30.MyGatewayTransportFilter2";

  private static final String RECEIVER_GROUP_1 = "receiverGroup1";
  private static final String RECEIVER_GROUP_2 = "receiverGroup2";

  /**
   * GatewayReceiver with all default attributes
   */
  @Test
  public void testCreateGatewayReceiverWithDefault() {
    int punePort = connectManagerAndCreateRemoteLocator();
    createCaches(punePort);

    String command = CliStrings.CREATE_GATEWAYRECEIVER;

    // expected size 4 includes the manager node
    assertNoErrors(executeAndGetStatuses("testCreateGatewayReceiverWithDefault", command, 4));

    verifyReceivers(!GatewayReceiver.DEFAULT_MANUAL_START,
        GatewayReceiver.DEFAULT_START_PORT,
        GatewayReceiver.DEFAULT_END_PORT,
        GatewayReceiver.DEFAULT_BIND_ADDRESS,
        GatewayReceiver.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS,
        GatewayReceiver.DEFAULT_SOCKET_BUFFER_SIZE,
        null, vm3, vm4, vm5);
  }

  /**
   * GatewayReceiver with given attributes
   */
  @Test
  public void testCreateGatewayReceiver() {
    int punePort = connectManagerAndCreateRemoteLocator();
    createCaches(punePort);

    String command = CliStrings.CREATE_GATEWAYRECEIVER
        + option(CliStrings.CREATE_GATEWAYRECEIVER__MANUALSTART, "true")
        + commonOptions(RECEIVER_START_PORT, RECEIVER_END_PORT);

    // expected size 4 includes the manager node
    assertNoErrors(executeAndGetStatuses("testCreateGatewayReceiver", command, 4));

    verifyConfiguredReceivers(false, null, vm3, vm4, vm5);
  }

  /**
   * GatewayReceiver with given attributes and a single GatewayTransportFilter.
   */
  @Test
  public void testCreateGatewayReceiverWithGatewayTransportFilter() {
    int punePort = connectManagerAndCreateRemoteLocator();
    createCaches(punePort);

    String command = CliStrings.CREATE_GATEWAYRECEIVER
        + option(CliStrings.CREATE_GATEWAYRECEIVER__MANUALSTART, "false")
        + commonOptions(RECEIVER_START_PORT, RECEIVER_END_PORT)
        + option(CliStrings.CREATE_GATEWAYRECEIVER__GATEWAYTRANSPORTFILTER, TRANSPORT_FILTER_1);

    // expected size 4 includes the manager node
    assertNoErrors(executeAndGetStatuses(
        "testCreateGatewayReceiverWithGatewayTransportFilter", command, 4));

    List<String> transportFilters = new ArrayList<>();
    transportFilters.add(TRANSPORT_FILTER_1);

    verifyConfiguredReceivers(true, transportFilters, vm3, vm4, vm5);
  }

  /**
   * GatewayReceiver with given attributes and multiple GatewayTransportFilters.
   */
  @Test
  public void testCreateGatewayReceiverWithMultipleGatewayTransportFilters() {
    int punePort = connectManagerAndCreateRemoteLocator();
    createCaches(punePort);

    // manual-start is deliberately not specified here, so the default applies
    String command = CliStrings.CREATE_GATEWAYRECEIVER
        + commonOptions(RECEIVER_START_PORT, RECEIVER_END_PORT)
        + option(CliStrings.CREATE_GATEWAYRECEIVER__GATEWAYTRANSPORTFILTER,
            TRANSPORT_FILTER_1 + "," + TRANSPORT_FILTER_2);

    // expected size 4 includes the manager node
    assertNoErrors(executeAndGetStatuses(
        "testCreateGatewayReceiverWithMultipleGatewayTransportFilters", command, 4));

    List<String> transportFilters = new ArrayList<>();
    transportFilters.add(TRANSPORT_FILTER_1);
    transportFilters.add(TRANSPORT_FILTER_2);

    verifyConfiguredReceivers(!GatewayReceiver.DEFAULT_MANUAL_START, transportFilters,
        vm3, vm4, vm5);
  }

  /**
   * GatewayReceiver with given attributes.
   * Error scenario where startPort is greater than endPort.
   */
  @Test
  public void testCreateGatewayReceiver_Error() {
    int punePort = connectManagerAndCreateRemoteLocator();
    createCaches(punePort);

    // start and end port are swapped on purpose: start (11000) > end (10000)
    String command = CliStrings.CREATE_GATEWAYRECEIVER
        + commonOptions(RECEIVER_END_PORT, RECEIVER_START_PORT);

    // The command itself still reports OK; the failure shows up per member in the
    // "Status" column. Expected size 4 includes the manager node.
    List<String> statuses =
        executeAndGetStatuses("testCreateGatewayReceiver_Error", command, 4);
    for (String status : statuses) {
      assertTrue("GatewayReceiver creation should have failed", status.contains("ERROR:"));
    }
  }

  /**
   * GatewayReceiver with given attributes on the given member.
   */
  @Test
  public void testCreateGatewayReceiver_onMember() {
    int punePort = connectManagerAndCreateRemoteLocator();
    createCaches(punePort);

    final DistributedMember vm3Member = (DistributedMember) vm3.invoke(() -> getMember());

    String command = CliStrings.CREATE_GATEWAYRECEIVER
        + option(CliStrings.CREATE_GATEWAYRECEIVER__MANUALSTART, "true")
        + commonOptions(RECEIVER_START_PORT, RECEIVER_END_PORT)
        + option(CliStrings.CREATE_GATEWAYRECEIVER__MEMBER, vm3Member.getId());

    assertNoErrors(executeAndGetStatuses("testCreateGatewayReceiver_onMember", command, 1));

    verifyConfiguredReceivers(false, null, vm3);
  }

  /**
   * GatewayReceiver with given attributes on multiple members.
   */
  @Test
  public void testCreateGatewayReceiver_onMultipleMembers() {
    int punePort = connectManagerAndCreateRemoteLocator();
    createCaches(punePort);

    final DistributedMember vm3Member = (DistributedMember) vm3.invoke(() -> getMember());
    final DistributedMember vm4Member = (DistributedMember) vm4.invoke(() -> getMember());

    String command = CliStrings.CREATE_GATEWAYRECEIVER
        + option(CliStrings.CREATE_GATEWAYRECEIVER__MANUALSTART, "true")
        + commonOptions(RECEIVER_START_PORT, RECEIVER_END_PORT)
        + option(CliStrings.CREATE_GATEWAYRECEIVER__MEMBER,
            vm3Member.getId() + "," + vm4Member.getId());

    assertNoErrors(
        executeAndGetStatuses("testCreateGatewayReceiver_onMultipleMembers", command, 2));

    verifyConfiguredReceivers(false, null, vm3, vm4);
  }

  /**
   * GatewayReceiver with given attributes on the given group.
   */
  @Test
  public void testCreateGatewayReceiver_onGroup() {
    int punePort = connectManagerAndCreateRemoteLocator();
    createCachesWithGroups(punePort, RECEIVER_GROUP_1, RECEIVER_GROUP_1, RECEIVER_GROUP_1);

    String command = CliStrings.CREATE_GATEWAYRECEIVER
        + option(CliStrings.CREATE_GATEWAYRECEIVER__MANUALSTART, "true")
        + commonOptions(RECEIVER_START_PORT, RECEIVER_END_PORT)
        + option(CliStrings.CREATE_GATEWAYRECEIVER__GROUP, RECEIVER_GROUP_1);

    assertNoErrors(executeAndGetStatuses("testCreateGatewayReceiver_onGroup", command, 3));

    verifyConfiguredReceivers(false, null, vm3, vm4, vm5);
  }

  /**
   * GatewayReceiver with given attributes on the given group.
   * Only 2 of 3 members are part of the group.
   */
  @Test
  public void testCreateGatewayReceiver_onGroup_Scenario2() {
    int punePort = connectManagerAndCreateRemoteLocator();
    createCachesWithGroups(punePort, RECEIVER_GROUP_1, RECEIVER_GROUP_1, RECEIVER_GROUP_2);

    String command = CliStrings.CREATE_GATEWAYRECEIVER
        + option(CliStrings.CREATE_GATEWAYRECEIVER__MANUALSTART, "true")
        + commonOptions(RECEIVER_START_PORT, RECEIVER_END_PORT)
        + option(CliStrings.CREATE_GATEWAYRECEIVER__GROUP, RECEIVER_GROUP_1);

    assertNoErrors(
        executeAndGetStatuses("testCreateGatewayReceiver_onGroup_Scenario2", command, 2));

    verifyConfiguredReceivers(false, null, vm3, vm4);
  }

  /**
   * GatewayReceiver with given attributes on multiple groups.
   */
  @Test
  public void testCreateGatewayReceiver_onMultipleGroups() {
    int punePort = connectManagerAndCreateRemoteLocator();
    createCachesWithGroups(punePort, RECEIVER_GROUP_1, RECEIVER_GROUP_1, RECEIVER_GROUP_2);

    String command = CliStrings.CREATE_GATEWAYRECEIVER
        + option(CliStrings.CREATE_GATEWAYRECEIVER__MANUALSTART, "true")
        + commonOptions(RECEIVER_START_PORT, RECEIVER_END_PORT)
        + option(CliStrings.CREATE_GATEWAYRECEIVER__GROUP,
            RECEIVER_GROUP_1 + "," + RECEIVER_GROUP_2);

    assertNoErrors(
        executeAndGetStatuses("testCreateGatewayReceiver_onMultipleGroups", command, 3));

    verifyConfiguredReceivers(false, null, vm3, vm4, vm5);
  }

  // ---------------------------------------------------------------------------------------
  // Shared steps
  // ---------------------------------------------------------------------------------------

  /**
   * Looks up the port of the DUnit locator, sets up the JMX manager on vm0 and connects to it
   * using that locator, then creates the remote locator on vm2.
   *
   * @return the port of the local ("pune") locator, needed to create the member caches
   */
  private int connectManagerAndCreateRemoteLocator() {
    VM puneLocator = Host.getLocator();
    int punePort = (Integer) puneLocator.invoke(() -> getLocatorPort());

    Properties props = getDistributedSystemProperties();
    props.setProperty(MCAST_PORT, "0");
    props.setProperty(LOCATORS, "localhost[" + punePort + "]");
    setUpJmxManagerOnVm0ThenConnect(props);

    // Invoked for its side effect only; the port it returns is not used by these tests.
    vm2.invoke(() -> createFirstRemoteLocator(2, punePort));

    return punePort;
  }

  /**
   * Creates a cache on vm3, vm4 and vm5, in that order.
   *
   * @param punePort port of the locator passed to {@code createCache}
   */
  private void createCaches(int punePort) {
    vm3.invoke(() -> createCache(punePort));
    vm4.invoke(() -> createCache(punePort));
    vm5.invoke(() -> createCache(punePort));
  }

  /**
   * Creates a cache on vm3, vm4 and vm5, in that order, each with its own member group.
   *
   * @param punePort port of the locator passed to {@code createCacheWithGroups}
   * @param vm3Group group passed for vm3
   * @param vm4Group group passed for vm4
   * @param vm5Group group passed for vm5
   */
  private void createCachesWithGroups(int punePort, String vm3Group, String vm4Group,
      String vm5Group) {
    vm3.invoke(() -> createCacheWithGroups(punePort, vm3Group));
    vm4.invoke(() -> createCacheWithGroups(punePort, vm4Group));
    vm5.invoke(() -> createCacheWithGroups(punePort, vm5Group));
  }

  /**
   * Formats a single gfsh option as {@code " --name=value"} (note the leading space).
   */
  private static String option(String name, Object value) {
    return " --" + name + "=" + value;
  }

  /**
   * Builds the option string shared by all tests that configure the receiver explicitly:
   * bind address, start port, end port, maximum time between pings and socket buffer size,
   * in that order.
   *
   * @param startPort value for the start-port option
   * @param endPort value for the end-port option
   */
  private static String commonOptions(int startPort, int endPort) {
    return option(CliStrings.CREATE_GATEWAYRECEIVER__BINDADDRESS, RECEIVER_BIND_ADDRESS)
        + option(CliStrings.CREATE_GATEWAYRECEIVER__STARTPORT, startPort)
        + option(CliStrings.CREATE_GATEWAYRECEIVER__ENDPORT, endPort)
        + option(CliStrings.CREATE_GATEWAYRECEIVER__MAXTIMEBETWEENPINGS,
            RECEIVER_MAX_TIME_BETWEEN_PINGS)
        + option(CliStrings.CREATE_GATEWAYRECEIVER__SOCKETBUFFERSIZE,
            RECEIVER_SOCKET_BUFFER_SIZE);
  }

  /**
   * Executes the command, logs its string form, and checks that a result came back, that its
   * overall status is OK and that the "Status" column has the expected number of rows.
   *
   * @param testName name of the calling test, used in the log and failure messages
   * @param command the complete gfsh command line
   * @param expectedStatusCount expected number of rows in the "Status" column
   * @return the values of the "Status" column, for further per-row assertions
   */
  private List<String> executeAndGetStatuses(String testName, String command,
      int expectedStatusCount) {
    CommandResult cmdResult = executeCommand(command);
    if (cmdResult == null) {
      fail(testName + " failed as did not get CommandResult");
    }

    getLogWriter().info(
        testName + " stringResult : " + commandResultToString(cmdResult) + ">>>>");
    assertEquals(Result.Status.OK, cmdResult.getStatus());

    TabularResultData resultData = (TabularResultData) cmdResult.getResultData();
    List<String> statuses = resultData.retrieveAllValues("Status");
    assertEquals(expectedStatusCount, statuses.size());
    return statuses;
  }

  /**
   * Verifies that none of the given status values contains the {@code "ERROR:"} marker.
   */
  private static void assertNoErrors(List<String> statuses) {
    for (String status : statuses) {
      assertTrue("GatewayReceiver creation failed with: " + status,
          !status.contains("ERROR:"));
    }
  }

  /**
   * Runs {@code verifyReceiverCreationWithAttributes} on each given VM with the attribute
   * values that {@link #commonOptions(int, int)} puts on the command line (using the regular
   * start/end port order).
   *
   * @param notManualStart first argument of the verification; every test passes the negation
   *        of the manual-start setting it used (presumably "receiver is running" - confirm
   *        against WANCommandTestBase)
   * @param transportFilters expected transport filter class names, or {@code null} when the
   *        command did not specify any
   * @param members the VMs to verify on, in order
   */
  private void verifyConfiguredReceivers(boolean notManualStart, List<String> transportFilters,
      VM... members) {
    verifyReceivers(notManualStart, RECEIVER_START_PORT, RECEIVER_END_PORT,
        RECEIVER_BIND_ADDRESS, RECEIVER_MAX_TIME_BETWEEN_PINGS, RECEIVER_SOCKET_BUFFER_SIZE,
        transportFilters, members);
  }

  /**
   * Runs {@code verifyReceiverCreationWithAttributes} with the given arguments on each of the
   * given VMs, in order. All arguments are forwarded unchanged.
   */
  private void verifyReceivers(boolean notManualStart, int startPort, int endPort,
      String bindAddress, int maxTimeBetweenPings, int socketBufferSize,
      List<String> transportFilters, VM... members) {
    for (VM member : members) {
      member.invoke(() -> verifyReceiverCreationWithAttributes(notManualStart, startPort,
          endPort, bindAddress, maxTimeBetweenPings, socketBufferSize, transportFilters));
    }
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/WanCommandCreateGatewaySenderDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/WanCommandCreateGatewaySenderDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/WanCommandCreateGatewaySenderDUnitTest.java new file mode 100644 index 0000000..c084e76 --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/WanCommandCreateGatewaySenderDUnitTest.java @@ -0,0 +1,706 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.gemstone.gemfire.internal.cache.wan.wancommand; + +import com.gemstone.gemfire.cache.wan.GatewaySender; +import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy; +import com.gemstone.gemfire.distributed.DistributedMember; +import com.gemstone.gemfire.internal.cache.wan.GatewaySenderException; +import com.gemstone.gemfire.management.cli.Result; +import com.gemstone.gemfire.management.internal.cli.i18n.CliStrings; +import com.gemstone.gemfire.management.internal.cli.result.CommandResult; +import com.gemstone.gemfire.management.internal.cli.result.TabularResultData; +import com.gemstone.gemfire.test.dunit.IgnoredException; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import static com.gemstone.gemfire.distributed.ConfigurationProperties.*; +import static com.gemstone.gemfire.test.dunit.Assert.*; +import static com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter; +
/**
 * DUnit tests for the 'create gateway-sender' command.
 *
 * Every test follows the same shape: start the locators and connect the shell,
 * create caches on vm3..vm5, run one 'create gateway-sender' command, check the
 * per-member "Status" column of the tabular result, and then verify the sender
 * on the members that were expected to create it.
 */
@Category(DistributedTest.class)
public class WanCommandCreateGatewaySenderDUnitTest extends WANCommandTestBase {

  private static final long serialVersionUID = 1L;

  /** Socket read timeout used by all tests that set explicit attributes. */
  private static final int SOCKET_READ_TIMEOUT = GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT + 1000;

  /**
   * Executes the command while "Could not connect" is registered as an ignored
   * exception, and removes that registration again even if execution throws.
   */
  private CommandResult executeCommandWithIgnoredExceptions(String command) {
    final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
    try {
      return executeCommand(command);
    } finally {
      exln.remove();
    }
  }

  /**
   * Common start-up sequence of every test: invokes createFirstLocatorWithDSId(1)
   * on vm1, connects through setUpJmxManagerOnVm0ThenConnect using that locator
   * port, and invokes createFirstRemoteLocator(2, port) on vm2.
   *
   * @return the port returned by the locator created on vm1
   */
  private Integer startLocatorsAndConnect() {
    Integer punePort = (Integer) vm1.invoke(() -> createFirstLocatorWithDSId(1));

    Properties props = getDistributedSystemProperties();
    props.setProperty(MCAST_PORT, "0");
    props.setProperty(DISTRIBUTED_SYSTEM_ID, "1");
    props.setProperty(LOCATORS, "localhost[" + punePort + "]");
    setUpJmxManagerOnVm0ThenConnect(props);

    // The port of the remote locator is not needed by any test, so it is not kept.
    vm2.invoke(() -> createFirstRemoteLocator(2, punePort));
    return punePort;
  }

  /** Invokes createCache(punePort) on vm3, vm4 and vm5. */
  private void createCacheOnAllMembers(Integer punePort) {
    vm3.invoke(() -> createCache(punePort));
    vm4.invoke(() -> createCache(punePort));
    vm5.invoke(() -> createCache(punePort));
  }

  /** Formats a single command option as {@code " --name=value"}. */
  private static String option(String name, Object value) {
    return " --" + name + "=" + value;
  }

  /**
   * Builds the part of the command shared by all tests that set explicit
   * attributes: the command name, the sender id "ln", an optional target option
   * (member or group) and the attributes from remote distributed system id up
   * to alert threshold.
   *
   * @param targetOption already formatted member/group option, or "" for none
   * @param parallel value of the parallel option
   * @param manualStart value of the manual-start option
   */
  private static String createSenderCommand(String targetOption, boolean parallel,
      boolean manualStart) {
    return CliStrings.CREATE_GATEWAYSENDER
        + option(CliStrings.CREATE_GATEWAYSENDER__ID, "ln")
        + targetOption
        + option(CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID, 2)
        + option(CliStrings.CREATE_GATEWAYSENDER__PARALLEL, parallel)
        + option(CliStrings.CREATE_GATEWAYSENDER__MANUALSTART, manualStart)
        + option(CliStrings.CREATE_GATEWAYSENDER__SOCKETBUFFERSIZE, 1000)
        + option(CliStrings.CREATE_GATEWAYSENDER__SOCKETREADTIMEOUT, SOCKET_READ_TIMEOUT)
        + option(CliStrings.CREATE_GATEWAYSENDER__ENABLEBATCHCONFLATION, true)
        + option(CliStrings.CREATE_GATEWAYSENDER__BATCHSIZE, 1000)
        + option(CliStrings.CREATE_GATEWAYSENDER__BATCHTIMEINTERVAL, 5000)
        + option(CliStrings.CREATE_GATEWAYSENDER__ENABLEPERSISTENCE, true)
        + option(CliStrings.CREATE_GATEWAYSENDER__DISKSYNCHRONOUS, false)
        + option(CliStrings.CREATE_GATEWAYSENDER__MAXQUEUEMEMORY, 1000)
        + option(CliStrings.CREATE_GATEWAYSENDER__ALERTTHRESHOLD, 100);
  }

  /** Options for two dispatcher threads combined with the THREAD order policy. */
  private static String twoDispatcherThreadsWithThreadOrdering() {
    return option(CliStrings.CREATE_GATEWAYSENDER__DISPATCHERTHREADS, 2)
        + option(CliStrings.CREATE_GATEWAYSENDER__ORDERPOLICY, "THREAD");
  }

  /**
   * Checks the result of a 'create gateway-sender' command: the command itself
   * must report OK, the "Status" column must have the expected number of rows,
   * and every row must (or must not) contain "ERROR:".
   *
   * @param testName name of the calling test, used in log and failure messages
   * @param cmdResult result returned by the command, fails the test if null
   * @param expectedStatusCount expected number of rows in the "Status" column
   * @param expectCreationError true if every row is expected to report an error
   */
  private void assertCreateSenderResult(String testName, CommandResult cmdResult,
      int expectedStatusCount, boolean expectCreationError) {
    if (cmdResult == null) {
      fail(testName + " failed as did not get CommandResult");
    }
    String strCmdResult = commandResultToString(cmdResult);
    getLogWriter().info(testName + " stringResult : " + strCmdResult + ">>>>");
    assertEquals(Result.Status.OK, cmdResult.getStatus());

    TabularResultData resultData = (TabularResultData) cmdResult.getResultData();
    List<String> statuses = resultData.retrieveAllValues("Status");
    assertEquals(expectedStatusCount, statuses.size());
    for (String status : statuses) {
      boolean reportsError = status.contains("ERROR:");
      if (expectCreationError) {
        assertTrue("GatewaySender creation should have failed but got: " + status, reportsError);
      } else {
        assertTrue("GatewaySender creation failed with: " + status, !reportsError);
      }
    }
  }

  /**
   * Invokes verifySenderState("ln", expectedRunning, false) on vm3, vm4 and vm5.
   * NOTE(review): the flag is presumably "is running" (true only for senders
   * created without manual start) - confirm against WANCommandTestBase.
   */
  private void verifySenderStateOnAllMembers(boolean expectedRunning) {
    vm3.invoke(() -> verifySenderState("ln", expectedRunning, false));
    vm4.invoke(() -> verifySenderState("ln", expectedRunning, false));
    vm5.invoke(() -> verifySenderState("ln", expectedRunning, false));
  }

  /**
   * Invokes verifySenderAttributes on vm3, vm4 and vm5 with the attribute values
   * used by {@link #createSenderCommand(String, boolean, boolean)} (manual start
   * is always verified as true) plus the given variable parts.
   */
  private void verifySenderAttributesOnAllMembers(boolean parallel, int dispatcherThreads,
      OrderPolicy orderPolicy, List<String> eventFilters, List<String> transportFilters) {
    vm3.invoke(() -> verifySenderAttributes("ln", 2, parallel, true, 1000, SOCKET_READ_TIMEOUT,
        true, 1000, 5000, true, false, 1000, 100, dispatcherThreads, orderPolicy, eventFilters,
        transportFilters));
    vm4.invoke(() -> verifySenderAttributes("ln", 2, parallel, true, 1000, SOCKET_READ_TIMEOUT,
        true, 1000, 5000, true, false, 1000, 100, dispatcherThreads, orderPolicy, eventFilters,
        transportFilters));
    vm5.invoke(() -> verifySenderAttributes("ln", 2, parallel, true, 1000, SOCKET_READ_TIMEOUT,
        true, 1000, 5000, true, false, 1000, 100, dispatcherThreads, orderPolicy, eventFilters,
        transportFilters));
  }

  /**
   * GatewaySender with all default attributes
   */
  @Test
  public void testCreateGatewaySenderWithDefault() {
    Integer punePort = startLocatorsAndConnect();
    createCacheOnAllMembers(punePort);

    String command = CliStrings.CREATE_GATEWAYSENDER
        + option(CliStrings.CREATE_GATEWAYSENDER__ID, "ln")
        + option(CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID, 2);
    CommandResult cmdResult = executeCommandWithIgnoredExceptions(command);
    assertCreateSenderResult("testCreateGatewaySenderWithDefault", cmdResult, 5, false);

    verifySenderStateOnAllMembers(true);
  }

  /**
   * GatewaySender with given attribute values
   */
  @Test
  public void testCreateGatewaySender() {
    Integer punePort = startLocatorsAndConnect();
    createCacheOnAllMembers(punePort);

    String command = createSenderCommand("", false, true)
        + twoDispatcherThreadsWithThreadOrdering();
    CommandResult cmdResult = executeCommandWithIgnoredExceptions(command);
    assertCreateSenderResult("testCreateGatewaySender", cmdResult, 5, false);

    verifySenderStateOnAllMembers(false);
    verifySenderAttributesOnAllMembers(false, 2, OrderPolicy.THREAD, null, null);
  }

  /**
   * GatewaySender with given attribute values.
   * Error scenario where dispatcher threads is set to more than 1 and
   * no order policy provided.
   */
  @Test
  public void testCreateGatewaySender_Error() {
    Integer punePort = startLocatorsAndConnect();
    createCacheOnAllMembers(punePort);

    String command = createSenderCommand("", false, true)
        + option(CliStrings.CREATE_GATEWAYSENDER__DISPATCHERTHREADS, 2);
    CommandResult cmdResult = executeCommandWithIgnoredExceptions(command);
    // The command as a whole reports OK; the failure shows up in each member's status row.
    assertCreateSenderResult("testCreateGatewaySender_Error", cmdResult, 5, true);
  }

  /**
   * GatewaySender with given attribute values and event filters.
   */
  @Test
  public void testCreateGatewaySenderWithGatewayEventFilters() {
    Integer punePort = startLocatorsAndConnect();
    createCacheOnAllMembers(punePort);

    String command = createSenderCommand("", false, true)
        + twoDispatcherThreadsWithThreadOrdering()
        + option(CliStrings.CREATE_GATEWAYSENDER__GATEWAYEVENTFILTER,
            "com.gemstone.gemfire.cache30.MyGatewayEventFilter1,com.gemstone.gemfire.cache30.MyGatewayEventFilter2");
    CommandResult cmdResult = executeCommandWithIgnoredExceptions(command);
    assertCreateSenderResult("testCreateGatewaySenderWithGatewayEventFilters", cmdResult, 5,
        false);

    verifySenderStateOnAllMembers(false);

    List<String> eventFilters = new ArrayList<>();
    eventFilters.add("com.gemstone.gemfire.cache30.MyGatewayEventFilter1");
    eventFilters.add("com.gemstone.gemfire.cache30.MyGatewayEventFilter2");
    verifySenderAttributesOnAllMembers(false, 2, OrderPolicy.THREAD, eventFilters, null);
  }

  /**
   * GatewaySender with given attribute values and transport filters.
   */
  @Test
  public void testCreateGatewaySenderWithGatewayTransportFilters() {
    Integer punePort = startLocatorsAndConnect();
    createCacheOnAllMembers(punePort);

    String command = createSenderCommand("", false, true)
        + twoDispatcherThreadsWithThreadOrdering()
        + option(CliStrings.CREATE_GATEWAYSENDER__GATEWAYTRANSPORTFILTER,
            "com.gemstone.gemfire.cache30.MyGatewayTransportFilter1");
    CommandResult cmdResult = executeCommandWithIgnoredExceptions(command);
    assertCreateSenderResult("testCreateGatewaySenderWithGatewayTransportFilters", cmdResult, 5,
        false);

    verifySenderStateOnAllMembers(false);

    List<String> transportFilters = new ArrayList<>();
    transportFilters.add("com.gemstone.gemfire.cache30.MyGatewayTransportFilter1");
    verifySenderAttributesOnAllMembers(false, 2, OrderPolicy.THREAD, null, transportFilters);
  }

  /**
   * GatewaySender with given attribute values on given member.
   */
  @Test
  public void testCreateGatewaySender_OnMember() {
    Integer punePort = startLocatorsAndConnect();
    createCacheOnAllMembers(punePort);

    final DistributedMember vm3Member = (DistributedMember) vm3.invoke(() -> getMember());

    String command = createSenderCommand(
        option(CliStrings.CREATE_GATEWAYSENDER__MEMBER, vm3Member.getId()), false, true)
        + twoDispatcherThreadsWithThreadOrdering();
    CommandResult cmdResult = executeCommandWithIgnoredExceptions(command);
    // Only the targeted member reports a status row.
    assertCreateSenderResult("testCreateGatewaySender_OnMember", cmdResult, 1, false);

    vm3.invoke(() -> verifySenderState("ln", false, false));
    vm3.invoke(() -> verifySenderAttributes("ln", 2, false, true, 1000, SOCKET_READ_TIMEOUT,
        true, 1000, 5000, true, false, 1000, 100, 2, OrderPolicy.THREAD, null, null));
  }

  /**
   * GatewaySender with given attribute values on given group
   */
  @Test
  public void testCreateGatewaySender_Group() {
    Integer punePort = startLocatorsAndConnect();

    vm3.invoke(() -> createCacheWithGroups(punePort, "SenderGroup1"));
    vm4.invoke(() -> createCacheWithGroups(punePort, "SenderGroup1"));
    vm5.invoke(() -> createCacheWithGroups(punePort, "SenderGroup1"));

    String command = createSenderCommand(
        option(CliStrings.CREATE_GATEWAYSENDER__GROUP, "SenderGroup1"), false, false)
        + twoDispatcherThreadsWithThreadOrdering();
    CommandResult cmdResult = executeCommandWithIgnoredExceptions(command);
    assertCreateSenderResult("testCreateGatewaySender_Group", cmdResult, 3, false);

    verifySenderStateOnAllMembers(true);
  }

  /**
   * GatewaySender with given attribute values on given group.
   * Only 2 of 3 members are part of the group.
   */
  @Test
  public void testCreateGatewaySender_Group_Scenario2() {
    Integer punePort = startLocatorsAndConnect();

    vm3.invoke(() -> createCacheWithGroups(punePort, "SenderGroup1"));
    vm4.invoke(() -> createCacheWithGroups(punePort, "SenderGroup1"));
    vm5.invoke(() -> createCacheWithGroups(punePort, "SenderGroup2"));

    String command = createSenderCommand(
        option(CliStrings.CREATE_GATEWAYSENDER__GROUP, "SenderGroup1"), false, false)
        + twoDispatcherThreadsWithThreadOrdering();
    CommandResult cmdResult = executeCommandWithIgnoredExceptions(command);
    assertCreateSenderResult("testCreateGatewaySender_Group_Scenario2", cmdResult, 2, false);

    // vm5 is in a different group, so only vm3 and vm4 are verified.
    vm3.invoke(() -> verifySenderState("ln", true, false));
    vm4.invoke(() -> verifySenderState("ln", true, false));
  }

  /**
   * Parallel GatewaySender with given attribute values
   */
  @Test
  public void testCreateParallelGatewaySender() {
    Integer punePort = startLocatorsAndConnect();
    createCacheOnAllMembers(punePort);

    String command = createSenderCommand("", true, true);
    CommandResult cmdResult = executeCommandWithIgnoredExceptions(command);
    assertCreateSenderResult("testCreateParallelGatewaySender", cmdResult, 5, false);

    verifySenderStateOnAllMembers(false);
    verifySenderAttributesOnAllMembers(true, GatewaySender.DEFAULT_DISPATCHER_THREADS, null,
        null, null);
  }

  /**
   * Parallel GatewaySender with given attribute values.
   * Provide dispatcherThreads as 2 which is not valid for Parallel sender.
   */
  @Test
  public void testCreateParallelGatewaySender_Error() {
    Integer punePort = startLocatorsAndConnect();
    createCacheOnAllMembers(punePort);

    String command = createSenderCommand("", true, true)
        + twoDispatcherThreadsWithThreadOrdering();
    IgnoredException exp =
        IgnoredException.addIgnoredException(GatewaySenderException.class.getName());
    try {
      CommandResult cmdResult = executeCommandWithIgnoredExceptions(command);
      assertCreateSenderResult("testCreateParallelGatewaySender_Error", cmdResult, 5, true);
    } finally {
      exp.remove();
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/WanCommandGatewayReceiverStartDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/WanCommandGatewayReceiverStartDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/WanCommandGatewayReceiverStartDUnitTest.java new file mode 100644 index 0000000..23ca687 --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/WanCommandGatewayReceiverStartDUnitTest.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan.wancommand; + +import com.gemstone.gemfire.distributed.DistributedMember; +import com.gemstone.gemfire.management.cli.Result; +import com.gemstone.gemfire.management.internal.cli.i18n.CliStrings; +import com.gemstone.gemfire.management.internal.cli.result.CommandResult; +import com.gemstone.gemfire.management.internal.cli.result.TabularResultData; +import com.gemstone.gemfire.test.dunit.Host; +import com.gemstone.gemfire.test.dunit.VM; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.List; +import java.util.Properties; + +import static com.gemstone.gemfire.distributed.ConfigurationProperties.LOCATORS; +import static com.gemstone.gemfire.distributed.ConfigurationProperties.MCAST_PORT; +import static com.gemstone.gemfire.test.dunit.Assert.*; +import static com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter; +import static com.gemstone.gemfire.test.dunit.Wait.pause; + +@Category(DistributedTest.class) +public class WanCommandGatewayReceiverStartDUnitTest extends WANCommandTestBase { + + private static final long serialVersionUID = 1L; + + /** + * Test wan commands for error in input 1> start gateway-sender command needs + * only one of member or group. 
+ */ + @Test + public void testStartGatewayReceiver_ErrorConditions() { + + VM puneLocator = Host.getLocator(); + int punePort = (Integer) puneLocator.invoke(() -> getLocatorPort()); + + Properties props = getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(LOCATORS, "localhost[" + punePort + "]"); + setUpJmxManagerOnVm0ThenConnect(props); + + Integer nyPort = (Integer) vm2.invoke(() -> createFirstRemoteLocator( 2, punePort )); + + vm3.invoke(() -> createReceiver( punePort )); + + final DistributedMember vm1Member = (DistributedMember) vm3.invoke(() -> getMember()); + + String command = CliStrings.START_GATEWAYRECEIVER + " --" + + CliStrings.START_GATEWAYRECEIVER__MEMBER + "=" + vm1Member.getId() + " --" + + CliStrings.START_GATEWAYRECEIVER__GROUP + "=RG1"; + + CommandResult cmdResult = executeCommand(command); + if (cmdResult != null) { + String strCmdResult = commandResultToString(cmdResult); + getLogWriter().info( + "testStartGatewayReceiver_ErrorConditions stringResult : " + strCmdResult + ">>>>"); + assertEquals(Result.Status.ERROR, cmdResult.getStatus()); + assertTrue(strCmdResult.contains(CliStrings.PROVIDE_EITHER_MEMBER_OR_GROUP_MESSAGE)); + } else { + fail("testStartGatewayReceiver_ErrorConditions failed as did not get CommandResult"); + } + } + + @Test + public void testStartGatewayReceiver() { + + VM puneLocator = Host.getLocator(); + int punePort = (Integer) puneLocator.invoke(() -> getLocatorPort()); + + Properties props = getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(LOCATORS, "localhost[" + punePort + "]"); + setUpJmxManagerOnVm0ThenConnect(props); + + Integer nyPort = (Integer) vm2.invoke(() -> createFirstRemoteLocator( 2, punePort )); + + vm3.invoke(() -> createReceiver( punePort )); + vm4.invoke(() -> createReceiver( punePort )); + vm5.invoke(() -> createReceiver( punePort )); + + vm3.invoke(() -> verifyReceiverState( false )); + vm4.invoke(() -> 
verifyReceiverState( false )); + vm5.invoke(() -> verifyReceiverState( false )); + + pause(10000); + String command = CliStrings.START_GATEWAYRECEIVER; + CommandResult cmdResult = executeCommand(command); + if (cmdResult != null) { + String strCmdResult = commandResultToString(cmdResult); + getLogWriter().info( + "testStartGatewayReceiver stringResult : " + strCmdResult + ">>>>"); + + TabularResultData resultData = (TabularResultData) cmdResult.getResultData(); + List<String> status = resultData.retrieveAllValues("Result"); + assertEquals(4, status.size()); + assertTrue(status.contains("Error")); + } else { + fail("testStartGatewayReceiver failed as did not get CommandResult"); + } + + vm3.invoke(() -> verifyReceiverState( true )); + vm4.invoke(() -> verifyReceiverState( true )); + vm5.invoke(() -> verifyReceiverState( true )); + } + + /** + * test to validate that the start gateway sender starts the gateway sender on + * a member + */ + @Test + public void testStartGatewayReceiver_onMember() { + + VM puneLocator = Host.getLocator(); + int punePort = (Integer) puneLocator.invoke(() -> getLocatorPort()); + + Properties props = getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(LOCATORS, "localhost[" + punePort + "]"); + setUpJmxManagerOnVm0ThenConnect(props); + + Integer nyPort = (Integer) vm2.invoke(() -> createFirstRemoteLocator( 2, punePort )); + + vm3.invoke(() -> createReceiver( punePort )); + vm4.invoke(() -> createReceiver( punePort )); + vm5.invoke(() -> createReceiver( punePort )); + + vm3.invoke(() -> verifyReceiverState( false )); + vm4.invoke(() -> verifyReceiverState( false )); + vm5.invoke(() -> verifyReceiverState( false )); + + final DistributedMember vm1Member = (DistributedMember) vm3.invoke(() -> getMember()); + pause(10000); + String command = CliStrings.START_GATEWAYRECEIVER + " --" + + CliStrings.START_GATEWAYRECEIVER__MEMBER+ "=" + vm1Member.getId(); + + CommandResult cmdResult = 
executeCommand(command); + if (cmdResult != null) { + String strCmdResult = commandResultToString(cmdResult); + getLogWriter().info( + "testStartGatewayReceiver_onMember stringResult : " + strCmdResult + ">>>>"); + assertEquals(Result.Status.OK, cmdResult.getStatus()); + assertTrue(strCmdResult.contains("is started on member")); + } else { + fail("testStartGatewayReceiver failed as did not get CommandResult"); + } + + vm3.invoke(() -> verifyReceiverState( true )); + vm4.invoke(() -> verifyReceiverState( false )); + vm5.invoke(() -> verifyReceiverState( false )); + } + + /** + * test to validate that the start gateway sender starts the gateway sender on + * a group of members + */ + @Test + public void testStartGatewayReceiver_Group() { + + VM puneLocator = Host.getLocator(); + int punePort = (Integer) puneLocator.invoke(() -> getLocatorPort()); + + Properties props = getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(LOCATORS, "localhost[" + punePort + "]"); + setUpJmxManagerOnVm0ThenConnect(props); + + Integer nyPort = (Integer) vm2.invoke(() -> createFirstRemoteLocator( 2, punePort )); + + vm3.invoke(() -> createReceiverWithGroup( punePort, "RG1" )); + vm4.invoke(() -> createReceiverWithGroup( punePort, "RG1" )); + vm5.invoke(() -> createReceiverWithGroup( punePort, "RG1" )); + + vm3.invoke(() -> verifyReceiverState( false )); + vm4.invoke(() -> verifyReceiverState( false )); + vm5.invoke(() -> verifyReceiverState( false )); + + pause(10000); + String command = CliStrings.START_GATEWAYRECEIVER + " --" + + CliStrings.START_GATEWAYRECEIVER__GROUP + "=RG1"; + CommandResult cmdResult = executeCommand(command); + if (cmdResult != null) { + String strCmdResult = commandResultToString(cmdResult); + getLogWriter().info( + "testStartGatewayReceiver_Group stringResult : " + strCmdResult + + ">>>>"); + assertEquals(Result.Status.OK, cmdResult.getStatus()); + + TabularResultData resultData = (TabularResultData) 
cmdResult.getResultData(); + List<String> status = resultData.retrieveAllValues("Result"); + assertEquals(3, status.size()); + assertFalse(status.contains("Error")); + assertTrue(status.contains("OK")); + } else { + fail("testStartGatewayReceiver_Group failed as did not get CommandResult"); + } + + vm3.invoke(() -> verifyReceiverState( true )); + vm4.invoke(() -> verifyReceiverState( true )); + vm5.invoke(() -> verifyReceiverState( true )); + } + + /** + * Test to validate the scenario gateway sender is started when one or more + * sender members belongs to multiple groups + * + */ + @Test + public void testStartGatewayReceiver_MultipleGroup() { + + VM puneLocator = Host.getLocator(); + int punePort = (Integer) puneLocator.invoke(() -> getLocatorPort()); + + Properties props = getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(LOCATORS, "localhost[" + punePort + "]"); + setUpJmxManagerOnVm0ThenConnect(props); + + Integer nyPort = (Integer) vm2.invoke(() -> createFirstRemoteLocator( 2, punePort )); + + vm3.invoke(() -> createReceiverWithGroup( punePort, "RG1" )); + vm4.invoke(() -> createReceiverWithGroup( punePort, "RG1" )); + vm5.invoke(() -> createReceiverWithGroup( punePort, "RG1, RG2" )); + vm6.invoke(() -> createReceiverWithGroup( punePort, "RG1, RG2" )); + vm7.invoke(() -> createReceiverWithGroup( punePort, "RG3" )); + + vm3.invoke(() -> verifyReceiverState( false )); + vm4.invoke(() -> verifyReceiverState( false )); + vm5.invoke(() -> verifyReceiverState( false )); + vm6.invoke(() -> verifyReceiverState( false )); + vm7.invoke(() -> verifyReceiverState( false )); + + pause(10000); + String command = CliStrings.START_GATEWAYRECEIVER + " --" + + CliStrings.START_GATEWAYRECEIVER__GROUP + "=RG1,RG2"; + CommandResult cmdResult = executeCommand(command); + if (cmdResult != null) { + String strCmdResult = commandResultToString(cmdResult); + getLogWriter().info( + "testStartGatewayReceiver_Group stringResult : " + 
strCmdResult + + ">>>>"); + assertEquals(Result.Status.OK, cmdResult.getStatus()); + + TabularResultData resultData = (TabularResultData) cmdResult.getResultData(); + List<String> status = resultData.retrieveAllValues("Result"); + assertEquals(4, status.size()); + assertFalse(status.contains("Error")); + assertTrue(status.contains("OK")); + } else { + fail("testStartGatewayReceiver failed as did not get CommandResult"); + } + + vm3.invoke(() -> verifyReceiverState( true )); + vm4.invoke(() -> verifyReceiverState( true)); + vm5.invoke(() -> verifyReceiverState( true )); + vm6.invoke(() -> verifyReceiverState( true )); + vm7.invoke(() -> verifyReceiverState( false )); + } + +}
