http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPropagationDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPropagationDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPropagationDUnitTest.java new file mode 100644 index 0000000..3ae9a01 --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPropagationDUnitTest.java @@ -0,0 +1,1063 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan.misc; + +import org.junit.Ignore; +import org.junit.experimental.categories.Category; +import org.junit.Test; + +import static org.junit.Assert.*; + +import com.gemstone.gemfire.test.junit.categories.DistributedTest; + +import com.gemstone.gemfire.cache.CacheClosedException; +import com.gemstone.gemfire.cache.DataPolicy; +import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.cache.Scope; +import com.gemstone.gemfire.internal.cache.ForceReattemptException; +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; +import com.gemstone.gemfire.test.dunit.Assert; +import com.gemstone.gemfire.test.dunit.AsyncInvocation; +import com.gemstone.gemfire.test.dunit.IgnoredException; +import com.gemstone.gemfire.test.dunit.LogWriterUtils; +import com.gemstone.gemfire.test.dunit.Wait; + +@Category(DistributedTest.class) +public class ReplicatedRegion_ParallelWANPropagationDUnitTest extends WANTestBase{ + + public ReplicatedRegion_ParallelWANPropagationDUnitTest() { + super(); + // TODO Auto-generated constructor stub + } + + final String expectedExceptions = null; + + + /** + * + */ + private static final long serialVersionUID = 1L; + + @Test + public void test_DR_PGS_1Nodes_Put_Receiver() throws Exception { + try { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2); + vm2.invoke(() -> WANTestBase.createReceiver()); + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + createCacheInVMs(lnPort, vm4); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln1", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.createSender( "ln1", 2, + true, 10, 100, false, false, null, true )); + + vm4.invoke(() -> WANTestBase.startSender( "ln1" )); + fail("Expected GatewaySenderConfigException where parallel gateway sender can not be used with replicated region"); + } + catch (Exception e) { + if (!e.getCause().getMessage() + .contains("can not be used with replicated region")) { + fail("Expected GatewaySenderConfigException where parallel gateway sender can not be used with replicated region"); + } + } + } + + /*1. Validate that parallelGatewaySenderId can be added to distributed region + *Region distributed ack/noack + PGS + *1. Find out the restrictions on totalNumBuckets on shadowPR + *2. Find out the restrictions on redundancy on shadowPR + *3. Find out the restrictions on localMaxMemory on shadowPR + *4. Find out the best way user will specify PR attributes to PGS + *5. Find out the restrictions on ordering. + *6. put on region populates the queue + *7. put on region reaches to remote site. Dispatcher works as expected + *8. m1 and m2 has DR(ack/noack). put on DR from m1 populates queue on both m1 and m2. Validate that remote site got all the events + *9. m1 and m2 has DR(ack/noack). create/put/destroy/operations populates the queue. Validate that remote site got correct events + *10. m1 and m2 has DR(ack/noack). localDestroy is called on m1's DR. This locally destroys M1's shadowPr + *11. m1 and m2 has DR(ack/noack). destroy is called on m1's DR. This destroys entire shadowPr on m1 and m2 + *12. m1 and m2 has DR(ack/noack). close Region is called on m1's DR. This locally destroys shadowPr on m1 + *13. m1 and m2 has DR(ack/noack). cache.close on m1'. This locally destroys shadowPr on m1 + *14. Validate HA scenario does not cause any event loss + *15. PDX events of DR are propagated to remote sites + *16. validate stats + *17: PR and DR regions with same name.. Can this be created. If yes then how to differentiate these 2 different shadowPR. + *18. test for redundancy. FOR SPR's redundancy will be equal to the number of nodes where DR is present. Max is 3. I know this needs to be figure it out at runtime. + *19. test without providing diskStoreName..I suspect some problem with this code. diskStoreName=null looks like this is not handled very well. need to verify + *20. ParallelGatewaySenderQueue#addPR method has multiple check for inPersistenceEnabled. Can's we do it with only one check. + */ + + /** + * Test to validate that created parallel gatewaySenders id can be added to + * distributed region + * Below test is disabled intentionally + 1> In this release 8.0, for rolling upgrade support queue name is changed to old style + 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + ParallelGatewaySenderQueue#convertPathToName + 3> We have to enabled it in next release + 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 + and version prior to 8.0 + */ + @Ignore + @Test + public void test_PGS_Started_DR_CREATED_NO_RECEIVER() throws Exception { + try { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + vm4.invoke(() -> WANTestBase.createCache( lnPort )); +/* ExpectedException exp1 = addExpectedException(GatewaySenderException.class + .getName(), vm4); + ExpectedException exp2 = addExpectedException(InterruptedException.class + .getName(), vm4); + try { +*/ vm4.invoke(() -> WANTestBase.createSender( "ln1", 2, + true, 10, 100, false, false, null, false )); + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln1", isOffHeap() )); + vm4.invoke(() -> WANTestBase.doPuts( + getTestMethodName() + "_RR", 1000 )); + vm4.invoke(() -> WANTestBase.validateQueueContents( + "ln1", 1000 )); + +/* } + finally { + exp1.remove(); + exp2.remove(); + } +*/ } + catch (Exception e) { + Assert.fail("Unexpected exception", e); + } + } + + /** + * Test to validate that distributed region with given parallelGatewaySender id + * is created first and then a same parallelGatewaySender is created + * a single put in DR is enqueued in parallelQueue + * Below test is disabled intentionally + 1> In this release 8.0, for rolling upgrade support queue name is changed to old style + 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + ParallelGatewaySenderQueue#convertPathToName + 3> We have to enabled it in next release + 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 + and version prior to 8.0 + */ + @Ignore + @Test + public void test_DR_CREATED_PGS_STARTED_NO_RECEIVER() throws Exception { + try { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + vm4.invoke(() -> WANTestBase.createCache( lnPort )); + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln1", isOffHeap() )); +/* ExpectedException exp1 = addExpectedException(GatewaySenderException.class + .getName(), vm4); + ExpectedException exp2 = addExpectedException(InterruptedException.class + .getName(), vm4); + try {*/ + vm4.invoke(() -> WANTestBase.createSender( "ln1", 2, + true, 10, 100, false, false, null, false )); + vm4.invoke(() -> WANTestBase.doPuts( + getTestMethodName() + "_RR", 1000 )); + vm4.invoke(() -> WANTestBase.validateQueueContents( + "ln1", 1000 )); +/* } + finally { + exp1.remove(); + exp2.remove(); + } +*/ } + catch (Exception e) { + Assert.fail("Unexpected exception", e); + } + } + + /**Below test is disabled intentionally + 1> In this release 8.0, for rolling upgrade support queue name is changed to old style + 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + ParallelGatewaySenderQueue#convertPathToName + 3> We have to enabled it in next release + 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 + and version prior to 8.0*/ + @Ignore + @Test + public void test_DR_PGS_1Node_Put_ValidateQueue_No_Receiver() throws Exception { + try { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + vm4.invoke(() -> WANTestBase.createCache( lnPort )); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln1", isOffHeap() )); + +/* ExpectedException exp1 = addExpectedException(GatewaySenderException.class + .getName(), vm4); + ExpectedException exp2 = addExpectedException(InterruptedException.class + .getName(), vm4); + try {*/ + vm4.invoke(() -> WANTestBase.createSender( "ln1", 2, + true, 10, 100, false, false, null, true )); + vm4.invoke(() -> WANTestBase.startSender( "ln1" )); + + vm4.invoke(() -> WANTestBase.doPuts( + getTestMethodName() + "_RR", 10000 )); + + vm4.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 10000 )); + vm4.invoke(() -> WANTestBase.validateQueueContents( + "ln1", 10000 )); +/* } + finally { + exp1.remove(); + exp2.remove(); + } + */ + } + catch (Exception e) { + Assert.fail("Unexpected exception", e); + } + } + + /**Below test is disabled intentionally + 1> In this release 8.0, for rolling upgrade support queue name is changed to old style + 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + ParallelGatewaySenderQueue#convertPathToName + 3> We have to enabled it in next release + 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 + and version prior to 8.0*/ + @Ignore + @Test + public void test_DR_PGS_2Nodes_Put_ValidateQueue_No_Receiver() throws Exception { + try { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + vm4.invoke(() -> WANTestBase.createCache( lnPort )); + vm5.invoke(() -> WANTestBase.createCache( lnPort )); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln1", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln1", isOffHeap() )); + +/* ExpectedException exp1 = addExpectedException( + GatewaySenderException.class.getName()); + ExpectedException exp2 = addExpectedException( + InterruptedException.class.getName()); + ExpectedException exp3 = addExpectedException( + CacheClosedException.class.getName()); + try { +*/ vm4.invoke(() -> WANTestBase.createSender( "ln1", 2, + true, 10, 100, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln1", 2, + true, 10, 100, false, false, null, true )); + + startSenderInVMs("ln1", vm4, vm5); + + vm4.invoke(() -> WANTestBase.doPuts( + getTestMethodName() + "_RR", 1000 )); + + vm4.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + vm5.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + + vm4.invoke(() -> WANTestBase.validateQueueContents( + "ln1", 1000 )); + vm5.invoke(() -> WANTestBase.validateQueueContents( + "ln1", 1000 )); + +/* } + finally { + exp1.remove(); + exp2.remove(); + exp3.remove(); + } +*/ + } + catch (Exception e) { + Assert.fail("Unexpected exception", e); + } + } + +// public void test_DR_PGS_ORDERPOLICY_PARTITION_EXPECTException(){ +// +// } +// public void test_DR_PGS_DISKSTORE_NAME_PROVIDED_VALIDATE_DISK(){ +// +// } +// public void test_DR_PGS_DISKSTORE_NAME_NOT_PROVIDED_VALIDATE_DISK(){ +// +// } +// +// public void test_DR_PGS_START_STOP_START(){ +// +// } +// +// public void test_DR_PGS_PERSISTENCE_START_STOP_START(){ +// +// } +// +// public void test_DR_PGS_START_PAUSE_STOP(){ +// +// } +// +// public void test_DR_PGS_START_PAUSE_RESUME_VALIDATE_RECEIVER(){ +// +// } + + /**Below test is disabled intentionally + 1> In this release 8.0, for rolling upgrade support queue name is changed to old style + 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + ParallelGatewaySenderQueue#convertPathToName + 3> We have to enabled it in next release + 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 + and version prior to 8.0*/ + @Ignore + @Test + public void test_DR_PGS_1Nodes_Put_Receiver_2() throws Exception { + try { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2); + vm2.invoke(() -> WANTestBase.createReceiver()); + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + createCacheInVMs(lnPort, vm4); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln1", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.createSender( "ln1", 2, + true, 10, 100, false, false, null, true)); + + vm4.invoke(() -> WANTestBase.startSender( "ln1")); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000)); + + vm4.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR", + 1000)); + + vm4.invoke(() -> WANTestBase.validateQueueContents( "ln1", + 0 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000)); + } + catch (Exception e) { + Assert.fail("Unexpected exception", e); + } + } + + /**Below test is disabled intentionally + 1> In this release 8.0, for rolling upgrade support queue name is changed to old style + 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + ParallelGatewaySenderQueue#convertPathToName + 3> We have to enabled it in next release + 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 + and version prior to 8.0*/ + @Ignore + @Test + public void test_DR_PGS_2Nodes_Put_Receiver() throws Exception { + try { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2); + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm2.invoke(() -> WANTestBase.createReceiver()); + + createCacheInVMs(lnPort, vm4, vm5); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln1", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln1", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.createSender( "ln1", 2, + true, 10, 100, false, false, null, true)); + vm5.invoke(() -> WANTestBase.createSender( "ln1", 2, + true, 10, 100, false, false, null, true)); + + startSenderInVMs("ln1", vm4, vm5); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + + vm4.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR", + 1000 )); + vm5.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR", + 1000 )); + + vm4.invoke(() -> WANTestBase.validateQueueContents( "ln1", + 0 )); + vm5.invoke(() -> WANTestBase.validateQueueContents( "ln1", + 0 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + } + catch (Exception e) { + Assert.fail("Unexpected exception", e); + } + } + + /**Below test is disabled intentionally + 1> In this release 8.0, for rolling upgrade support queue name is changed to old style + 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + ParallelGatewaySenderQueue#convertPathToName + 3> We have to enabled it in next release + 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 + and version prior to 8.0*/ + @Ignore + @Test + public void test_DR_PGS_2Nodes_EMPTY_Put_Receiver() throws Exception { + try { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + vm2.invoke(() -> WANTestBase.createCache( nyPort )); + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm2.invoke(() -> WANTestBase.createReceiver()); + + vm4.invoke(() -> WANTestBase.createCache( lnPort )); + vm5.invoke(() -> WANTestBase.createCache( lnPort )); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln1", Scope.DISTRIBUTED_ACK, DataPolicy.EMPTY, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln1", Scope.DISTRIBUTED_ACK, DataPolicy.REPLICATE, isOffHeap() )); + + vm4.invoke(() -> WANTestBase.createSender( "ln1", 2, + true, 10, 100, false, false, null, true)); + vm5.invoke(() -> WANTestBase.createSender( "ln1", 2, + true, 10, 100, false, false, null, true)); + + startSenderInVMs("ln1", vm4, vm5); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + +// vm4.invoke(() -> WANTestBase.validateRegionSize( testName + "_RR", +// 1000 )); + vm5.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR", + 1000 )); + + vm4.invoke(() -> WANTestBase.validateQueueContents( "ln1", + 0 )); + vm5.invoke(() -> WANTestBase.validateQueueContents( "ln1", + 0 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + } + catch (Exception e) { + Assert.fail("Unexpected exception", e); + } + } + + /**Below test is disabled intentionally + 1> In this release 8.0, for rolling upgrade support queue name is changed to old style + 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + ParallelGatewaySenderQueue#convertPathToName + 3> We have to enabled it in next release + 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 + and version prior to 8.0*/ + @Ignore + @Test + public void test_DR_PR_PGS_4Nodes_Put_Receiver_2Nodes() throws Exception { + try { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + vm2.invoke(() -> WANTestBase.createReceiver()); + vm3.invoke(() -> WANTestBase.createReceiver()); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 10, 100, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 10, 100, false, false, null, true )); + vm6.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 10, 100, false, false, null, true )); + vm7.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 10, 100, false, false, null, true )); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + vm5.invoke(() -> WANTestBase.doNextPuts( getTestMethodName() + "_PR", + 1000, 2000 )); + + vm4.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_PR", + 1000 )); + vm5.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR", + 1000 )); + + vm4.invoke(() -> WANTestBase.validateQueueContents( "ln", + 0 )); + vm5.invoke(() -> WANTestBase.validateQueueContents( "ln", + 0 )); + +/* ExpectedException exp1 = addExpectedException(CacheClosedException.class + .getName()); + try {*/ + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); +/* } + finally { + exp1.remove(); + } +*/ } + catch (Exception e) { + Assert.fail("Unexpected exception", e); + } + } + + /**Below test is disabled intentionally + 1> In this release 8.0, for rolling upgrade support queue name is changed to old style + 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + ParallelGatewaySenderQueue#convertPathToName + 3> We have to enabled it in next release + 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 + and version prior to 8.0*/ + @Ignore + @Test + public void test_DR_PGS_NOMANUALSTART_4Nodes_Put_ValidateReceiver() throws Exception { + try { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2); + vm2.invoke(() -> WANTestBase.createReceiver()); + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln1", 2, + true, 10, 100, false, false, null, false )); + vm5.invoke(() -> WANTestBase.createSender( "ln1", 2, + true, 10, 100, false, false, null, false )); + vm6.invoke(() -> WANTestBase.createSender( "ln1", 2, + true, 10, 100, false, false, null, false )); + vm7.invoke(() -> WANTestBase.createSender( "ln1", 2, + true, 10, 100, false, false, null, false )); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln1", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln1", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln1", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln1", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + + vm4.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR", + 1000 )); + vm5.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR", + 1000 )); + vm6.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR", + 1000 )); + vm7.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR", + 1000 )); + + + vm4.invoke(() -> WANTestBase.validateQueueContents( "ln1", + 0 )); + vm5.invoke(() -> WANTestBase.validateQueueContents( "ln1", + 0 )); + vm6.invoke(() -> WANTestBase.validateQueueContents( "ln1", + 0 )); + vm7.invoke(() -> WANTestBase.validateQueueContents( "ln1", + 0 )); + +/* ExpectedException exp1 = addExpectedException(CacheClosedException.class + .getName()); + try {*/ + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); +/* } + finally { + exp1.remove(); + }*/ + } + catch (Exception e) { + Assert.fail("Unexpected exception", e); + } + } + + /**Below test is disabled intentionally + 1> In this release 8.0, for rolling upgrade support queue name is changed to old style + 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + ParallelGatewaySenderQueue#convertPathToName + 3> We have to enabled it in next release + 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 + and version prior to 8.0*/ + @Ignore + @Test + public void test_DR_PGS_4Nodes_Put_CLOSE4NODESCACHE_RECREATE_PUT_ValidateReceiver() + throws Exception { + try { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm6.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm7.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + // before doing any puts, let the senders be running in order to ensure + // that + // not a single event will be lost + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + +/* ExpectedException exp1 = addExpectedException(CacheClosedException.class + .getName()); + try {*/ + vm4.invoke(() -> WANTestBase.killSender()); + vm5.invoke(() -> WANTestBase.killSender()); + vm6.invoke(() -> WANTestBase.killSender()); + vm7.invoke(() -> WANTestBase.killSender()); +/* } + finally { + exp1.remove(); + }*/ + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm6.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm7.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + // ------------------------------------------------------------------------------------ + + vm4.invoke(() -> WANTestBase.doNextPuts( + getTestMethodName() + "_RR", 1000, 2000 )); + + // verify all buckets drained on all sender nodes. + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" )); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" )); + vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" )); + vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" )); + +/* exp1 = addExpectedException(CacheClosedException.class.getName()); + try {*/ + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 2000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 2000 )); +/* } + finally { + exp1.remove(); + }*/ + } + catch (Exception e) { + Assert.fail("Unexpected exception", e); + } + + } + + /**Below test is disabled intentionally + 1> In this release 8.0, for rolling upgrade support queue name is changed to old style + 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + ParallelGatewaySenderQueue#convertPathToName + 3> We have to enabled it in next release + 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 + and version prior to 8.0*/ + @Ignore + @Test + public void test_DR_NO_ACK_PGS_2Nodes_Put_ValidateQueue_Receiver() throws Exception { + try { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2); + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm2.invoke(() -> WANTestBase.createReceiver()); + + createCacheInVMs(lnPort, vm4, vm5); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", "ln1", + Scope.DISTRIBUTED_NO_ACK, DataPolicy.REPLICATE, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", "ln1", + Scope.DISTRIBUTED_NO_ACK, DataPolicy.REPLICATE, isOffHeap() )); + + vm4.invoke(() -> WANTestBase.createSender( "ln1", 2, + true, 10, 100, false, false, null, true)); + vm5.invoke(() -> WANTestBase.createSender( "ln1", 2, + true, 10, 100, false, false, null, true)); + + startSenderInVMs("ln1", vm4, vm5); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + + vm4.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR", + 1000 )); + vm5.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR", + 1000 )); + + vm4.invoke(() -> WANTestBase.validateQueueContents( "ln1", + 0 )); + vm5.invoke(() -> WANTestBase.validateQueueContents( "ln1", + 0 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + } + catch (Exception e) { + Assert.fail("Unexpected exception", e); + } + } + + /**Below test is disabled intentionally + 1> In this release 8.0, for rolling upgrade support queue name is changed to old style + 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + ParallelGatewaySenderQueue#convertPathToName + 3> We have to enabled it in next release + 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 + and version prior to 8.0*/ + @Ignore + @Test + public void test_DR_PGS_2NODES_1NODESDOWN_Validate_Receiver() throws Exception { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, true, + 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, true, + 100, 10, false, false, null, true )); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + + Thread.sleep(60000);; + +/* ExpectedException exp1 = addExpectedException(CacheClosedException.class + .getName()); + try {*/ + AsyncInvocation inv1 = vm4.invokeAsync(() -> ReplicatedRegion_ParallelWANPropagationDUnitTest.doPuts0( + getTestMethodName() + "_RR", 1000 )); + Wait.pause(1000); + AsyncInvocation inv2 = vm5.invokeAsync(() -> WANTestBase.killSender()); + try { + inv1.join(); + inv2.join(); + } + catch (Exception e) { + Assert.fail("UnExpected Exception", e); + } +/* } + finally { + exp1.remove(); + }*/ + + Integer size = (Integer)vm4.invoke(() -> WANTestBase.getQueueContentSize( "ln" )); + LogWriterUtils.getLogWriter().info("The size of the queue is in vm4 " + size); + + + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" )); + + size = (Integer)vm4.invoke(() -> WANTestBase.getQueueContentSize( "ln" )); + LogWriterUtils.getLogWriter().info("The size of the queue is in vm4 " + size); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + } + + /**Below test is disabled intentionally + 1> In this release 8.0, for rolling upgrade support queue name is changed to old style + 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + ParallelGatewaySenderQueue#convertPathToName + 3> We have to enabled it in next release + 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 + and version prior to 8.0*/ + @Ignore + @Test + public void test_DR_PGS_4NODES_2NODESDOWN_Validate_Receiver() throws Exception { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, true, + 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, true, + 100, 10, false, false, null, true )); + vm6.invoke(() -> WANTestBase.createSender( "ln", 2, true, + 100, 10, false, false, null, true )); + vm7.invoke(() -> WANTestBase.createSender( "ln", 2, true, + 100, 10, false, false, null, true )); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + + Thread.sleep(60000); +/* ExpectedException exp1 = addExpectedException(CacheClosedException.class + .getName()); + try */{ + AsyncInvocation inv1 = vm7.invokeAsync(() -> ReplicatedRegion_ParallelWANPropagationDUnitTest.doPuts0( + getTestMethodName() + "_RR", 10000 )); + Thread.sleep(1000); + AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender()); + Thread.sleep(2000); + AsyncInvocation inv3 = vm6.invokeAsync(() -> ReplicatedRegion_ParallelWANPropagationDUnitTest.doPuts1( + getTestMethodName() + "_RR", 10000 )); + Thread.sleep(1500); + AsyncInvocation inv4 = vm5.invokeAsync(() -> WANTestBase.killSender()); + try { + inv1.join(); + inv2.join(); + inv3.join(); + inv4.join(); + } + catch (Exception e) { + Assert.fail("UnExpected Exception", e); + } + }/* + finally { + exp1.remove(); + }*/ + + vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" )); + vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 10000 )); + + } + + public static void doPuts0(String regionName, int numPuts) { + IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class + .getName()); + IgnoredException exp1 = IgnoredException.addIgnoredException(CacheClosedException.class + .getName()); + try { + + Region r = cache.getRegion(Region.SEPARATOR + regionName); + assertNotNull(r); + for (long i = 0; i < numPuts; i++) { + LogWriterUtils.getLogWriter().info("Put : key : " + i); + r.put(i, "0_" + i); + } + } finally { + exp.remove(); + exp1.remove(); + } + } + + public static void doPuts1(String regionName, int numPuts){ + IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class + .getName()); + IgnoredException exp1 = IgnoredException.addIgnoredException(CacheClosedException.class + .getName()); + try { + + Region r = cache.getRegion(Region.SEPARATOR + regionName); + assertNotNull(r); + for (long i = 0; i < numPuts; i++) { + LogWriterUtils.getLogWriter().info("Put : key : " + i); + r.put(i, "1_" + i); + } + } finally { + exp.remove(); + exp1.remove(); + } + } + + public static void doPuts2(String regionName, int numPuts){ + IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class + .getName()); + IgnoredException exp1 = IgnoredException.addIgnoredException(CacheClosedException.class + .getName()); + try { + Region r = cache.getRegion(Region.SEPARATOR + regionName); + assertNotNull(r); + for (long i = 0; i < numPuts; i++) { + LogWriterUtils.getLogWriter().info("Put : key : " + i); + r.put(i, "2_" + i); + } + } finally { + exp.remove(); + exp1.remove(); + } + } + + /** + * Test to validate that put on DR with no ack on multiple nodes are propagated to parallelQueue on multiple nodes + */ + + /** + * Test to validate that the single put in DR is propagated to remote site through parallelGatewaySender + */ + + +}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/SenderWithTransportFilterDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/SenderWithTransportFilterDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/SenderWithTransportFilterDUnitTest.java new file mode 100644 index 0000000..0ee78c5 --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/SenderWithTransportFilterDUnitTest.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan.misc; + +import static com.gemstone.gemfire.distributed.ConfigurationProperties.*; +import static com.gemstone.gemfire.test.dunit.Assert.*; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Properties; +import java.util.zip.Adler32; +import java.util.zip.CheckedInputStream; +import java.util.zip.CheckedOutputStream; + +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.gemstone.gemfire.cache.CacheFactory; +import com.gemstone.gemfire.cache.DiskStore; +import com.gemstone.gemfire.cache.DiskStoreFactory; +import com.gemstone.gemfire.cache.wan.GatewayReceiver; +import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory; +import com.gemstone.gemfire.cache.wan.GatewaySenderFactory; +import com.gemstone.gemfire.cache.wan.GatewayTransportFilter; +import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; +import com.gemstone.gemfire.internal.AvailablePortHelper; +import com.gemstone.gemfire.internal.cache.wan.InternalGatewaySenderFactory; +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; +import com.gemstone.gemfire.test.dunit.VM; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; + +@Category(DistributedTest.class) +public class SenderWithTransportFilterDUnitTest extends WANTestBase { + + @Test + public void testSerialSenderWithTransportFilter() { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + vm2.invoke(() -> SenderWithTransportFilterDUnitTest.createReceiverWithTransportFilters( nyPort )); + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + vm3.invoke(() -> WANTestBase.createCache( lnPort )); + + vm3.invoke(() -> SenderWithTransportFilterDUnitTest.createSenderWithTransportFilter( "ln", 2, false, 100, + 1, false, false, true )); + + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + vm3.invoke(() -> WANTestBase.startSender( "ln" )); + + vm3.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", 100 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 100 )); + } + + @Test + public void testParallelSenderWithTransportFilter() { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + vm2.invoke(() -> SenderWithTransportFilterDUnitTest.createReceiverWithTransportFilters( nyPort )); + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 0, 10, isOffHeap() )); + + vm3.invoke(() -> WANTestBase.createCache( lnPort )); + + vm3.invoke(() -> SenderWithTransportFilterDUnitTest.createSenderWithTransportFilter( "ln", 2, true, 100, + 1, false, false, true )); + + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 0, 10, isOffHeap() )); + + vm3.invoke(() -> WANTestBase.startSender( "ln" )); + + vm3.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 100 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 100 )); + } + + public static int createReceiverWithTransportFilters(int locPort) { + WANTestBase test = new WANTestBase(); + Properties props = test.getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(LOCATORS, "localhost[" + locPort + + "]"); + + InternalDistributedSystem ds = test.getSystem(props); + cache = CacheFactory.create(ds); + GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); + int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); + fact.setStartPort(port); + fact.setEndPort(port); + ArrayList<GatewayTransportFilter> transportFilters = new ArrayList<GatewayTransportFilter>(); + transportFilters.add(new CheckSumTransportFilter("CheckSumTransportFilter")); + if (!transportFilters.isEmpty()) { + for (GatewayTransportFilter filter : transportFilters) { + fact.addGatewayTransportFilter(filter); + } + } + GatewayReceiver receiver = fact.create(); + try { + receiver.start(); + } + catch (IOException e) { + fail("Test " + test.getName() + " failed to start GatewayReceiver on port " + port, e); + } + return port; + } + + public static void createSenderWithTransportFilter(String dsName, + int remoteDsId, boolean isParallel, Integer maxMemory, Integer batchSize, + boolean isConflation, boolean isPersistent, boolean isManualStart) { + File persistentDirectory = new File(dsName + "_disk_" + + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); + persistentDirectory.mkdir(); + DiskStoreFactory dsf = cache.createDiskStoreFactory(); + File[] dirs1 = new File[] { persistentDirectory }; + + if (isParallel) { + GatewaySenderFactory gateway = cache + .createGatewaySenderFactory(); + gateway.setParallel(true); + gateway.setMaximumQueueMemory(maxMemory); + gateway.setBatchSize(batchSize); + ((InternalGatewaySenderFactory)gateway).setLocatorDiscoveryCallback(new MyLocatorCallback()); + ArrayList<GatewayTransportFilter> transportFilters = new ArrayList<GatewayTransportFilter>(); + transportFilters.add(new CheckSumTransportFilter("CheckSumTransportFilter")); + if (!transportFilters.isEmpty()) { + for (GatewayTransportFilter filter : transportFilters) { + gateway.addGatewayTransportFilter(filter); + } + } + if (isPersistent) { + gateway.setPersistenceEnabled(true); + gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName) + .getName()); + } + else { + DiskStore store = dsf.setDiskDirs(dirs1).create(dsName); + gateway.setDiskStoreName(store.getName()); + } + gateway.setBatchConflationEnabled(isConflation); + gateway.create(dsName, remoteDsId); + + } + else { + GatewaySenderFactory gateway = cache + .createGatewaySenderFactory(); + gateway.setMaximumQueueMemory(maxMemory); + gateway.setBatchSize(batchSize); + gateway.setManualStart(isManualStart); + ((InternalGatewaySenderFactory)gateway) + .setLocatorDiscoveryCallback(new MyLocatorCallback()); + ArrayList<GatewayTransportFilter> transportFilters = new ArrayList<GatewayTransportFilter>(); + transportFilters.add(new CheckSumTransportFilter("CheckSumTransportFilter")); + if (!transportFilters.isEmpty()) { + for (GatewayTransportFilter filter : transportFilters) { + gateway.addGatewayTransportFilter(filter); + } + } + gateway.setBatchConflationEnabled(isConflation); + if (isPersistent) { + gateway.setPersistenceEnabled(true); + gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName) + .getName()); + } + else { + DiskStore store = dsf.setDiskDirs(dirs1).create(dsName); + gateway.setDiskStoreName(store.getName()); + } + gateway.create(dsName, remoteDsId); + } + } + + static class CheckSumTransportFilter implements GatewayTransportFilter { + + Adler32 checker = new Adler32(); + + private String name; + + public CheckSumTransportFilter(String name){ + this.name = name; + } + + @Override + public String toString(){ + return this.name; + } + + @Override + public InputStream getInputStream(InputStream stream) { + return new CheckedInputStream(stream, checker); + } + + @Override + public OutputStream getOutputStream(OutputStream stream) { + return new CheckedOutputStream(stream, checker); + } + + @Override + public void close() { + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/ShutdownAllPersistentGatewaySenderDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/ShutdownAllPersistentGatewaySenderDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/ShutdownAllPersistentGatewaySenderDUnitTest.java new file mode 100644 index 0000000..6e2581e --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/ShutdownAllPersistentGatewaySenderDUnitTest.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan.misc; + +import static org.junit.Assert.*; + +import java.util.Set; + +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.gemstone.gemfire.admin.AdminDistributedSystemFactory; +import com.gemstone.gemfire.admin.AdminException; +import com.gemstone.gemfire.admin.DistributedSystemConfig; +import com.gemstone.gemfire.admin.internal.AdminDistributedSystemImpl; +import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.internal.cache.CacheObserverAdapter; +import com.gemstone.gemfire.internal.cache.CacheObserverHolder; +import com.gemstone.gemfire.internal.cache.LocalRegion; +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; +import com.gemstone.gemfire.test.dunit.AsyncInvocation; +import com.gemstone.gemfire.test.dunit.IgnoredException; +import com.gemstone.gemfire.test.dunit.LogWriterUtils; +import com.gemstone.gemfire.test.dunit.SerializableRunnable; +import com.gemstone.gemfire.test.dunit.VM; +import com.gemstone.gemfire.test.dunit.Wait; +import com.gemstone.gemfire.test.dunit.WaitCriterion; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; + +@Category(DistributedTest.class) +public class ShutdownAllPersistentGatewaySenderDUnitTest extends WANTestBase { + + private static final long MAX_WAIT = 70000; + + private static final int NUM_KEYS = 1000; + + public ShutdownAllPersistentGatewaySenderDUnitTest() { + super(); + } + + @Override + protected final void postSetUpWANTestBase() throws Exception { + IgnoredException.addIgnoredException("Cache is being closed by ShutdownAll"); + } + + private static final long serialVersionUID = 1L; + + @Test + public void testGatewaySender() throws Exception { + IgnoredException.addIgnoredException("Cache is shutting down"); + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + vm2.invoke(() -> WANTestBase.createCache( nyPort )); + vm3.invoke(() -> WANTestBase.createCache( nyPort )); + vm2.invoke(() -> WANTestBase.createReceiver()); + + vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + + vm4.invoke(() -> WANTestBase.createCache( lnPort )); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 400, false, false, null, true )); + + vm4.invoke(() -> WANTestBase.startSender( "ln" )); + + vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + + // set the CacheObserver to block the ShutdownAll + SerializableRunnable waitAtShutdownAll = new SerializableRunnable() { + @Override + public void run() { + LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true; + CacheObserverHolder.setInstance(new CacheObserverAdapter() { + @Override + public void beforeShutdownAll() { + final Region region = cache.getRegion(getTestMethodName() + "_PR"); + Wait.waitForCriterion(new WaitCriterion() { + @Override + public boolean done() { + return region.size() >= 2; + } + + @Override + public String description() { + return "Wait for wan to have processed several events"; + } + }, 30000, 100, true); + } + }); + } + }; + vm2.invoke(waitAtShutdownAll); + vm3.invoke(waitAtShutdownAll); + + AsyncInvocation vm4_future = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", NUM_KEYS )); + + // ShutdownAll will be suspended at observer, so puts will continue + AsyncInvocation future = shutDownAllMembers(vm2, 2, MAX_WAIT); + future.join(MAX_WAIT); + + // now restart vm1 with gatewayHub + LogWriterUtils.getLogWriter().info("restart in VM2"); + vm2.invoke(() -> WANTestBase.createCache( nyPort )); + vm3.invoke(() -> WANTestBase.createCache( nyPort )); + AsyncInvocation vm3_future = vm3.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", + "ln", 1, 100, isOffHeap() )); + vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm3_future.join(MAX_WAIT); + + vm3.invoke(new SerializableRunnable() { + public void run() { + final Region region = cache.getRegion(getTestMethodName() + "_PR"); + cache.getLogger().info( + "vm1's region size before restart gatewayHub is " + region.size()); + } + }); + vm2.invoke(() -> WANTestBase.createReceiver()); + + // wait for vm0 to finish its work + vm4_future.join(MAX_WAIT); + vm4.invoke(new SerializableRunnable() { + public void run() { + Region region = cache.getRegion(getTestMethodName() + "_PR"); + assertEquals(NUM_KEYS, region.size()); + } + }); + + // verify the other side (vm1)'s entries received from gateway + vm2.invoke(new SerializableRunnable() { + public void run() { + final Region region = cache.getRegion(getTestMethodName() + "_PR"); + + cache.getLogger().info( + "vm1's region size after restart gatewayHub is " + region.size()); + Wait.waitForCriterion(new WaitCriterion() { + public boolean done() { + Object lastValue = region.get(NUM_KEYS - 1); + if (lastValue != null && lastValue.equals(NUM_KEYS - 1)) { + region.getCache().getLogger().info( + "Last key has arrived, its value is " + lastValue + + ", end of wait."); + return true; + } + else + return (region.size() == NUM_KEYS); + } + + public String description() { + return "Waiting for destination region to reach size: " + NUM_KEYS + + ", current is " + region.size(); + } + }, MAX_WAIT, 100, true); + assertEquals(NUM_KEYS, region.size()); + } + }); + + } + + private AsyncInvocation shutDownAllMembers(VM vm, final int expectedNumber, final long timeout) { + AsyncInvocation future = vm.invokeAsync(new SerializableRunnable("Shutdown all the members") { + + public void run() { + DistributedSystemConfig config; + AdminDistributedSystemImpl adminDS = null; + try { + config = AdminDistributedSystemFactory.defineDistributedSystem(cache + .getDistributedSystem(), ""); + adminDS = (AdminDistributedSystemImpl)AdminDistributedSystemFactory + .getDistributedSystem(config); + adminDS.connect(); + Set members = adminDS.shutDownAllMembers(timeout); + int num = members == null ? 0 : members.size(); + assertEquals(expectedNumber, num); + } + catch (AdminException e) { + throw new RuntimeException(e); + } + finally { + if (adminDS != null) { + adminDS.disconnect(); + } + } + } + }); + return future; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/WANConfigurationJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/WANConfigurationJUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/WANConfigurationJUnitTest.java new file mode 100644 index 0000000..b504f87 --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/WANConfigurationJUnitTest.java @@ -0,0 +1,601 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan.misc; + +import com.gemstone.gemfire.cache.*; +import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener; +import com.gemstone.gemfire.cache.wan.*; +import com.gemstone.gemfire.cache30.MyGatewayEventFilter1; +import com.gemstone.gemfire.cache30.MyGatewayTransportFilter1; +import com.gemstone.gemfire.cache30.MyGatewayTransportFilter2; +import com.gemstone.gemfire.internal.AvailablePortHelper; +import com.gemstone.gemfire.internal.cache.LocalRegion; +import com.gemstone.gemfire.internal.cache.wan.GatewayReceiverException; +import com.gemstone.gemfire.internal.cache.wan.GatewaySenderException; +import com.gemstone.gemfire.internal.cache.wan.InternalGatewaySenderFactory; +import com.gemstone.gemfire.internal.cache.wan.MyGatewaySenderEventListener; +import com.gemstone.gemfire.internal.i18n.LocalizedStrings; +import com.gemstone.gemfire.test.junit.categories.IntegrationTest; +import org.junit.After; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +import static com.gemstone.gemfire.distributed.ConfigurationProperties.MCAST_PORT; +import static org.junit.Assert.*; + +@Category(IntegrationTest.class) +public class WANConfigurationJUnitTest { + + private Cache cache; + + /** + * Test to validate that the sender can not be started without configuring + * locator + * @throws IOException + * + * @throws IOException + */ + @Test + public void test_GatewaySender_without_Locator() throws IOException { + try { + cache = new CacheFactory().set(MCAST_PORT, "0").create(); + GatewaySenderFactory fact = cache.createGatewaySenderFactory(); + fact.setParallel(true); + GatewaySender sender1 = fact.create("NYSender", 2); + sender1.start(); + fail("Expected IllegalStateException but not thrown"); + } + catch (Exception e) { + if ((e instanceof IllegalStateException && e + .getMessage() + .startsWith( + LocalizedStrings.AbstractGatewaySender_LOCATOR_SHOULD_BE_CONFIGURED_BEFORE_STARTING_GATEWAY_SENDER + .toLocalizedString()))) { + } + else { + fail("Expected IllegalStateException but received :" + e); + } + } + } + + /** + * Test to validate that sender with same Id can not be added to cache. + */ + @Test + public void test_SameGatewaySenderCreatedTwice() { + cache = new CacheFactory().set(MCAST_PORT, "0").create(); + try { + GatewaySenderFactory fact = cache.createGatewaySenderFactory(); + fact.setParallel(true); + fact.setManualStart(true); + fact.create("NYSender", 2); + fact.create("NYSender", 2); + fail("Expected IllegalStateException but not thrown"); + } + catch (Exception e) { + if (e instanceof IllegalStateException + && e.getMessage().contains("A GatewaySender with id")) { + + } + else { + fail("Expected IllegalStateException but received :" + e); + } + } + } + + /** + * Test to validate that same gatewaySender Id can not be added to the region attributes. + */ + @Test + public void test_SameGatewaySenderIdAddedTwice() { + try { + cache = new CacheFactory().set(MCAST_PORT, "0").create(); + GatewaySenderFactory fact = cache.createGatewaySenderFactory(); + fact.setParallel(true); + fact.setManualStart(true); + GatewaySender sender1 = fact.create("NYSender", 2); + AttributesFactory factory = new AttributesFactory(); + factory.addGatewaySenderId(sender1.getId()); + factory.addGatewaySenderId(sender1.getId()); + fail("Expected IllegalArgumentException but not thrown"); + } + catch (Exception e) { + if (e instanceof IllegalArgumentException + && e.getMessage().contains("is already added")) { + + } + else { + fail("Expected IllegalStateException but received :" + e); + } + } + } + + @Test + public void test_GatewaySenderIdAndAsyncEventId() { + cache = new CacheFactory().set(MCAST_PORT, "0").create(); + AttributesFactory factory = new AttributesFactory(); + factory.addGatewaySenderId("ln"); + factory.addGatewaySenderId("ny"); + factory.addAsyncEventQueueId("Async_LN"); + RegionAttributes attrs = factory.create(); + + Set<String> senderIds = new HashSet<String>(); + senderIds.add("ln"); + senderIds.add("ny"); + Set<String> attrsSenderIds = attrs.getGatewaySenderIds(); + assertEquals(senderIds, attrsSenderIds); + Region r = cache.createRegion("Customer", attrs); + assertEquals(senderIds, ((LocalRegion)r).getGatewaySenderIds()); + } + + /** + * Test to validate that distributed region can not have the gateway sender + * with parallel distribution policy + * + */ + @Ignore("Bug51491") + @Test + public void test_GatewaySender_Parallel_DistributedRegion() { + cache = new CacheFactory().set(MCAST_PORT, "0").create(); + GatewaySenderFactory fact = cache.createGatewaySenderFactory(); + fact.setParallel(true); + fact.setManualStart(true); + GatewaySender sender1 = fact.create("NYSender", 2); + AttributesFactory factory = new AttributesFactory(); + factory.addGatewaySenderId(sender1.getId()); + factory.setScope(Scope.DISTRIBUTED_ACK); + factory.setDataPolicy(DataPolicy.REPLICATE); + try { + RegionFactory regionFactory = cache.createRegionFactory(factory.create()); + Region region = regionFactory + .create("test_GatewaySender_Parallel_DistributedRegion"); + } + catch (Exception e) { + fail("Unexpected Exception :" + e); + } + } + + @Test + public void test_GatewaySender_Parallel_MultipleDispatcherThread() { + cache = new CacheFactory().set(MCAST_PORT, "0").create(); + GatewaySenderFactory fact = cache.createGatewaySenderFactory(); + fact.setParallel(true); + fact.setManualStart(true); + fact.setDispatcherThreads(4); + try { + GatewaySender sender1 = fact.create("NYSender", 2); + } + catch (GatewaySenderException e) { + fail("UnExpected Exception " + e); + } + } + + @Test + public void test_GatewaySender_Serial_ZERO_DispatcherThread() { + cache = new CacheFactory().set(MCAST_PORT, "0").create(); + GatewaySenderFactory fact = cache.createGatewaySenderFactory(); + fact.setManualStart(true); + fact.setDispatcherThreads(0); + try { + GatewaySender sender1 = fact.create("NYSender", 2); + fail("Expected GatewaySenderException but not thrown"); + } + catch (GatewaySenderException e) { + if (e.getMessage().contains("can not be created with dispatcher threads less than 1")) { + } + else { + fail("Expected IllegalStateException but received :" + e); + } + } + } + + /** + * Test to validate the gateway receiver attributes are correctly set + */ + @Test + public void test_ValidateGatewayReceiverAttributes() { + cache = new CacheFactory().set(MCAST_PORT, "0").create(); + int[] randomAvailableTCPPorts = AvailablePortHelper.getRandomAvailableTCPPorts(2); + int port1 = randomAvailableTCPPorts[0]; + int port2 = randomAvailableTCPPorts[1]; + + GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); + if(port1 < port2){ + fact.setStartPort(port1); + fact.setEndPort(port2); + }else{ + fact.setStartPort(port2); + fact.setEndPort(port1); + } + + fact.setMaximumTimeBetweenPings(2000); + fact.setSocketBufferSize(200); + GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1(); + GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2(); + fact.addGatewayTransportFilter(myStreamFilter2); + fact.addGatewayTransportFilter(myStreamFilter1); + GatewayReceiver receiver1 = fact.create(); + + + Region region = cache.createRegionFactory().create( + "test_ValidateGatewayReceiverAttributes"); + Set<GatewayReceiver> receivers = cache.getGatewayReceivers(); + GatewayReceiver rec = receivers.iterator().next(); + assertEquals(receiver1.getHost(), rec.getHost()); + assertEquals(receiver1.getStartPort(), rec.getStartPort()); + assertEquals(receiver1.getEndPort(), rec.getEndPort()); + assertEquals(receiver1.getBindAddress(), rec.getBindAddress()); + assertEquals(receiver1.getMaximumTimeBetweenPings(), rec + .getMaximumTimeBetweenPings()); + assertEquals(receiver1.getSocketBufferSize(), rec + .getSocketBufferSize()); + assertEquals(receiver1.getGatewayTransportFilters().size(), rec + .getGatewayTransportFilters().size()); + + } + + @Test + public void test_ValidateGatewayReceiverStatus() { + cache = new CacheFactory().set(MCAST_PORT, "0").create(); + int[] randomAvailableTCPPorts = AvailablePortHelper.getRandomAvailableTCPPorts(2); + int port1 = randomAvailableTCPPorts[0]; + int port2 = randomAvailableTCPPorts[1]; + + GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); + if(port1 < port2){ + fact.setStartPort(port1); + fact.setEndPort(port2); + }else{ + fact.setStartPort(port2); + fact.setEndPort(port1); + } + + fact.setMaximumTimeBetweenPings(2000); + fact.setSocketBufferSize(200); + GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1(); + GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2(); + fact.addGatewayTransportFilter(myStreamFilter2); + fact.addGatewayTransportFilter(myStreamFilter1); + GatewayReceiver receiver1 = fact.create(); + assertTrue(receiver1.isRunning()); + } + + /** + * Test to validate that serial gateway sender attributes are correctly set + */ + @Test + public void test_ValidateSerialGatewaySenderAttributes() { + cache = new CacheFactory().set(MCAST_PORT, "0").create(); + GatewaySenderFactory fact = cache.createGatewaySenderFactory(); + fact.setManualStart(true); + fact.setBatchConflationEnabled(true); + fact.setBatchSize(200); + fact.setBatchTimeInterval(300); + fact.setPersistenceEnabled(false); + fact.setDiskStoreName("FORNY"); + fact.setMaximumQueueMemory(200); + fact.setAlertThreshold(1200); + GatewayEventFilter myEventFilter1 = new MyGatewayEventFilter1(); + fact.addGatewayEventFilter(myEventFilter1); + GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1(); + fact.addGatewayTransportFilter(myStreamFilter1); + GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2(); + fact.addGatewayTransportFilter(myStreamFilter2); + GatewaySender sender1 = fact.create("TKSender", 2); + + + AttributesFactory factory = new AttributesFactory(); + factory.addGatewaySenderId(sender1.getId()); + factory.setDataPolicy(DataPolicy.PARTITION); + Region region = cache.createRegionFactory(factory.create()).create( + "test_ValidateGatewaySenderAttributes"); + Set<GatewaySender> senders = cache.getGatewaySenders(); + assertEquals(senders.size(), 1); + GatewaySender gatewaySender = senders.iterator().next(); + assertEquals(sender1.getRemoteDSId(), gatewaySender + .getRemoteDSId()); + assertEquals(sender1.isManualStart(), gatewaySender.isManualStart()); + assertEquals(sender1.isBatchConflationEnabled(), gatewaySender + .isBatchConflationEnabled()); + assertEquals(sender1.getBatchSize(), gatewaySender.getBatchSize()); + assertEquals(sender1.getBatchTimeInterval(), gatewaySender + .getBatchTimeInterval()); + assertEquals(sender1.isPersistenceEnabled(), gatewaySender + .isPersistenceEnabled()); + assertEquals(sender1.getDiskStoreName(), gatewaySender.getDiskStoreName()); + assertEquals(sender1.getMaximumQueueMemory(), gatewaySender + .getMaximumQueueMemory()); + assertEquals(sender1.getAlertThreshold(), gatewaySender + .getAlertThreshold()); + assertEquals(sender1.getGatewayEventFilters().size(), gatewaySender + .getGatewayEventFilters().size()); + assertEquals(sender1.getGatewayTransportFilters().size(), gatewaySender + .getGatewayTransportFilters().size()); + + } + + /** + * Test to validate that parallel gateway sender attributes are correctly set + */ + @Test + public void test_ValidateParallelGatewaySenderAttributes() { + cache = new CacheFactory().set(MCAST_PORT, "0").create(); + GatewaySenderFactory fact = cache.createGatewaySenderFactory(); + fact.setParallel(true); + fact.setManualStart(true); + fact.setBatchConflationEnabled(true); + fact.setBatchSize(200); + fact.setBatchTimeInterval(300); + fact.setPersistenceEnabled(false); + fact.setDiskStoreName("FORNY"); + fact.setMaximumQueueMemory(200); + fact.setAlertThreshold(1200); + GatewayEventFilter myEventFilter1 = new MyGatewayEventFilter1(); + fact.addGatewayEventFilter(myEventFilter1); + GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1(); + fact.addGatewayTransportFilter(myStreamFilter1); + GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2(); + fact.addGatewayTransportFilter(myStreamFilter2); + GatewaySender sender1 = fact.create("TKSender", 2); + + + AttributesFactory factory = new AttributesFactory(); + factory.addGatewaySenderId(sender1.getId()); + factory.setDataPolicy(DataPolicy.PARTITION); + Region region = cache.createRegionFactory(factory.create()).create( + "test_ValidateGatewaySenderAttributes"); + Set<GatewaySender> senders = cache.getGatewaySenders(); + assertEquals(1, senders.size()); + GatewaySender gatewaySender = senders.iterator().next(); + assertEquals(sender1.getRemoteDSId(), gatewaySender + .getRemoteDSId()); + assertEquals(sender1.isManualStart(), gatewaySender.isManualStart()); + assertEquals(sender1.isBatchConflationEnabled(), gatewaySender + .isBatchConflationEnabled()); + assertEquals(sender1.getBatchSize(), gatewaySender.getBatchSize()); + assertEquals(sender1.getBatchTimeInterval(), gatewaySender + .getBatchTimeInterval()); + assertEquals(sender1.isPersistenceEnabled(), gatewaySender + .isPersistenceEnabled()); + assertEquals(sender1.getDiskStoreName(), gatewaySender.getDiskStoreName()); + assertEquals(sender1.getMaximumQueueMemory(), gatewaySender + .getMaximumQueueMemory()); + assertEquals(sender1.getAlertThreshold(), gatewaySender + .getAlertThreshold()); + assertEquals(sender1.getGatewayEventFilters().size(), gatewaySender + .getGatewayEventFilters().size()); + assertEquals(sender1.getGatewayTransportFilters().size(), gatewaySender + .getGatewayTransportFilters().size()); + + } + + @Test + public void test_GatewaySenderWithGatewaySenderEventListener1() { + cache = new CacheFactory().set(MCAST_PORT, "0").create(); + InternalGatewaySenderFactory fact = (InternalGatewaySenderFactory)cache.createGatewaySenderFactory(); + AsyncEventListener listener = new MyGatewaySenderEventListener(); + ((InternalGatewaySenderFactory)fact).addAsyncEventListener(listener); + try { + fact.create("ln", 2); + fail("Expected GatewaySenderException. When a sender is added , remoteDSId should not be provided."); + } catch (Exception e) { + if (e instanceof GatewaySenderException + && e.getMessage() + .contains( + "cannot define a remote site because at least AsyncEventListener is already added.")) { + + } else { + fail("Expected GatewaySenderException but received :" + e); + } + } + } + + @Test + public void test_GatewaySenderWithGatewaySenderEventListener2() { + cache = new CacheFactory().set(MCAST_PORT, "0").create(); + GatewaySenderFactory fact = cache.createGatewaySenderFactory(); + AsyncEventListener listener = new MyGatewaySenderEventListener(); + ((InternalGatewaySenderFactory)fact).addAsyncEventListener(listener); + try { + ((InternalGatewaySenderFactory)fact).create("ln"); + } catch (Exception e) { + fail("Received Exception :" + e); + } + } + + @Test + public void test_ValidateGatewayReceiverAttributes_2() { + cache = new CacheFactory().set(MCAST_PORT, "0").create(); + GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); + fact.setStartPort(50504); + fact.setMaximumTimeBetweenPings(1000); + fact.setSocketBufferSize(4000); + fact.setEndPort(70707); + fact.setManualStart(true); + GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1(); + fact.addGatewayTransportFilter(myStreamFilter1); + + GatewayReceiver receiver = fact.create(); + try { + receiver.start(); + } + catch (IOException e) { + fail("The test failed with IOException"); + } + + assertEquals(50504, receiver.getStartPort()); + assertEquals(1000, receiver.getMaximumTimeBetweenPings()); + assertEquals(4000,receiver.getSocketBufferSize()); + assertEquals(70707, receiver.getEndPort()); + } + + /** + * This test takes a minimum of 120s to execute. It is known to hang on Mac OS + * X Yosemite do to changes in the the message string checked in + * GatewayReceiverImpl around line 167. Expects + * "Cannot assign requested address" but gets + * "Can't assign requested address". Timeout after 150s to safeguard against + * hanging on other platforms that may differ. + */ + @Test(timeout = 150000) + public void test_ValidateGatewayReceiverAttributes_WrongBindAddress() { + if (System.getProperty("os.name").equals("Mac OS X")) { + fail("Failing to avoid known hang on Mac OS X."); + } + cache = new CacheFactory().set(MCAST_PORT, "0").create(); + GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); + fact.setStartPort(50504); + fact.setMaximumTimeBetweenPings(1000); + fact.setSocketBufferSize(4000); + fact.setEndPort(70707); + fact.setManualStart(true); + fact.setBindAddress("200.112.204.10"); + GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1(); + fact.addGatewayTransportFilter(myStreamFilter1); + + GatewayReceiver receiver = fact.create(); + try { + receiver.start(); + fail("Expected GatewayReceiverException"); + } + catch (GatewayReceiverException gRE){ + assertTrue(gRE.getMessage().contains("Failed to create server socket on")); + } + catch (IOException e) { + e.printStackTrace(); + fail("The test failed with IOException"); + } + } + + @Test + public void test_ValidateGatewayReceiverDefaultStartPortAndDefaultEndPort() { + cache = new CacheFactory().set(MCAST_PORT, "0").create(); + GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); + fact.setMaximumTimeBetweenPings(1000); + fact.setSocketBufferSize(4000); + fact.setManualStart(true); + GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1(); + fact.addGatewayTransportFilter(myStreamFilter1); + + GatewayReceiver receiver = fact.create(); + try { + receiver.start(); + } + catch (IOException e) { + fail("The test failed with IOException"); + } + int port = receiver.getPort(); + if((port < 5000) || (port > 5500)) { + fail("GatewayReceiver started on out of range port"); + } + } + + @Test + public void test_ValidateGatewayReceiverDefaultStartPortAndEndPortProvided() { + cache = new CacheFactory().set(MCAST_PORT, "0").create(); + GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); + fact.setMaximumTimeBetweenPings(1000); + fact.setSocketBufferSize(4000); + fact.setEndPort(50707); + fact.setManualStart(true); + GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1(); + fact.addGatewayTransportFilter(myStreamFilter1); + + GatewayReceiver receiver = fact.create(); + try { + receiver.start(); + } + catch (IOException e) { + fail("The test failed with IOException"); + } + int port = receiver.getPort(); + if((port < GatewayReceiver.DEFAULT_START_PORT) || (port > 50707)) { + fail("GatewayReceiver started on out of range port"); + } + } + + @Test + public void test_ValidateGatewayReceiverWithManualStartFALSE() { + cache = new CacheFactory().set(MCAST_PORT, "0").create(); + GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); + fact.setMaximumTimeBetweenPings(1000); + fact.setSocketBufferSize(4000); + fact.setStartPort(5303); + fact.setManualStart(false); + GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1(); + fact.addGatewayTransportFilter(myStreamFilter1); + GatewayReceiver receiver = fact.create(); + int port = receiver.getPort(); + if ((port < 5303) || (port > GatewayReceiver.DEFAULT_END_PORT)) { + fail("GatewayReceiver started on out of range port"); + } + } + + @Test + public void test_ValidateGatewayReceiverWithStartPortAndDefaultEndPort() { + cache = new CacheFactory().set(MCAST_PORT, "0").create(); + GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); + fact.setMaximumTimeBetweenPings(1000); + fact.setSocketBufferSize(4000); + fact.setStartPort(5303); + fact.setManualStart(true); + GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1(); + fact.addGatewayTransportFilter(myStreamFilter1); + + GatewayReceiver receiver = fact.create(); + try { + receiver.start(); + } + catch (IOException e) { + fail("The test failed with IOException"); + } + int port = receiver.getPort(); + if ((port < 5303) || (port > GatewayReceiver.DEFAULT_END_PORT)) { + fail("GatewayReceiver started on out of range port"); + } + } + + @Test + public void test_ValidateGatewayReceiverWithWrongEndPortProvided() { + cache = new CacheFactory().set(MCAST_PORT, "0").create(); + try { + GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); + fact.setMaximumTimeBetweenPings(1000); + fact.setSocketBufferSize(4000); + fact.setEndPort(4999); + GatewayReceiver receiver = fact.create(); + fail("wrong end port set in the GatewayReceiver"); + } catch (IllegalStateException expected) { + if(!expected.getMessage().contains("Please specify either start port a value which is less than end port.")){ + fail("Caught IllegalStateException"); + expected.printStackTrace(); + } + } + } + + @After + public void tearDown() throws Exception { + if (this.cache != null) { + this.cache.close(); + } + } +}
