http://git-wip-us.apache.org/repos/asf/geode/blob/e1befef4/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/CreateAndDestroyGatewaySenderCommandsDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/CreateAndDestroyGatewaySenderCommandsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/CreateAndDestroyGatewaySenderCommandsDUnitTest.java new file mode 100644 index 0000000..ab0a355 --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/CreateAndDestroyGatewaySenderCommandsDUnitTest.java @@ -0,0 +1,532 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.internal.cache.wan.wancommand; + +import static org.apache.geode.test.dunit.Assert.assertEquals; +import static org.apache.geode.test.dunit.Assert.assertTrue; +import static org.apache.geode.test.dunit.Assert.fail; +import static org.apache.geode.test.dunit.LogWriterUtils.getLogWriter; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.cache.wan.GatewaySender; +import org.apache.geode.cache.wan.GatewaySender.OrderPolicy; +import org.apache.geode.distributed.DistributedMember; +import org.apache.geode.management.cli.Result; +import org.apache.geode.management.internal.cli.i18n.CliStrings; +import org.apache.geode.management.internal.cli.result.CommandResult; +import org.apache.geode.management.internal.cli.result.TabularResultData; +import org.apache.geode.test.dunit.IgnoredException; +import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.junit.categories.DistributedTest; + +@Category(DistributedTest.class) +public class CreateAndDestroyGatewaySenderCommandsDUnitTest extends WANCommandTestBase { + /** + * GatewaySender with all default attributes + */ + @Test + public void testCreateDestroyGatewaySenderWithDefault() { + Integer punePort = vm1.invoke(() -> createFirstLocatorWithDSId(1)); + propsSetUp(punePort); + + vm2.invoke(() -> createFirstRemoteLocator(2, punePort)); + vm3.invoke(() -> createCache(punePort)); + vm4.invoke(() -> createCache(punePort)); + vm5.invoke(() -> createCache(punePort)); + + String command = CliStrings.CREATE_GATEWAYSENDER + " --" + CliStrings.CREATE_GATEWAYSENDER__ID + + "=ln" + " --" + CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID + "=2"; + CommandResult cmdResult = executeCommandWithIgnoredExceptions(command); + if (cmdResult != null) { + String strCmdResult = commandResultToString(cmdResult); + getLogWriter().info( + 
"testCreateDestroyGatewaySenderWithDefault stringResult : " + strCmdResult + ">>>>"); + assertEquals(Result.Status.OK, cmdResult.getStatus()); + + TabularResultData resultData = (TabularResultData) cmdResult.getResultData(); + List<String> status = resultData.retrieveAllValues("Status"); + assertEquals(5, status.size()); + for (String stat : status) { + assertTrue("GatewaySender creation failed with: " + stat, !stat.contains("ERROR:")); + } + } else { + fail("testCreateDestroyGatewaySenderWithDefault failed as did not get CommandResult"); + } + vm3.invoke(() -> verifySenderState("ln", true, false)); + vm4.invoke(() -> verifySenderState("ln", true, false)); + vm5.invoke(() -> verifySenderState("ln", true, false)); + doDestroyAndVerifyGatewaySender("ln", null, null, "testCreateDestroyGatewaySenderWithDefault", + Arrays.asList(vm3, vm4, vm5), 5, false); + } + + /** + * + * GatewaySender with given attribute values + + */ + @Test + public void testCreateDestroyGatewaySender() { + Integer punePort = vm1.invoke(() -> createFirstLocatorWithDSId(1)); + propsSetUp(punePort); + + vm2.invoke(() -> createFirstRemoteLocator(2, punePort)); + vm3.invoke(() -> createCache(punePort)); + vm4.invoke(() -> createCache(punePort)); + vm5.invoke(() -> createCache(punePort)); + + int socketReadTimeout = GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT + 1000; + String command = CliStrings.CREATE_GATEWAYSENDER + " --" + CliStrings.CREATE_GATEWAYSENDER__ID + + "=ln" + " --" + CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID + "=2" + " --" + + CliStrings.CREATE_GATEWAYSENDER__PARALLEL + "=false" + " --" + + CliStrings.CREATE_GATEWAYSENDER__MANUALSTART + "=true" + " --" + + CliStrings.CREATE_GATEWAYSENDER__SOCKETBUFFERSIZE + "=1000" + " --" + + CliStrings.CREATE_GATEWAYSENDER__SOCKETREADTIMEOUT + "=" + socketReadTimeout + " --" + + CliStrings.CREATE_GATEWAYSENDER__ENABLEBATCHCONFLATION + "=true" + " --" + + CliStrings.CREATE_GATEWAYSENDER__BATCHSIZE + "=1000" + " --" + + 
CliStrings.CREATE_GATEWAYSENDER__BATCHTIMEINTERVAL + "=5000" + " --" + + CliStrings.CREATE_GATEWAYSENDER__ENABLEPERSISTENCE + "=true" + " --" + + CliStrings.CREATE_GATEWAYSENDER__DISKSYNCHRONOUS + "=false" + " --" + + CliStrings.CREATE_GATEWAYSENDER__MAXQUEUEMEMORY + "=1000" + " --" + + CliStrings.CREATE_GATEWAYSENDER__ALERTTHRESHOLD + "=100" + " --" + + CliStrings.CREATE_GATEWAYSENDER__DISPATCHERTHREADS + "=2" + " --" + + CliStrings.CREATE_GATEWAYSENDER__ORDERPOLICY + "=THREAD"; + CommandResult cmdResult = executeCommandWithIgnoredExceptions(command); + if (cmdResult != null) { + String strCmdResult = commandResultToString(cmdResult); + getLogWriter().info("testCreateDestroyGatewaySender stringResult : " + strCmdResult + ">>>>"); + assertEquals(Result.Status.OK, cmdResult.getStatus()); + + TabularResultData resultData = (TabularResultData) cmdResult.getResultData(); + List<String> status = resultData.retrieveAllValues("Status"); + assertEquals(5, status.size()); + for (String stat : status) { + assertTrue("GatewaySender creation failed with: " + stat, !stat.contains("ERROR:")); + } + } else { + fail("testCreateDestroyGatewaySender failed as did not get CommandResult"); + } + vm3.invoke(() -> verifySenderState("ln", false, false)); + vm4.invoke(() -> verifySenderState("ln", false, false)); + vm5.invoke(() -> verifySenderState("ln", false, false)); + vm3.invoke(() -> verifySenderAttributes("ln", 2, false, true, 1000, socketReadTimeout, true, + 1000, 5000, true, false, 1000, 100, 2, OrderPolicy.THREAD, null, null)); + vm4.invoke(() -> verifySenderAttributes("ln", 2, false, true, 1000, socketReadTimeout, true, + 1000, 5000, true, false, 1000, 100, 2, OrderPolicy.THREAD, null, null)); + vm5.invoke(() -> verifySenderAttributes("ln", 2, false, true, 1000, socketReadTimeout, true, + 1000, 5000, true, false, 1000, 100, 2, OrderPolicy.THREAD, null, null)); + doDestroyAndVerifyGatewaySender("ln", null, null, "testCreateDestroyGatewaySender", + Arrays.asList(vm3, vm4, vm5), 
5, false); + } + + /** + * GatewaySender with given attribute values and event filters. + */ + @Test + public void testCreateDestroyGatewaySenderWithGatewayEventFilters() { + Integer punePort = vm1.invoke(() -> createFirstLocatorWithDSId(1)); + propsSetUp(punePort); + + vm2.invoke(() -> createFirstRemoteLocator(2, punePort)); + vm3.invoke(() -> createCache(punePort)); + vm4.invoke(() -> createCache(punePort)); + vm5.invoke(() -> createCache(punePort)); + + int socketReadTimeout = GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT + 1000; + String command = CliStrings.CREATE_GATEWAYSENDER + " --" + CliStrings.CREATE_GATEWAYSENDER__ID + + "=ln" + " --" + CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID + "=2" + " --" + + CliStrings.CREATE_GATEWAYSENDER__PARALLEL + "=false" + " --" + + CliStrings.CREATE_GATEWAYSENDER__MANUALSTART + "=true" + " --" + + CliStrings.CREATE_GATEWAYSENDER__SOCKETBUFFERSIZE + "=1000" + " --" + + CliStrings.CREATE_GATEWAYSENDER__SOCKETREADTIMEOUT + "=" + socketReadTimeout + " --" + + CliStrings.CREATE_GATEWAYSENDER__ENABLEBATCHCONFLATION + "=true" + " --" + + CliStrings.CREATE_GATEWAYSENDER__BATCHSIZE + "=1000" + " --" + + CliStrings.CREATE_GATEWAYSENDER__BATCHTIMEINTERVAL + "=5000" + " --" + + CliStrings.CREATE_GATEWAYSENDER__ENABLEPERSISTENCE + "=true" + " --" + + CliStrings.CREATE_GATEWAYSENDER__DISKSYNCHRONOUS + "=false" + " --" + + CliStrings.CREATE_GATEWAYSENDER__MAXQUEUEMEMORY + "=1000" + " --" + + CliStrings.CREATE_GATEWAYSENDER__ALERTTHRESHOLD + "=100" + " --" + + CliStrings.CREATE_GATEWAYSENDER__DISPATCHERTHREADS + "=2" + " --" + + CliStrings.CREATE_GATEWAYSENDER__ORDERPOLICY + "=THREAD" + " --" + + CliStrings.CREATE_GATEWAYSENDER__GATEWAYEVENTFILTER + + "=org.apache.geode.cache30.MyGatewayEventFilter1,org.apache.geode.cache30.MyGatewayEventFilter2"; + CommandResult cmdResult = executeCommandWithIgnoredExceptions(command); + if (cmdResult != null) { + String strCmdResult = commandResultToString(cmdResult); + 
getLogWriter().info("testCreateDestroyGatewaySenderWithGatewayEventFilters stringResult : " + + strCmdResult + ">>>>"); + assertEquals(Result.Status.OK, cmdResult.getStatus()); + TabularResultData resultData = (TabularResultData) cmdResult.getResultData(); + List<String> status = resultData.retrieveAllValues("Status"); + assertEquals(5, status.size()); + for (String stat : status) { + assertTrue("GatewaySender creation failed with: " + stat, !stat.contains("ERROR:")); + } + } else { + fail( + "testCreateDestroyGatewaySenderWithGatewayEventFilters failed as did not get CommandResult"); + } + vm3.invoke(() -> verifySenderState("ln", false, false)); + vm4.invoke(() -> verifySenderState("ln", false, false)); + vm5.invoke(() -> verifySenderState("ln", false, false)); + List<String> eventFilters = new ArrayList<String>(); + eventFilters.add("org.apache.geode.cache30.MyGatewayEventFilter1"); + eventFilters.add("org.apache.geode.cache30.MyGatewayEventFilter2"); + vm3.invoke(() -> verifySenderAttributes("ln", 2, false, true, 1000, socketReadTimeout, true, + 1000, 5000, true, false, 1000, 100, 2, OrderPolicy.THREAD, eventFilters, null)); + vm4.invoke(() -> verifySenderAttributes("ln", 2, false, true, 1000, socketReadTimeout, true, + 1000, 5000, true, false, 1000, 100, 2, OrderPolicy.THREAD, eventFilters, null)); + vm5.invoke(() -> verifySenderAttributes("ln", 2, false, true, 1000, socketReadTimeout, true, + 1000, 5000, true, false, 1000, 100, 2, OrderPolicy.THREAD, eventFilters, null)); + doDestroyAndVerifyGatewaySender("ln", null, null, + "testCreateDestroyGatewaySenderWithGatewayEventFilters", Arrays.asList(vm3, vm4, vm5), 5, + false); + } + + /** + * GatewaySender with given attribute values and transport filters. 
+ */ + @Test + public void testCreateDestroyGatewaySenderWithGatewayTransportFilters() { + Integer punePort = vm1.invoke(() -> createFirstLocatorWithDSId(1)); + propsSetUp(punePort); + + vm2.invoke(() -> createFirstRemoteLocator(2, punePort)); + vm3.invoke(() -> createCache(punePort)); + vm4.invoke(() -> createCache(punePort)); + vm5.invoke(() -> createCache(punePort)); + + int socketReadTimeout = GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT + 1000; + String command = CliStrings.CREATE_GATEWAYSENDER + " --" + CliStrings.CREATE_GATEWAYSENDER__ID + + "=ln" + " --" + CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID + "=2" + " --" + + CliStrings.CREATE_GATEWAYSENDER__PARALLEL + "=false" + " --" + + CliStrings.CREATE_GATEWAYSENDER__MANUALSTART + "=true" + " --" + + CliStrings.CREATE_GATEWAYSENDER__SOCKETBUFFERSIZE + "=1000" + " --" + + CliStrings.CREATE_GATEWAYSENDER__SOCKETREADTIMEOUT + "=" + socketReadTimeout + " --" + + CliStrings.CREATE_GATEWAYSENDER__ENABLEBATCHCONFLATION + "=true" + " --" + + CliStrings.CREATE_GATEWAYSENDER__BATCHSIZE + "=1000" + " --" + + CliStrings.CREATE_GATEWAYSENDER__BATCHTIMEINTERVAL + "=5000" + " --" + + CliStrings.CREATE_GATEWAYSENDER__ENABLEPERSISTENCE + "=true" + " --" + + CliStrings.CREATE_GATEWAYSENDER__DISKSYNCHRONOUS + "=false" + " --" + + CliStrings.CREATE_GATEWAYSENDER__MAXQUEUEMEMORY + "=1000" + " --" + + CliStrings.CREATE_GATEWAYSENDER__ALERTTHRESHOLD + "=100" + " --" + + CliStrings.CREATE_GATEWAYSENDER__DISPATCHERTHREADS + "=2" + " --" + + CliStrings.CREATE_GATEWAYSENDER__ORDERPOLICY + "=THREAD" + " --" + + CliStrings.CREATE_GATEWAYSENDER__GATEWAYTRANSPORTFILTER + + "=org.apache.geode.cache30.MyGatewayTransportFilter1"; + CommandResult cmdResult = executeCommandWithIgnoredExceptions(command); + if (cmdResult != null) { + String strCmdResult = commandResultToString(cmdResult); + getLogWriter() + .info("testCreateDestroyGatewaySenderWithGatewayTransportFilters stringResult : " + + strCmdResult + ">>>>"); + 
assertEquals(Result.Status.OK, cmdResult.getStatus()); + TabularResultData resultData = (TabularResultData) cmdResult.getResultData(); + List<String> status = resultData.retrieveAllValues("Status"); + assertEquals(5, status.size()); + for (String stat : status) { + assertTrue("GatewaySender creation failed with: " + stat, !stat.contains("ERROR:")); + } + } else { + fail( + "testCreateDestroyGatewaySenderWithGatewayTransportFilters failed as did not get CommandResult"); + } + vm3.invoke(() -> verifySenderState("ln", false, false)); + vm4.invoke(() -> verifySenderState("ln", false, false)); + vm5.invoke(() -> verifySenderState("ln", false, false)); + List<String> transportFilters = new ArrayList<String>(); + transportFilters.add("org.apache.geode.cache30.MyGatewayTransportFilter1"); + vm3.invoke(() -> verifySenderAttributes("ln", 2, false, true, 1000, socketReadTimeout, true, + 1000, 5000, true, false, 1000, 100, 2, OrderPolicy.THREAD, null, transportFilters)); + vm4.invoke(() -> verifySenderAttributes("ln", 2, false, true, 1000, socketReadTimeout, true, + 1000, 5000, true, false, 1000, 100, 2, OrderPolicy.THREAD, null, transportFilters)); + vm5.invoke(() -> verifySenderAttributes("ln", 2, false, true, 1000, socketReadTimeout, true, + 1000, 5000, true, false, 1000, 100, 2, OrderPolicy.THREAD, null, transportFilters)); + doDestroyAndVerifyGatewaySender("ln", null, null, + "testCreateDestroyGatewaySenderWithGatewayTransportFilters", Arrays.asList(vm3, vm4, vm5), + 5, false); + } + + /** + * GatewaySender with given attribute values on given member. 
+ */ + @Test + public void testCreateDestroyGatewaySender_OnMember() { + Integer punePort = vm1.invoke(() -> createFirstLocatorWithDSId(1)); + propsSetUp(punePort); + + vm2.invoke(() -> createFirstRemoteLocator(2, punePort)); + vm3.invoke(() -> createCache(punePort)); + vm4.invoke(() -> createCache(punePort)); + vm5.invoke(() -> createCache(punePort)); + + final DistributedMember vm3Member = vm3.invoke(this::getMember); + int socketReadTimeout = GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT + 1000; + String command = CliStrings.CREATE_GATEWAYSENDER + " --" + CliStrings.CREATE_GATEWAYSENDER__ID + + "=ln" + " --" + CliStrings.MEMBER + "=" + vm3Member.getId() + " --" + + CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID + "=2" + " --" + + CliStrings.CREATE_GATEWAYSENDER__PARALLEL + "=false" + " --" + + CliStrings.CREATE_GATEWAYSENDER__MANUALSTART + "=true" + " --" + + CliStrings.CREATE_GATEWAYSENDER__SOCKETBUFFERSIZE + "=1000" + " --" + + CliStrings.CREATE_GATEWAYSENDER__SOCKETREADTIMEOUT + "=" + socketReadTimeout + " --" + + CliStrings.CREATE_GATEWAYSENDER__ENABLEBATCHCONFLATION + "=true" + " --" + + CliStrings.CREATE_GATEWAYSENDER__BATCHSIZE + "=1000" + " --" + + CliStrings.CREATE_GATEWAYSENDER__BATCHTIMEINTERVAL + "=5000" + " --" + + CliStrings.CREATE_GATEWAYSENDER__ENABLEPERSISTENCE + "=true" + " --" + + CliStrings.CREATE_GATEWAYSENDER__DISKSYNCHRONOUS + "=false" + " --" + + CliStrings.CREATE_GATEWAYSENDER__MAXQUEUEMEMORY + "=1000" + " --" + + CliStrings.CREATE_GATEWAYSENDER__ALERTTHRESHOLD + "=100" + " --" + + CliStrings.CREATE_GATEWAYSENDER__DISPATCHERTHREADS + "=2" + " --" + + CliStrings.CREATE_GATEWAYSENDER__ORDERPOLICY + "=THREAD"; + CommandResult cmdResult = executeCommandWithIgnoredExceptions(command); + if (cmdResult != null) { + String strCmdResult = commandResultToString(cmdResult); + getLogWriter() + .info("testCreateDestroyGatewaySender_OnMember stringResult : " + strCmdResult + ">>>>"); + assertEquals(Result.Status.OK, cmdResult.getStatus()); + 
TabularResultData resultData = (TabularResultData) cmdResult.getResultData(); + List<String> status = resultData.retrieveAllValues("Status"); + assertEquals(1, status.size()); + for (String stat : status) { + assertTrue("GatewaySender creation failed with: " + stat, !stat.contains("ERROR:")); + } + } else { + fail("testCreateDestroyGatewaySender_OnMember failed as did not get CommandResult"); + } + vm3.invoke(() -> verifySenderState("ln", false, false)); + vm3.invoke(() -> verifySenderAttributes("ln", 2, false, true, 1000, socketReadTimeout, true, + 1000, 5000, true, false, 1000, 100, 2, OrderPolicy.THREAD, null, null)); + doDestroyAndVerifyGatewaySender("ln", null, vm3Member, + "testCreateDestroyGatewaySender_OnMember", Arrays.asList(vm3), 1, false); + } + + /** + * GatewaySender with given attribute values on given group + */ + @Test + public void testCreateDestroyGatewaySender_Group() { + Integer punePort = vm1.invoke(() -> createFirstLocatorWithDSId(1)); + propsSetUp(punePort); + + vm2.invoke(() -> createFirstRemoteLocator(2, punePort)); + vm3.invoke(() -> createCacheWithGroups(punePort, "SenderGroup1")); + vm4.invoke(() -> createCacheWithGroups(punePort, "SenderGroup1")); + vm5.invoke(() -> createCacheWithGroups(punePort, "SenderGroup1")); + + int socketReadTimeout = GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT + 1000; + String command = CliStrings.CREATE_GATEWAYSENDER + " --" + CliStrings.CREATE_GATEWAYSENDER__ID + + "=ln" + " --" + CliStrings.GROUP + "=SenderGroup1" + " --" + + CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID + "=2" + " --" + + CliStrings.CREATE_GATEWAYSENDER__PARALLEL + "=false" + " --" + + CliStrings.CREATE_GATEWAYSENDER__MANUALSTART + "=false" + " --" + + CliStrings.CREATE_GATEWAYSENDER__SOCKETBUFFERSIZE + "=1000" + " --" + + CliStrings.CREATE_GATEWAYSENDER__SOCKETREADTIMEOUT + "=" + socketReadTimeout + " --" + + CliStrings.CREATE_GATEWAYSENDER__ENABLEBATCHCONFLATION + "=true" + " --" + + CliStrings.CREATE_GATEWAYSENDER__BATCHSIZE 
+ "=1000" + " --" + + CliStrings.CREATE_GATEWAYSENDER__BATCHTIMEINTERVAL + "=5000" + " --" + + CliStrings.CREATE_GATEWAYSENDER__ENABLEPERSISTENCE + "=true" + " --" + + CliStrings.CREATE_GATEWAYSENDER__DISKSYNCHRONOUS + "=false" + " --" + + CliStrings.CREATE_GATEWAYSENDER__MAXQUEUEMEMORY + "=1000" + " --" + + CliStrings.CREATE_GATEWAYSENDER__ALERTTHRESHOLD + "=100" + " --" + + CliStrings.CREATE_GATEWAYSENDER__DISPATCHERTHREADS + "=2" + " --" + + CliStrings.CREATE_GATEWAYSENDER__ORDERPOLICY + "=THREAD"; + CommandResult cmdResult = executeCommandWithIgnoredExceptions(command); + if (cmdResult != null) { + String strCmdResult = commandResultToString(cmdResult); + getLogWriter() + .info("testCreateDestroyGatewaySender_Group stringResult : " + strCmdResult + ">>>>"); + assertEquals(Result.Status.OK, cmdResult.getStatus()); + + TabularResultData resultData = (TabularResultData) cmdResult.getResultData(); + List<String> status = resultData.retrieveAllValues("Status"); + assertEquals(3, status.size()); + for (String stat : status) { + assertTrue("GatewaySender creation failed with: " + stat, !stat.contains("ERROR:")); + } + } else { + fail("testCreateDestroyGatewaySender_Group failed as did not get CommandResult"); + } + vm3.invoke(() -> verifySenderState("ln", true, false)); + vm4.invoke(() -> verifySenderState("ln", true, false)); + vm5.invoke(() -> verifySenderState("ln", true, false)); + doDestroyAndVerifyGatewaySender("ln", "SenderGroup1", null, + "testCreateDestroyGatewaySender_Group", Arrays.asList(vm3, vm4, vm5), 3, false); + } + + /** + * GatewaySender with given attribute values on given group. Only 2 of 3 members are part of the + * group. 
+ */ + @Test + public void testCreateDestroyGatewaySender_Group_Scenario2() { + Integer punePort = vm1.invoke(() -> createFirstLocatorWithDSId(1)); + propsSetUp(punePort); + + vm2.invoke(() -> createFirstRemoteLocator(2, punePort)); + vm3.invoke(() -> createCacheWithGroups(punePort, "SenderGroup1")); + vm4.invoke(() -> createCacheWithGroups(punePort, "SenderGroup1")); + vm5.invoke(() -> createCacheWithGroups(punePort, "SenderGroup2")); + + int socketReadTimeout = GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT + 1000; + String command = CliStrings.CREATE_GATEWAYSENDER + " --" + CliStrings.CREATE_GATEWAYSENDER__ID + + "=ln" + " --" + CliStrings.GROUP + "=SenderGroup1" + " --" + + CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID + "=2" + " --" + + CliStrings.CREATE_GATEWAYSENDER__PARALLEL + "=false" + " --" + + CliStrings.CREATE_GATEWAYSENDER__MANUALSTART + "=false" + " --" + + CliStrings.CREATE_GATEWAYSENDER__SOCKETBUFFERSIZE + "=1000" + " --" + + CliStrings.CREATE_GATEWAYSENDER__SOCKETREADTIMEOUT + "=" + socketReadTimeout + " --" + + CliStrings.CREATE_GATEWAYSENDER__ENABLEBATCHCONFLATION + "=true" + " --" + + CliStrings.CREATE_GATEWAYSENDER__BATCHSIZE + "=1000" + " --" + + CliStrings.CREATE_GATEWAYSENDER__BATCHTIMEINTERVAL + "=5000" + " --" + + CliStrings.CREATE_GATEWAYSENDER__ENABLEPERSISTENCE + "=true" + " --" + + CliStrings.CREATE_GATEWAYSENDER__DISKSYNCHRONOUS + "=false" + " --" + + CliStrings.CREATE_GATEWAYSENDER__MAXQUEUEMEMORY + "=1000" + " --" + + CliStrings.CREATE_GATEWAYSENDER__ALERTTHRESHOLD + "=100" + " --" + + CliStrings.CREATE_GATEWAYSENDER__DISPATCHERTHREADS + "=2" + " --" + + CliStrings.CREATE_GATEWAYSENDER__ORDERPOLICY + "=THREAD"; + CommandResult cmdResult = executeCommandWithIgnoredExceptions(command); + if (cmdResult != null) { + String strCmdResult = commandResultToString(cmdResult); + getLogWriter().info( + "testCreateDestroyGatewaySender_Group_Scenario2 stringResult : " + strCmdResult + ">>>>"); + assertEquals(Result.Status.OK, 
cmdResult.getStatus()); + TabularResultData resultData = (TabularResultData) cmdResult.getResultData(); + List<String> status = resultData.retrieveAllValues("Status"); + assertEquals(2, status.size()); + for (String stat : status) { + assertTrue("GatewaySender creation failed with: " + stat, !stat.contains("ERROR:")); + } + } else { + fail("testCreateDestroyGatewaySender_Group_Scenario2 failed as did not get CommandResult"); + } + vm3.invoke(() -> verifySenderState("ln", true, false)); + vm4.invoke(() -> verifySenderState("ln", true, false)); + doDestroyAndVerifyGatewaySender("ln", "SenderGroup1", null, + "testCreateDestroyGatewaySender_Group_Scenario2", Arrays.asList(vm3, vm4), 2, false); + } + + /** + * + * Parallel GatewaySender with given attribute values + + */ + @Test + public void testCreateDestroyParallelGatewaySender() { + Integer punePort = vm1.invoke(() -> createFirstLocatorWithDSId(1)); + propsSetUp(punePort); + + vm2.invoke(() -> createFirstRemoteLocator(2, punePort)); + vm3.invoke(() -> createCache(punePort)); + vm4.invoke(() -> createCache(punePort)); + vm5.invoke(() -> createCache(punePort)); + + int socketReadTimeout = GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT + 1000; + String command = CliStrings.CREATE_GATEWAYSENDER + " --" + CliStrings.CREATE_GATEWAYSENDER__ID + + "=ln" + " --" + CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID + "=2" + " --" + + CliStrings.CREATE_GATEWAYSENDER__PARALLEL + "=true" + " --" + + CliStrings.CREATE_GATEWAYSENDER__MANUALSTART + "=true" + " --" + + CliStrings.CREATE_GATEWAYSENDER__SOCKETBUFFERSIZE + "=1000" + " --" + + CliStrings.CREATE_GATEWAYSENDER__SOCKETREADTIMEOUT + "=" + socketReadTimeout + " --" + + CliStrings.CREATE_GATEWAYSENDER__ENABLEBATCHCONFLATION + "=true" + " --" + + CliStrings.CREATE_GATEWAYSENDER__BATCHSIZE + "=1000" + " --" + + CliStrings.CREATE_GATEWAYSENDER__BATCHTIMEINTERVAL + "=5000" + " --" + + CliStrings.CREATE_GATEWAYSENDER__ENABLEPERSISTENCE + "=true" + " --" + + 
CliStrings.CREATE_GATEWAYSENDER__DISKSYNCHRONOUS + "=false" + " --" + + CliStrings.CREATE_GATEWAYSENDER__MAXQUEUEMEMORY + "=1000" + " --" + + CliStrings.CREATE_GATEWAYSENDER__ALERTTHRESHOLD + "=100"; + CommandResult cmdResult = executeCommandWithIgnoredExceptions(command); + if (cmdResult != null) { + String strCmdResult = commandResultToString(cmdResult); + getLogWriter() + .info("testCreateDestroyParallelGatewaySender stringResult : " + strCmdResult + ">>>>"); + assertEquals(Result.Status.OK, cmdResult.getStatus()); + + TabularResultData resultData = (TabularResultData) cmdResult.getResultData(); + List<String> status = resultData.retrieveAllValues("Status"); + assertEquals(5, status.size()); + for (String stat : status) { + assertTrue("GatewaySender creation failed with: " + stat, !stat.contains("ERROR:")); + } + } else { + fail("testCreateDestroyParallelGatewaySender failed as did not get CommandResult"); + } + vm3.invoke(() -> verifySenderState("ln", false, false)); + vm4.invoke(() -> verifySenderState("ln", false, false)); + vm5.invoke(() -> verifySenderState("ln", false, false)); + vm3.invoke( + () -> verifySenderAttributes("ln", 2, true, true, 1000, socketReadTimeout, true, 1000, 5000, + true, false, 1000, 100, GatewaySender.DEFAULT_DISPATCHER_THREADS, null, null, null)); + vm4.invoke( + () -> verifySenderAttributes("ln", 2, true, true, 1000, socketReadTimeout, true, 1000, 5000, + true, false, 1000, 100, GatewaySender.DEFAULT_DISPATCHER_THREADS, null, null, null)); + vm5.invoke( + () -> verifySenderAttributes("ln", 2, true, true, 1000, socketReadTimeout, true, 1000, 5000, + true, false, 1000, 100, GatewaySender.DEFAULT_DISPATCHER_THREADS, null, null, null)); + doDestroyAndVerifyGatewaySender("ln", null, null, "testCreateDestroyParallelGatewaySender", + Arrays.asList(vm3, vm4), 5, true); + } + + /** + * doDestroyAndVerifyGatewaySender helper command. 
+ * + * @param id if of the Gateway Sender + * @param group Group for the GatewaySender + * @param member Distributed Member for memeber id. + * @param testName testName for the logging + * @param vms list of vms where to verify the destroyed gateway sender + * @param size command result. + * @param isParallel true if parallel , false otherwise. + */ + private void doDestroyAndVerifyGatewaySender(final String id, final String group, + final DistributedMember member, final String testName, final List<VM> vms, final int size, + final boolean isParallel) { + String command = + CliStrings.DESTROY_GATEWAYSENDER + " --" + CliStrings.DESTROY_GATEWAYSENDER__ID + "=" + id; + if (group != null) { + command += " --" + CliStrings.GROUP + "=" + group; + } + if (member != null) { + command += " --" + CliStrings.MEMBER + "=" + member.getId(); + } + final CommandResult cmdResult = executeCommandWithIgnoredExceptions(command); + if (cmdResult != null) { + String strCmdResult = commandResultToString(cmdResult); + getLogWriter().info(testName + " stringResult : " + strCmdResult + ">>>>"); + assertEquals(Result.Status.OK, cmdResult.getStatus()); + + TabularResultData resultData = (TabularResultData) cmdResult.getResultData(); + List<String> status = resultData.retrieveAllValues("Status"); + assertEquals(size, status.size()); + for (String stat : status) { + assertTrue("GatewaySender destroy failed with: " + stat, !stat.contains("ERROR:")); + } + } else { + fail(testName + " failed as did not get CommandResult"); + } + for (VM vm : vms) { + vm.invoke(() -> verifySenderDestroyed(id, isParallel)); + } + } + + private CommandResult executeCommandWithIgnoredExceptions(String command) { + final IgnoredException ignored = IgnoredException.addIgnoredException("Could not connect"); + try { + return executeCommand(command); + } finally { + ignored.remove(); + } + } +}
http://git-wip-us.apache.org/repos/asf/geode/blob/e1befef4/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/CreateGatewayReceiverCommandDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/CreateGatewayReceiverCommandDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/CreateGatewayReceiverCommandDUnitTest.java new file mode 100644 index 0000000..5fb18a5 --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/CreateGatewayReceiverCommandDUnitTest.java @@ -0,0 +1,506 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.internal.cache.wan.wancommand; + +import static org.apache.geode.test.dunit.Assert.assertEquals; +import static org.apache.geode.test.dunit.Assert.assertTrue; +import static org.apache.geode.test.dunit.Assert.fail; +import static org.apache.geode.test.dunit.LogWriterUtils.getLogWriter; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.cache.wan.GatewayReceiver; +import org.apache.geode.distributed.DistributedMember; +import org.apache.geode.management.cli.Result; +import org.apache.geode.management.internal.cli.i18n.CliStrings; +import org.apache.geode.management.internal.cli.result.CommandResult; +import org.apache.geode.management.internal.cli.result.TabularResultData; +import org.apache.geode.test.dunit.Host; +import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.junit.categories.DistributedTest; +import org.apache.geode.test.junit.categories.FlakyTest; + +/** + * DUnit tests for 'create gateway-receiver' command. 
+ */ +@Category(DistributedTest.class) +public class CreateGatewayReceiverCommandDUnitTest extends WANCommandTestBase { + + private static final long serialVersionUID = 1L; + + /** + * GatewayReceiver with all default attributes + */ + @Test + public void testCreateGatewayReceiverWithDefault() { + VM puneLocator = Host.getLocator(); + int punePort = puneLocator.invoke(this::getLocatorPort); + propsSetUp(punePort); + vm2.invoke(() -> createFirstRemoteLocator(2, punePort)); + + vm3.invoke(() -> createCache(punePort)); + vm4.invoke(() -> createCache(punePort)); + vm5.invoke(() -> createCache(punePort)); + + String command = CliStrings.CREATE_GATEWAYRECEIVER; + CommandResult cmdResult = executeCommand(command); + if (cmdResult != null) { + String strCmdResult = commandResultToString(cmdResult); + getLogWriter().info("testCreateGatewayReceiver stringResult : " + strCmdResult + ">>>>"); + assertEquals(Result.Status.OK, cmdResult.getStatus()); + + TabularResultData resultData = (TabularResultData) cmdResult.getResultData(); + List<String> status = resultData.retrieveAllValues("Status"); + assertEquals(4, status.size());// expected size 4 includes the manager node + // verify there is no error in the status + for (String stat : status) { + assertTrue("GatewayReceiver creation failed with: " + stat, !stat.contains("ERROR:")); + } + } else { + fail("testCreateGatewayReceiver failed as did not get CommandResult"); + } + + vm3.invoke(() -> verifyReceiverCreationWithAttributes(!GatewayReceiver.DEFAULT_MANUAL_START, + GatewayReceiver.DEFAULT_START_PORT, GatewayReceiver.DEFAULT_END_PORT, + GatewayReceiver.DEFAULT_BIND_ADDRESS, GatewayReceiver.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, + GatewayReceiver.DEFAULT_SOCKET_BUFFER_SIZE, null)); + vm4.invoke(() -> verifyReceiverCreationWithAttributes(!GatewayReceiver.DEFAULT_MANUAL_START, + GatewayReceiver.DEFAULT_START_PORT, GatewayReceiver.DEFAULT_END_PORT, + GatewayReceiver.DEFAULT_BIND_ADDRESS, 
GatewayReceiver.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, + GatewayReceiver.DEFAULT_SOCKET_BUFFER_SIZE, null)); + vm5.invoke(() -> verifyReceiverCreationWithAttributes(!GatewayReceiver.DEFAULT_MANUAL_START, + GatewayReceiver.DEFAULT_START_PORT, GatewayReceiver.DEFAULT_END_PORT, + GatewayReceiver.DEFAULT_BIND_ADDRESS, GatewayReceiver.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, + GatewayReceiver.DEFAULT_SOCKET_BUFFER_SIZE, null)); + } + + /** + * GatewayReceiver with given attributes + */ + @Test + public void testCreateGatewayReceiver() { + VM puneLocator = Host.getLocator(); + int punePort = puneLocator.invoke(this::getLocatorPort); + propsSetUp(punePort); + vm2.invoke(() -> createFirstRemoteLocator(2, punePort)); + + vm3.invoke(() -> createCache(punePort)); + vm4.invoke(() -> createCache(punePort)); + vm5.invoke(() -> createCache(punePort)); + + String command = + CliStrings.CREATE_GATEWAYRECEIVER + " --" + CliStrings.CREATE_GATEWAYRECEIVER__MANUALSTART + + "=true" + " --" + CliStrings.CREATE_GATEWAYRECEIVER__BINDADDRESS + "=localhost" + + " --" + CliStrings.CREATE_GATEWAYRECEIVER__STARTPORT + "=10000" + " --" + + CliStrings.CREATE_GATEWAYRECEIVER__ENDPORT + "=11000" + " --" + + CliStrings.CREATE_GATEWAYRECEIVER__MAXTIMEBETWEENPINGS + "=100000" + " --" + + CliStrings.CREATE_GATEWAYRECEIVER__SOCKETBUFFERSIZE + "=512000"; + CommandResult cmdResult = executeCommand(command); + if (cmdResult != null) { + String strCmdResult = commandResultToString(cmdResult); + getLogWriter().info("testCreateGatewayReceiver stringResult : " + strCmdResult + ">>>>"); + assertEquals(Result.Status.OK, cmdResult.getStatus()); + + TabularResultData resultData = (TabularResultData) cmdResult.getResultData(); + List<String> status = resultData.retrieveAllValues("Status"); + assertEquals(4, status.size());// expected size 4 includes the manager node + // verify there is no error in the status + for (String stat : status) { + assertTrue("GatewayReceiver creation failed with: " + stat, 
!stat.contains("ERROR:")); + } + } else { + fail("testCreateGatewayReceiver failed as did not get CommandResult"); + } + vm3.invoke(() -> verifyReceiverCreationWithAttributes(false, 10000, 11000, "localhost", 100000, + 512000, null)); + vm4.invoke(() -> verifyReceiverCreationWithAttributes(false, 10000, 11000, "localhost", 100000, + 512000, null)); + vm5.invoke(() -> verifyReceiverCreationWithAttributes(false, 10000, 11000, "localhost", 100000, + 512000, null)); + } + + /** + * GatewayReceiver with given attributes and a single GatewayTransportFilter. + */ + @Test + public void testCreateGatewayReceiverWithGatewayTransportFilter() { + VM puneLocator = Host.getLocator(); + int punePort = puneLocator.invoke(this::getLocatorPort); + propsSetUp(punePort); + vm2.invoke(() -> createFirstRemoteLocator(2, punePort)); + + vm3.invoke(() -> createCache(punePort)); + vm4.invoke(() -> createCache(punePort)); + vm5.invoke(() -> createCache(punePort)); + + String command = + CliStrings.CREATE_GATEWAYRECEIVER + " --" + CliStrings.CREATE_GATEWAYRECEIVER__MANUALSTART + + "=false" + " --" + CliStrings.CREATE_GATEWAYRECEIVER__BINDADDRESS + "=localhost" + + " --" + CliStrings.CREATE_GATEWAYRECEIVER__STARTPORT + "=10000" + " --" + + CliStrings.CREATE_GATEWAYRECEIVER__ENDPORT + "=11000" + " --" + + CliStrings.CREATE_GATEWAYRECEIVER__MAXTIMEBETWEENPINGS + "=100000" + " --" + + CliStrings.CREATE_GATEWAYRECEIVER__SOCKETBUFFERSIZE + "=512000" + " --" + + CliStrings.CREATE_GATEWAYRECEIVER__GATEWAYTRANSPORTFILTER + + "=org.apache.geode.cache30.MyGatewayTransportFilter1"; + CommandResult cmdResult = executeCommand(command); + if (cmdResult != null) { + String strCmdResult = commandResultToString(cmdResult); + getLogWriter().info("testCreateGatewayReceiver stringResult : " + strCmdResult + ">>>>"); + assertEquals(Result.Status.OK, cmdResult.getStatus()); + + TabularResultData resultData = (TabularResultData) cmdResult.getResultData(); + List<String> status = 
resultData.retrieveAllValues("Status"); + assertEquals(4, status.size());// expected size 4 includes the manager node + // verify there is no error in the status + for (String stat : status) { + assertTrue("GatewayReceiver creation failed with: " + stat, !stat.contains("ERROR:")); + } + } else { + fail("testCreateGatewayReceiver failed as did not get CommandResult"); + } + List<String> transportFilters = new ArrayList<String>(); + transportFilters.add("org.apache.geode.cache30.MyGatewayTransportFilter1"); + + vm3.invoke(() -> verifyReceiverCreationWithAttributes(true, 10000, 11000, "localhost", 100000, + 512000, transportFilters)); + vm4.invoke(() -> verifyReceiverCreationWithAttributes(true, 10000, 11000, "localhost", 100000, + 512000, transportFilters)); + vm5.invoke(() -> verifyReceiverCreationWithAttributes(true, 10000, 11000, "localhost", 100000, + 512000, transportFilters)); + } + + /** + * GatewayReceiver with given attributes and multiple GatewayTransportFilters. + */ + @Test + public void testCreateGatewayReceiverWithMultipleGatewayTransportFilters() { + VM puneLocator = Host.getLocator(); + int punePort = puneLocator.invoke(this::getLocatorPort); + propsSetUp(punePort); + vm2.invoke(() -> createFirstRemoteLocator(2, punePort)); + + vm3.invoke(() -> createCache(punePort)); + vm4.invoke(() -> createCache(punePort)); + vm5.invoke(() -> createCache(punePort)); + + String command = CliStrings.CREATE_GATEWAYRECEIVER + " --" + + CliStrings.CREATE_GATEWAYRECEIVER__BINDADDRESS + "=localhost" + " --" + + CliStrings.CREATE_GATEWAYRECEIVER__STARTPORT + "=10000" + " --" + + CliStrings.CREATE_GATEWAYRECEIVER__ENDPORT + "=11000" + " --" + + CliStrings.CREATE_GATEWAYRECEIVER__MAXTIMEBETWEENPINGS + "=100000" + " --" + + CliStrings.CREATE_GATEWAYRECEIVER__SOCKETBUFFERSIZE + "=512000" + " --" + + CliStrings.CREATE_GATEWAYRECEIVER__GATEWAYTRANSPORTFILTER + + "=org.apache.geode.cache30.MyGatewayTransportFilter1,org.apache.geode.cache30.MyGatewayTransportFilter2"; + 
CommandResult cmdResult = executeCommand(command); + if (cmdResult != null) { + String strCmdResult = commandResultToString(cmdResult); + getLogWriter().info("testCreateGatewayReceiver stringResult : " + strCmdResult + ">>>>"); + assertEquals(Result.Status.OK, cmdResult.getStatus()); + + TabularResultData resultData = (TabularResultData) cmdResult.getResultData(); + List<String> status = resultData.retrieveAllValues("Status"); + assertEquals(4, status.size());// expected size 4 includes the manager node + // verify there is no error in the status + for (String stat : status) { + assertTrue("GatewayReceiver creation failed with: " + stat, !stat.contains("ERROR:")); + } + } else { + fail("testCreateGatewayReceiver failed as did not get CommandResult"); + } + List<String> transportFilters = new ArrayList<String>(); + transportFilters.add("org.apache.geode.cache30.MyGatewayTransportFilter1"); + transportFilters.add("org.apache.geode.cache30.MyGatewayTransportFilter2"); + + vm3.invoke(() -> verifyReceiverCreationWithAttributes(!GatewayReceiver.DEFAULT_MANUAL_START, + 10000, 11000, "localhost", 100000, 512000, transportFilters)); + vm4.invoke(() -> verifyReceiverCreationWithAttributes(!GatewayReceiver.DEFAULT_MANUAL_START, + 10000, 11000, "localhost", 100000, 512000, transportFilters)); + vm5.invoke(() -> verifyReceiverCreationWithAttributes(!GatewayReceiver.DEFAULT_MANUAL_START, + 10000, 11000, "localhost", 100000, 512000, transportFilters)); + } + + /** + * GatewayReceiver with given attributes. Error scenario where startPort is greater than endPort. 
+ */ + @Test + public void testCreateGatewayReceiver_Error() { + VM puneLocator = Host.getLocator(); + int punePort = puneLocator.invoke(this::getLocatorPort); + propsSetUp(punePort); + vm2.invoke(() -> createFirstRemoteLocator(2, punePort)); + + vm3.invoke(() -> createCache(punePort)); + vm4.invoke(() -> createCache(punePort)); + vm5.invoke(() -> createCache(punePort)); + + String command = + CliStrings.CREATE_GATEWAYRECEIVER + " --" + CliStrings.CREATE_GATEWAYRECEIVER__BINDADDRESS + + "=localhost" + " --" + CliStrings.CREATE_GATEWAYRECEIVER__STARTPORT + "=11000" + " --" + + CliStrings.CREATE_GATEWAYRECEIVER__ENDPORT + "=10000" + " --" + + CliStrings.CREATE_GATEWAYRECEIVER__MAXTIMEBETWEENPINGS + "=100000" + " --" + + CliStrings.CREATE_GATEWAYRECEIVER__SOCKETBUFFERSIZE + "=512000"; + CommandResult cmdResult = executeCommand(command); + if (cmdResult != null) { + String strCmdResult = commandResultToString(cmdResult); + getLogWriter().info("testCreateGatewayReceiver stringResult : " + strCmdResult + ">>>>"); + assertEquals(Result.Status.OK, cmdResult.getStatus()); + + TabularResultData resultData = (TabularResultData) cmdResult.getResultData(); + List<String> status = resultData.retrieveAllValues("Status"); + assertEquals(4, status.size());// expected size 4 includes the manager + // node + // verify there is no error in the status + for (String stat : status) { + assertTrue("GatewayReceiver creation should have failed", stat.contains("ERROR:")); + } + } else { + fail("testCreateGatewayReceiver failed as did not get CommandResult"); + } + } + + /** + * GatewayReceiver with given attributes on the given member. 
+ */ + @Test + public void testCreateGatewayReceiver_onMember() { + VM puneLocator = Host.getLocator(); + int punePort = puneLocator.invoke(this::getLocatorPort); + propsSetUp(punePort); + vm2.invoke(() -> createFirstRemoteLocator(2, punePort)); + + vm3.invoke(() -> createCache(punePort)); + vm4.invoke(() -> createCache(punePort)); + vm5.invoke(() -> createCache(punePort)); + + final DistributedMember vm3Member = vm3.invoke(this::getMember); + + String command = + CliStrings.CREATE_GATEWAYRECEIVER + " --" + CliStrings.CREATE_GATEWAYRECEIVER__MANUALSTART + + "=true" + " --" + CliStrings.CREATE_GATEWAYRECEIVER__BINDADDRESS + "=localhost" + + " --" + CliStrings.CREATE_GATEWAYRECEIVER__STARTPORT + "=10000" + " --" + + CliStrings.CREATE_GATEWAYRECEIVER__ENDPORT + "=11000" + " --" + + CliStrings.CREATE_GATEWAYRECEIVER__MAXTIMEBETWEENPINGS + "=100000" + " --" + + CliStrings.CREATE_GATEWAYRECEIVER__SOCKETBUFFERSIZE + "=512000" + " --" + + CliStrings.MEMBER + "=" + vm3Member.getId(); + CommandResult cmdResult = executeCommand(command); + if (cmdResult != null) { + String strCmdResult = commandResultToString(cmdResult); + getLogWriter().info("testCreateGatewayReceiver stringResult : " + strCmdResult + ">>>>"); + assertEquals(Result.Status.OK, cmdResult.getStatus()); + + TabularResultData resultData = (TabularResultData) cmdResult.getResultData(); + List<String> status = resultData.retrieveAllValues("Status"); + assertEquals(1, status.size()); + // verify there is no error in the status + for (String stat : status) { + assertTrue("GatewayReceiver creation failed with: " + stat, !stat.contains("ERROR:")); + } + } else { + fail("testCreateGatewayReceiver failed as did not get CommandResult"); + } + vm3.invoke(() -> verifyReceiverCreationWithAttributes(false, 10000, 11000, "localhost", 100000, + 512000, null)); + } + + /** + * GatewayReceiver with given attributes on multiple members. 
+ */ + @Category(FlakyTest.class) // GEODE-1355 + @Test + public void testCreateGatewayReceiver_onMultipleMembers() { + VM puneLocator = Host.getLocator(); + int punePort = puneLocator.invoke(this::getLocatorPort); + propsSetUp(punePort); + vm2.invoke(() -> createFirstRemoteLocator(2, punePort)); + + vm3.invoke(() -> createCache(punePort)); + vm4.invoke(() -> createCache(punePort)); + vm5.invoke(() -> createCache(punePort)); + + final DistributedMember vm3Member = vm3.invoke(this::getMember); + final DistributedMember vm4Member = vm4.invoke(this::getMember); + + String command = + CliStrings.CREATE_GATEWAYRECEIVER + " --" + CliStrings.CREATE_GATEWAYRECEIVER__MANUALSTART + + "=true" + " --" + CliStrings.CREATE_GATEWAYRECEIVER__BINDADDRESS + "=localhost" + + " --" + CliStrings.CREATE_GATEWAYRECEIVER__STARTPORT + "=10000" + " --" + + CliStrings.CREATE_GATEWAYRECEIVER__ENDPORT + "=11000" + " --" + + CliStrings.CREATE_GATEWAYRECEIVER__MAXTIMEBETWEENPINGS + "=100000" + " --" + + CliStrings.CREATE_GATEWAYRECEIVER__SOCKETBUFFERSIZE + "=512000" + " --" + + CliStrings.MEMBER + "=" + vm3Member.getId() + "," + vm4Member.getId(); + CommandResult cmdResult = executeCommand(command); + if (cmdResult != null) { + String strCmdResult = commandResultToString(cmdResult); + getLogWriter().info("testCreateGatewayReceiver stringResult : " + strCmdResult + ">>>>"); + assertEquals(Result.Status.OK, cmdResult.getStatus()); + + TabularResultData resultData = (TabularResultData) cmdResult.getResultData(); + List<String> status = resultData.retrieveAllValues("Status"); + assertEquals(2, status.size()); + // verify there is no error in the status + for (String stat : status) { + assertTrue("GatewayReceiver creation failed with: " + stat, !stat.contains("ERROR:")); + } + } else { + fail("testCreateGatewayReceiver failed as did not get CommandResult"); + } + vm3.invoke(() -> verifyReceiverCreationWithAttributes(false, 10000, 11000, "localhost", 100000, + 512000, null)); + vm4.invoke(() -> 
verifyReceiverCreationWithAttributes(false, 10000, 11000, "localhost", 100000, + 512000, null)); + } + + /** + * GatewayReceiver with given attributes on the given group. + */ + @Test + public void testCreateGatewayReceiver_onGroup() { + VM puneLocator = Host.getLocator(); + int punePort = puneLocator.invoke(this::getLocatorPort); + propsSetUp(punePort); + vm2.invoke(() -> createFirstRemoteLocator(2, punePort)); + + vm3.invoke(() -> createCacheWithGroups(punePort, "receiverGroup1")); + vm4.invoke(() -> createCacheWithGroups(punePort, "receiverGroup1")); + vm5.invoke(() -> createCacheWithGroups(punePort, "receiverGroup1")); + + String command = + CliStrings.CREATE_GATEWAYRECEIVER + " --" + CliStrings.CREATE_GATEWAYRECEIVER__MANUALSTART + + "=true" + " --" + CliStrings.CREATE_GATEWAYRECEIVER__BINDADDRESS + "=localhost" + + " --" + CliStrings.CREATE_GATEWAYRECEIVER__STARTPORT + "=10000" + " --" + + CliStrings.CREATE_GATEWAYRECEIVER__ENDPORT + "=11000" + " --" + + CliStrings.CREATE_GATEWAYRECEIVER__MAXTIMEBETWEENPINGS + "=100000" + " --" + + CliStrings.CREATE_GATEWAYRECEIVER__SOCKETBUFFERSIZE + "=512000" + " --" + + CliStrings.GROUP + "=receiverGroup1"; + CommandResult cmdResult = executeCommand(command); + if (cmdResult != null) { + String strCmdResult = commandResultToString(cmdResult); + getLogWriter().info("testCreateGatewayReceiver stringResult : " + strCmdResult + ">>>>"); + assertEquals(Result.Status.OK, cmdResult.getStatus()); + + TabularResultData resultData = (TabularResultData) cmdResult.getResultData(); + List<String> status = resultData.retrieveAllValues("Status"); + assertEquals(3, status.size());// + // verify there is no error in the status + for (String stat : status) { + assertTrue("GatewayReceiver creation failed with: " + stat, !stat.contains("ERROR:")); + } + } else { + fail("testCreateGatewayReceiver failed as did not get CommandResult"); + } + + vm3.invoke(() -> verifyReceiverCreationWithAttributes(false, 10000, 11000, "localhost", 100000, + 
512000, null)); + vm4.invoke(() -> verifyReceiverCreationWithAttributes(false, 10000, 11000, "localhost", 100000, + 512000, null)); + vm5.invoke(() -> verifyReceiverCreationWithAttributes(false, 10000, 11000, "localhost", 100000, + 512000, null)); + } + + /** + * GatewayReceiver with given attributes on the given group. Only 2 of 3 members are part of the + * group. + */ + @Test + public void testCreateGatewayReceiver_onGroup_Scenario2() { + VM puneLocator = Host.getLocator(); + int punePort = puneLocator.invoke(this::getLocatorPort); + propsSetUp(punePort); + vm2.invoke(() -> createFirstRemoteLocator(2, punePort)); + + vm3.invoke(() -> createCacheWithGroups(punePort, "receiverGroup1")); + vm4.invoke(() -> createCacheWithGroups(punePort, "receiverGroup1")); + vm5.invoke(() -> createCacheWithGroups(punePort, "receiverGroup2")); + + String command = + CliStrings.CREATE_GATEWAYRECEIVER + " --" + CliStrings.CREATE_GATEWAYRECEIVER__MANUALSTART + + "=true" + " --" + CliStrings.CREATE_GATEWAYRECEIVER__BINDADDRESS + "=localhost" + + " --" + CliStrings.CREATE_GATEWAYRECEIVER__STARTPORT + "=10000" + " --" + + CliStrings.CREATE_GATEWAYRECEIVER__ENDPORT + "=11000" + " --" + + CliStrings.CREATE_GATEWAYRECEIVER__MAXTIMEBETWEENPINGS + "=100000" + " --" + + CliStrings.CREATE_GATEWAYRECEIVER__SOCKETBUFFERSIZE + "=512000" + " --" + + CliStrings.GROUP + "=receiverGroup1"; + CommandResult cmdResult = executeCommand(command); + if (cmdResult != null) { + String strCmdResult = commandResultToString(cmdResult); + getLogWriter().info("testCreateGatewayReceiver stringResult : " + strCmdResult + ">>>>"); + assertEquals(Result.Status.OK, cmdResult.getStatus()); + + TabularResultData resultData = (TabularResultData) cmdResult.getResultData(); + List<String> status = resultData.retrieveAllValues("Status"); + assertEquals(2, status.size());// + // verify there is no error in the status + for (String stat : status) { + assertTrue("GatewayReceiver creation failed with: " + stat, 
!stat.contains("ERROR:")); + } + } else { + fail("testCreateGatewayReceiver failed as did not get CommandResult"); + } + vm3.invoke(() -> verifyReceiverCreationWithAttributes(false, 10000, 11000, "localhost", 100000, + 512000, null)); + vm4.invoke(() -> verifyReceiverCreationWithAttributes(false, 10000, 11000, "localhost", 100000, + 512000, null)); + } + + /** + * GatewayReceiver with given attributes on multiple groups. + */ + @Test + public void testCreateGatewayReceiver_onMultipleGroups() { + VM puneLocator = Host.getLocator(); + int punePort = puneLocator.invoke(this::getLocatorPort); + propsSetUp(punePort); + vm2.invoke(() -> createFirstRemoteLocator(2, punePort)); + + vm3.invoke(() -> createCacheWithGroups(punePort, "receiverGroup1")); + vm4.invoke(() -> createCacheWithGroups(punePort, "receiverGroup1")); + vm5.invoke(() -> createCacheWithGroups(punePort, "receiverGroup2")); + + String command = + CliStrings.CREATE_GATEWAYRECEIVER + " --" + CliStrings.CREATE_GATEWAYRECEIVER__MANUALSTART + + "=true" + " --" + CliStrings.CREATE_GATEWAYRECEIVER__BINDADDRESS + "=localhost" + + " --" + CliStrings.CREATE_GATEWAYRECEIVER__STARTPORT + "=10000" + " --" + + CliStrings.CREATE_GATEWAYRECEIVER__ENDPORT + "=11000" + " --" + + CliStrings.CREATE_GATEWAYRECEIVER__MAXTIMEBETWEENPINGS + "=100000" + " --" + + CliStrings.CREATE_GATEWAYRECEIVER__SOCKETBUFFERSIZE + "=512000" + " --" + + CliStrings.GROUP + "=receiverGroup1,receiverGroup2"; + CommandResult cmdResult = executeCommand(command); + if (cmdResult != null) { + String strCmdResult = commandResultToString(cmdResult); + getLogWriter().info("testCreateGatewayReceiver stringResult : " + strCmdResult + ">>>>"); + assertEquals(Result.Status.OK, cmdResult.getStatus()); + + TabularResultData resultData = (TabularResultData) cmdResult.getResultData(); + List<String> status = resultData.retrieveAllValues("Status"); + assertEquals(3, status.size());// + // verify there is no error in the status + for (String stat : status) { + 
assertTrue("GatewayReceiver creation failed with: " + stat, !stat.contains("ERROR:")); + } + } else { + fail("testCreateGatewayReceiver failed as did not get CommandResult"); + } + vm3.invoke(() -> verifyReceiverCreationWithAttributes(false, 10000, 11000, "localhost", 100000, + 512000, null)); + vm4.invoke(() -> verifyReceiverCreationWithAttributes(false, 10000, 11000, "localhost", 100000, + 512000, null)); + vm5.invoke(() -> verifyReceiverCreationWithAttributes(false, 10000, 11000, "localhost", 100000, + 512000, null)); + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/e1befef4/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/CreateGatewaySenderCommandDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/CreateGatewaySenderCommandDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/CreateGatewaySenderCommandDUnitTest.java new file mode 100644 index 0000000..5132ef5 --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/CreateGatewaySenderCommandDUnitTest.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. 
See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.cache.wan.wancommand; + +import static org.apache.geode.test.dunit.LogWriterUtils.getLogWriter; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.List; + +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.cache.wan.GatewaySender; +import org.apache.geode.internal.cache.wan.GatewaySenderException; +import org.apache.geode.management.cli.Result; +import org.apache.geode.management.internal.cli.i18n.CliStrings; +import org.apache.geode.management.internal.cli.result.CommandResult; +import org.apache.geode.management.internal.cli.result.TabularResultData; +import org.apache.geode.test.dunit.IgnoredException; +import org.apache.geode.test.junit.categories.DistributedTest; + +@Category(DistributedTest.class) +public class CreateGatewaySenderCommandDUnitTest extends WANCommandTestBase { + + /** + * GatewaySender with given attribute values. Error scenario where dispatcher threads is set to + * more than 1 and no order policy provided. 
+ */ + @Test + public void testCreateGatewaySender_Error() { + Integer punePort = vm1.invoke(() -> createFirstLocatorWithDSId(1)); + propsSetUp(punePort); + + vm2.invoke(() -> createFirstRemoteLocator(2, punePort)); + vm3.invoke(() -> createCache(punePort)); + vm4.invoke(() -> createCache(punePort)); + vm5.invoke(() -> createCache(punePort)); + + int socketReadTimeout = GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT + 1000; + String command = CliStrings.CREATE_GATEWAYSENDER + " --" + CliStrings.CREATE_GATEWAYSENDER__ID + + "=ln" + " --" + CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID + "=2" + " --" + + CliStrings.CREATE_GATEWAYSENDER__PARALLEL + "=false" + " --" + + CliStrings.CREATE_GATEWAYSENDER__MANUALSTART + "=true" + " --" + + CliStrings.CREATE_GATEWAYSENDER__SOCKETBUFFERSIZE + "=1000" + " --" + + CliStrings.CREATE_GATEWAYSENDER__SOCKETREADTIMEOUT + "=" + socketReadTimeout + " --" + + CliStrings.CREATE_GATEWAYSENDER__ENABLEBATCHCONFLATION + "=true" + " --" + + CliStrings.CREATE_GATEWAYSENDER__BATCHSIZE + "=1000" + " --" + + CliStrings.CREATE_GATEWAYSENDER__BATCHTIMEINTERVAL + "=5000" + " --" + + CliStrings.CREATE_GATEWAYSENDER__ENABLEPERSISTENCE + "=true" + " --" + + CliStrings.CREATE_GATEWAYSENDER__DISKSYNCHRONOUS + "=false" + " --" + + CliStrings.CREATE_GATEWAYSENDER__MAXQUEUEMEMORY + "=1000" + " --" + + CliStrings.CREATE_GATEWAYSENDER__ALERTTHRESHOLD + "=100" + " --" + + CliStrings.CREATE_GATEWAYSENDER__DISPATCHERTHREADS + "=2"; + CommandResult cmdResult = executeCommandWithIgnoredExceptions(command); + if (cmdResult != null) { + String strCmdResult = commandResultToString(cmdResult); + getLogWriter().info("testCreateDestroyGatewaySender stringResult : " + strCmdResult + ">>>>"); + assertEquals(Result.Status.OK, cmdResult.getStatus()); + + TabularResultData resultData = (TabularResultData) cmdResult.getResultData(); + List<String> status = resultData.retrieveAllValues("Status"); + assertEquals(5, status.size()); + for (String stat : status) { + 
assertTrue("GatewaySender creation should fail", stat.contains("ERROR:")); + } + } else { + fail("testCreateDestroyGatewaySender failed as did not get CommandResult"); + } + } + + /** + * Parallel GatewaySender with given attribute values. Provide dispatcherThreads as 2 which is not + * valid for Parallel sender. + */ + @Test + public void testCreateParallelGatewaySender_Error() { + Integer punePort = vm1.invoke(() -> createFirstLocatorWithDSId(1)); + propsSetUp(punePort); + + vm2.invoke(() -> createFirstRemoteLocator(2, punePort)); + vm3.invoke(() -> createCache(punePort)); + vm4.invoke(() -> createCache(punePort)); + vm5.invoke(() -> createCache(punePort)); + + int socketReadTimeout = GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT + 1000; + String command = CliStrings.CREATE_GATEWAYSENDER + " --" + CliStrings.CREATE_GATEWAYSENDER__ID + + "=ln" + " --" + CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID + "=2" + " --" + + CliStrings.CREATE_GATEWAYSENDER__PARALLEL + "=true" + " --" + + CliStrings.CREATE_GATEWAYSENDER__MANUALSTART + "=true" + " --" + + CliStrings.CREATE_GATEWAYSENDER__SOCKETBUFFERSIZE + "=1000" + " --" + + CliStrings.CREATE_GATEWAYSENDER__SOCKETREADTIMEOUT + "=" + socketReadTimeout + " --" + + CliStrings.CREATE_GATEWAYSENDER__ENABLEBATCHCONFLATION + "=true" + " --" + + CliStrings.CREATE_GATEWAYSENDER__BATCHSIZE + "=1000" + " --" + + CliStrings.CREATE_GATEWAYSENDER__BATCHTIMEINTERVAL + "=5000" + " --" + + CliStrings.CREATE_GATEWAYSENDER__ENABLEPERSISTENCE + "=true" + " --" + + CliStrings.CREATE_GATEWAYSENDER__DISKSYNCHRONOUS + "=false" + " --" + + CliStrings.CREATE_GATEWAYSENDER__MAXQUEUEMEMORY + "=1000" + " --" + + CliStrings.CREATE_GATEWAYSENDER__ALERTTHRESHOLD + "=100" + " --" + + CliStrings.CREATE_GATEWAYSENDER__DISPATCHERTHREADS + "=2" + " --" + + CliStrings.CREATE_GATEWAYSENDER__ORDERPOLICY + "=THREAD"; + IgnoredException exp = + IgnoredException.addIgnoredException(GatewaySenderException.class.getName()); + try { + CommandResult cmdResult 
= executeCommandWithIgnoredExceptions(command); + if (cmdResult != null) { + String strCmdResult = commandResultToString(cmdResult); + getLogWriter() + .info("testCreateParallelGatewaySender_Error stringResult : " + strCmdResult + ">>>>"); + assertEquals(Result.Status.OK, cmdResult.getStatus()); + + TabularResultData resultData = (TabularResultData) cmdResult.getResultData(); + List<String> status = resultData.retrieveAllValues("Status"); + assertEquals(5, status.size()); + for (String stat : status) { + assertTrue("GatewaySender creation should have failed", stat.contains("ERROR:")); + } + } else { + fail("testCreateParallelGatewaySender_Error failed as did not get CommandResult"); + } + } finally { + exp.remove(); + } + } + + private CommandResult executeCommandWithIgnoredExceptions(String command) { + final IgnoredException ignored = IgnoredException.addIgnoredException("Could not connect"); + try { + return executeCommand(command); + } finally { + ignored.remove(); + } + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/e1befef4/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/DestroyGatewaySenderCommandDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/DestroyGatewaySenderCommandDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/DestroyGatewaySenderCommandDUnitTest.java new file mode 100644 index 0000000..6ec0d72 --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/DestroyGatewaySenderCommandDUnitTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. 
The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.cache.wan.wancommand; + +import static org.apache.geode.test.dunit.LogWriterUtils.getLogWriter; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.List; + +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.management.cli.Result; +import org.apache.geode.management.internal.cli.i18n.CliStrings; +import org.apache.geode.management.internal.cli.result.CommandResult; +import org.apache.geode.management.internal.cli.result.TabularResultData; +import org.apache.geode.test.dunit.IgnoredException; +import org.apache.geode.test.junit.categories.DistributedTest; + +@Category(DistributedTest.class) +public class DestroyGatewaySenderCommandDUnitTest extends WANCommandTestBase { + @Test + public void testDestroyGatewaySender_NotCreatedSender() { + Integer punePort = vm1.invoke(() -> createFirstLocatorWithDSId(1)); + propsSetUp(punePort); + + vm2.invoke(() -> createFirstRemoteLocator(2, punePort)); + vm3.invoke(() -> createCache(punePort)); + vm4.invoke(() -> createCache(punePort)); + vm5.invoke(() -> createCache(punePort)); + + // Test Destroy Command + String command = + CliStrings.DESTROY_GATEWAYSENDER + " --" + CliStrings.DESTROY_GATEWAYSENDER__ID + "=ln"; + CommandResult cmdResult = 
executeCommandWithIgnoredExceptions(command); + if (cmdResult != null) { + String strCmdResult = commandResultToString(cmdResult); + getLogWriter().info( + "testDestroyGatewaySender_NotCreatedSender stringResult : " + strCmdResult + ">>>>"); + assertEquals(Result.Status.OK, cmdResult.getStatus()); + TabularResultData resultData = (TabularResultData) cmdResult.getResultData(); + List<String> status = resultData.retrieveAllValues("Status"); + assertEquals(5, status.size()); + for (String stat : status) { + assertTrue("GatewaySender destroy should fail", stat.contains("ERROR:")); + } + } else { + fail("testCreateDestroyParallelGatewaySender failed as did not get CommandResult"); + } + } + + private CommandResult executeCommandWithIgnoredExceptions(String command) { + final IgnoredException ignored = IgnoredException.addIgnoredException("Could not connect"); + try { + return executeCommand(command); + } finally { + ignored.remove(); + } + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/e1befef4/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/PauseGatewaySenderCommandDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/PauseGatewaySenderCommandDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/PauseGatewaySenderCommandDUnitTest.java new file mode 100644 index 0000000..c97b0ae --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/PauseGatewaySenderCommandDUnitTest.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. 
The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache.wan.wancommand;

import static org.apache.geode.test.dunit.Assert.assertEquals;
import static org.apache.geode.test.dunit.Assert.assertFalse;
import static org.apache.geode.test.dunit.Assert.assertTrue;
import static org.apache.geode.test.dunit.Assert.fail;
import static org.apache.geode.test.dunit.LogWriterUtils.getLogWriter;
import static org.apache.geode.test.dunit.Wait.pause;

import java.util.List;

import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.management.cli.Result;
import org.apache.geode.management.internal.cli.i18n.CliStrings;
import org.apache.geode.management.internal.cli.result.CommandResult;
import org.apache.geode.management.internal.cli.result.TabularResultData;
import org.apache.geode.test.junit.categories.DistributedTest;

/**
 * Distributed tests for the command built from {@link CliStrings#PAUSE_GATEWAYSENDER}, run
 * against the gateway sender with id "ln".
 *
 * <p>
 * Every test creates a locator for distributed system 1 on vm1 and a remote locator for
 * distributed system 2 on vm2, then creates (and, except for the error-condition test, starts)
 * the sender on one or more of the remaining VMs before executing the command.
 *
 * <p>
 * NOTE(review): the two boolean arguments of {@code verifySenderState} appear to be "running" and
 * "paused" (they read {@code true, false} before the pause command and {@code true, true} after
 * it) - confirm against the base class.
 */
@Category(DistributedTest.class)
public class PauseGatewaySenderCommandDUnitTest extends WANCommandTestBase {

  /**
   * Verifies that supplying both the member and the group option is rejected: the command must
   * return {@code ERROR} with {@link CliStrings#PROVIDE_EITHER_MEMBER_OR_GROUP_MESSAGE}.
   */
  @Test
  public void testPauseGatewaySender_ErrorConditions() {
    final String testName = "testPauseGatewaySender_ErrorConditions";

    Integer punePort = vm1.invoke(() -> createFirstLocatorWithDSId(1));
    propsSetUp(punePort);
    vm2.invoke(() -> createFirstRemoteLocator(2, punePort));

    vm3.invoke(() -> createCache(punePort));
    vm3.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));

    final DistributedMember vm3Member = vm3.invoke(this::getMember);
    String command = pauseCommand() + " --" + CliStrings.MEMBER + "=" + vm3Member.getId() + " --"
        + CliStrings.GROUP + "=SenderGroup1";

    CommandResult cmdResult = executeRequired(testName, command);
    String output = logResult(testName, cmdResult);
    assertEquals(Result.Status.ERROR, cmdResult.getStatus());
    assertTrue(output.contains(CliStrings.PROVIDE_EITHER_MEMBER_OR_GROUP_MESSAGE));
  }

  /**
   * Verifies that the pause command, targeted at a single member, pauses the running gateway
   * sender on that member.
   */
  @Test
  public void testPauseGatewaySender_onMember() {
    final String testName = "testPauseGatewaySender_onMember";

    Integer punePort = vm1.invoke(() -> createFirstLocatorWithDSId(1));
    propsSetUp(punePort);
    vm2.invoke(() -> createFirstRemoteLocator(2, punePort));

    vm3.invoke(() -> createCache(punePort));
    vm3.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
    vm3.invoke(() -> startSender("ln"));
    vm3.invoke(() -> verifySenderState("ln", true, false));

    final DistributedMember vm3Member = vm3.invoke(this::getMember);
    // Fixed wait kept from the original test; presumably it lets the sender state propagate
    // before the command runs - TODO confirm and replace with a condition-based wait.
    pause(10000);
    String command = pauseCommand() + " --" + CliStrings.MEMBER + "=" + vm3Member.getId();

    CommandResult cmdResult = executeRequired(testName, command);
    String output = logResult(testName, cmdResult);
    assertEquals(Result.Status.OK, cmdResult.getStatus());
    assertTrue(output.contains("is paused on member"));

    vm3.invoke(() -> verifySenderState("ln", true, true));
  }

  /**
   * Verifies that the pause command without member or group options pauses the sender on vm3,
   * vm4 and vm5. The tabular result is expected to hold five rows containing both "OK" and
   * "Error" values.
   *
   * <p>
   * NOTE(review): the "Error" rows presumably come from members that do not host the sender -
   * confirm.
   */
  @Test
  public void testPauseGatewaySender() {
    final String testName = "testPauseGatewaySender";

    Integer punePort = vm1.invoke(() -> createFirstLocatorWithDSId(1));
    propsSetUp(punePort);
    vm2.invoke(() -> createFirstRemoteLocator(2, punePort));

    vm3.invoke(() -> createCache(punePort));
    vm3.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
    vm4.invoke(() -> createCache(punePort));
    vm4.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
    vm5.invoke(() -> createCache(punePort));
    vm5.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
    vm3.invoke(() -> startSender("ln"));
    vm4.invoke(() -> startSender("ln"));
    vm5.invoke(() -> startSender("ln"));
    vm3.invoke(() -> verifySenderState("ln", true, false));
    vm4.invoke(() -> verifySenderState("ln", true, false));
    vm5.invoke(() -> verifySenderState("ln", true, false));

    // Fixed wait kept from the original test - TODO replace with a condition-based wait.
    pause(10000);
    String command = pauseCommand();

    CommandResult cmdResult = executeRequired(testName, command);
    logResult(testName, cmdResult);
    assertEquals(Result.Status.OK, cmdResult.getStatus());
    assertTabularStatuses(cmdResult, 5, true);

    vm3.invoke(() -> verifySenderState("ln", true, true));
    vm4.invoke(() -> verifySenderState("ln", true, true));
    vm5.invoke(() -> verifySenderState("ln", true, true));
  }

  /**
   * Verifies that the pause command, targeted at a group, pauses the gateway sender on every
   * member of that group (vm3, vm4 and vm5 all belong to "SenderGroup1").
   */
  @Test
  public void testPauseGatewaySender_Group() {
    final String testName = "testPauseGatewaySender_Group";

    Integer punePort = vm1.invoke(() -> createFirstLocatorWithDSId(1));
    propsSetUp(punePort);
    vm2.invoke(() -> createFirstRemoteLocator(2, punePort));

    vm3.invoke(() -> createCacheWithGroups(punePort, "SenderGroup1"));
    vm3.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
    vm4.invoke(() -> createCacheWithGroups(punePort, "SenderGroup1"));
    vm4.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
    vm5.invoke(() -> createCacheWithGroups(punePort, "SenderGroup1"));
    vm5.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
    vm3.invoke(() -> startSender("ln"));
    vm4.invoke(() -> startSender("ln"));
    vm5.invoke(() -> startSender("ln"));
    vm3.invoke(() -> verifySenderState("ln", true, false));
    vm4.invoke(() -> verifySenderState("ln", true, false));
    vm5.invoke(() -> verifySenderState("ln", true, false));

    // Fixed wait kept from the original test - TODO replace with a condition-based wait.
    pause(10000);
    String command = pauseCommand() + " --" + CliStrings.GROUP + "=SenderGroup1";

    CommandResult cmdResult = executeRequired(testName, command);
    logResult(testName, cmdResult);
    assertEquals(Result.Status.OK, cmdResult.getStatus());
    assertTabularStatuses(cmdResult, 3, false);

    vm3.invoke(() -> verifySenderState("ln", true, true));
    vm4.invoke(() -> verifySenderState("ln", true, true));
    vm5.invoke(() -> verifySenderState("ln", true, true));
  }

  /**
   * Verifies pausing by several groups when a member belongs to more than one of them: vm3 and
   * vm4 are in "SenderGroup1", vm5 is in both "SenderGroup1" and "SenderGroup2", vm6 is in
   * "SenderGroup2" and vm7 is in "SenderGroup3". Pausing "SenderGroup1,SenderGroup2" must yield
   * four result rows without errors and leave the sender on vm7 unpaused.
   */
  @Test
  public void testPauseGatewaySender_MultipleGroup() {
    final String testName = "testPauseGatewaySender_MultipleGroup";

    Integer punePort = vm1.invoke(() -> createFirstLocatorWithDSId(1));
    propsSetUp(punePort);
    vm2.invoke(() -> createFirstRemoteLocator(2, punePort));

    vm3.invoke(() -> createCacheWithGroups(punePort, "SenderGroup1"));
    vm3.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
    vm4.invoke(() -> createCacheWithGroups(punePort, "SenderGroup1"));
    vm4.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
    vm5.invoke(() -> createCacheWithGroups(punePort, "SenderGroup1, SenderGroup2"));
    vm5.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
    vm6.invoke(() -> createCacheWithGroups(punePort, "SenderGroup2"));
    vm6.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
    vm7.invoke(() -> createCacheWithGroups(punePort, "SenderGroup3"));
    vm7.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, null, true));
    vm3.invoke(() -> startSender("ln"));
    vm4.invoke(() -> startSender("ln"));
    vm5.invoke(() -> startSender("ln"));
    vm6.invoke(() -> startSender("ln"));
    vm7.invoke(() -> startSender("ln"));
    vm3.invoke(() -> verifySenderState("ln", true, false));
    vm4.invoke(() -> verifySenderState("ln", true, false));
    vm5.invoke(() -> verifySenderState("ln", true, false));
    vm6.invoke(() -> verifySenderState("ln", true, false));
    vm7.invoke(() -> verifySenderState("ln", true, false));

    // Fixed wait kept from the original test - TODO replace with a condition-based wait.
    pause(10000);
    String command = pauseCommand() + " --" + CliStrings.GROUP + "=SenderGroup1,SenderGroup2";

    CommandResult cmdResult = executeRequired(testName, command);
    logResult(testName, cmdResult);
    assertEquals(Result.Status.OK, cmdResult.getStatus());
    assertTabularStatuses(cmdResult, 4, false);

    vm3.invoke(() -> verifySenderState("ln", true, true));
    vm4.invoke(() -> verifySenderState("ln", true, true));
    vm5.invoke(() -> verifySenderState("ln", true, true));
    vm6.invoke(() -> verifySenderState("ln", true, true));
    // vm7 is only in SenderGroup3, which the command did not target.
    vm7.invoke(() -> verifySenderState("ln", true, false));
  }

  /** Builds the pause command for sender "ln" without any member or group option. */
  private static String pauseCommand() {
    return CliStrings.PAUSE_GATEWAYSENDER + " --" + CliStrings.PAUSE_GATEWAYSENDER__ID + "=ln";
  }

  /** Executes {@code command} and fails the named test when no result comes back. */
  private CommandResult executeRequired(String testName, String command) {
    CommandResult cmdResult = executeCommand(command);
    if (cmdResult == null) {
      fail(testName + " failed as did not get CommandResult");
    }
    return cmdResult;
  }

  /** Logs the textual form of {@code cmdResult} under the given test name and returns it. */
  private String logResult(String testName, CommandResult cmdResult) {
    String output = commandResultToString(cmdResult);
    getLogWriter().info(testName + " stringResult : " + output + ">>>>");
    return output;
  }

  /**
   * Asserts that the "Result" column of the tabular result has {@code expectedRows} values,
   * contains "OK", and contains "Error" exactly when {@code expectError} is true.
   */
  private void assertTabularStatuses(CommandResult cmdResult, int expectedRows,
      boolean expectError) {
    TabularResultData resultData = (TabularResultData) cmdResult.getResultData();
    List<String> status = resultData.retrieveAllValues("Result");
    assertEquals(expectedRows, status.size());
    if (expectError) {
      assertTrue(status.contains("Error"));
    } else {
      assertFalse(status.contains("Error"));
    }
    assertTrue(status.contains("OK"));
  }
}
