http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPropagationDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPropagationDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPropagationDUnitTest.java deleted file mode 100644 index 3ae9a01..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPropagationDUnitTest.java +++ /dev/null @@ -1,1063 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan.misc; - -import org.junit.Ignore; -import org.junit.experimental.categories.Category; -import org.junit.Test; - -import static org.junit.Assert.*; - -import com.gemstone.gemfire.test.junit.categories.DistributedTest; - -import com.gemstone.gemfire.cache.CacheClosedException; -import com.gemstone.gemfire.cache.DataPolicy; -import com.gemstone.gemfire.cache.Region; -import com.gemstone.gemfire.cache.Scope; -import com.gemstone.gemfire.internal.cache.ForceReattemptException; -import com.gemstone.gemfire.internal.cache.wan.WANTestBase; -import com.gemstone.gemfire.test.dunit.Assert; -import com.gemstone.gemfire.test.dunit.AsyncInvocation; -import com.gemstone.gemfire.test.dunit.IgnoredException; -import com.gemstone.gemfire.test.dunit.LogWriterUtils; -import com.gemstone.gemfire.test.dunit.Wait; - -@Category(DistributedTest.class) -public class ReplicatedRegion_ParallelWANPropagationDUnitTest extends WANTestBase{ - - public ReplicatedRegion_ParallelWANPropagationDUnitTest() { - super(); - // TODO Auto-generated constructor stub - } - - final String expectedExceptions = null; - - - /** - * - */ - private static final long serialVersionUID = 1L; - - @Test - public void test_DR_PGS_1Nodes_Put_Receiver() throws Exception { - try { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2); - vm2.invoke(() -> WANTestBase.createReceiver()); - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - createCacheInVMs(lnPort, vm4); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln1", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.createSender( "ln1", 2, - true, 10, 100, false, false, null, true )); - - vm4.invoke(() -> WANTestBase.startSender( "ln1" )); - fail("Expected GatewaySenderConfigException where parallel gateway sender can not be used with replicated region"); - } - catch (Exception e) { - if (!e.getCause().getMessage() - .contains("can not be used with replicated region")) { - fail("Expected GatewaySenderConfigException where parallel gateway sender can not be used with replicated region"); - } - } - } - - /*1. Validate that parallelGatewaySenderId can be added to distributed region - *Region distributed ack/noack + PGS - *1. Find out the restrictions on totalNumBuckets on shadowPR - *2. Find out the restrictions on redundancy on shadowPR - *3. Find out the restrictions on localMaxMemory on shadowPR - *4. Find out the best way user will specify PR attributes to PGS - *5. Find out the restrictions on ordering. - *6. put on region populates the queue - *7. put on region reaches to remote site. Dispatcher works as expected - *8. m1 and m2 has DR(ack/noack). put on DR from m1 populates queue on both m1 and m2. Validate that remote site got all the events - *9. m1 and m2 has DR(ack/noack). create/put/destroy/operations populates the queue. Validate that remote site got correct events - *10. m1 and m2 has DR(ack/noack). localDestroy is called on m1's DR. This locally destroys M1's shadowPr - *11. m1 and m2 has DR(ack/noack). destroy is called on m1's DR. This destroys entire shadowPr on m1 and m2 - *12. m1 and m2 has DR(ack/noack). close Region is called on m1's DR. This locally destroys shadowPr on m1 - *13. m1 and m2 has DR(ack/noack). cache.close on m1'. This locally destroys shadowPr on m1 - *14. Validate HA scenario does not cause any event loss - *15. PDX events of DR are propagated to remote sites - *16. validate stats - *17: PR and DR regions with same name.. Can this be created. If yes then how to differentiate these 2 different shadowPR. - *18. test for redundancy. FOR SPR's redundancy will be equal to the number of nodes where DR is present. Max is 3. I know this needs to be figure it out at runtime. - *19. test without providing diskStoreName..I suspect some problem with this code. diskStoreName=null looks like this is not handled very well. need to verify - *20. ParallelGatewaySenderQueue#addPR method has multiple check for inPersistenceEnabled. Can's we do it with only one check. - */ - - /** - * Test to validate that created parallel gatewaySenders id can be added to - * distributed region - * Below test is disabled intentionally - 1> In this release 8.0, for rolling upgrade support queue name is changed to old style - 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about - ParallelGatewaySenderQueue#convertPathToName - 3> We have to enabled it in next release - 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 - and version prior to 8.0 - */ - @Ignore - @Test - public void test_PGS_Started_DR_CREATED_NO_RECEIVER() throws Exception { - try { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - vm4.invoke(() -> WANTestBase.createCache( lnPort )); -/* ExpectedException exp1 = addExpectedException(GatewaySenderException.class - .getName(), vm4); - ExpectedException exp2 = addExpectedException(InterruptedException.class - .getName(), vm4); - try { -*/ vm4.invoke(() -> WANTestBase.createSender( "ln1", 2, - true, 10, 100, false, false, null, false )); - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln1", isOffHeap() )); - vm4.invoke(() -> WANTestBase.doPuts( - getTestMethodName() + "_RR", 1000 )); - vm4.invoke(() -> WANTestBase.validateQueueContents( - "ln1", 1000 )); - -/* } - finally { - exp1.remove(); - exp2.remove(); - } -*/ } - catch (Exception e) { - Assert.fail("Unexpected exception", e); - } - } - - /** - * Test to validate that distributed region with given parallelGatewaySender id - * is created first and then a same parallelGatewaySender is created - * a single put in DR is enqueued in parallelQueue - * Below test is disabled intentionally - 1> In this release 8.0, for rolling upgrade support queue name is changed to old style - 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about - ParallelGatewaySenderQueue#convertPathToName - 3> We have to enabled it in next release - 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 - and version prior to 8.0 - */ - @Ignore - @Test - public void test_DR_CREATED_PGS_STARTED_NO_RECEIVER() throws Exception { - try { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln1", isOffHeap() )); -/* ExpectedException exp1 = addExpectedException(GatewaySenderException.class - .getName(), vm4); - ExpectedException exp2 = addExpectedException(InterruptedException.class - .getName(), vm4); - try {*/ - vm4.invoke(() -> WANTestBase.createSender( "ln1", 2, - true, 10, 100, false, false, null, false )); - vm4.invoke(() -> WANTestBase.doPuts( - getTestMethodName() + "_RR", 1000 )); - vm4.invoke(() -> WANTestBase.validateQueueContents( - "ln1", 1000 )); -/* } - finally { - exp1.remove(); - exp2.remove(); - } -*/ } - catch (Exception e) { - Assert.fail("Unexpected exception", e); - } - } - - /**Below test is disabled intentionally - 1> In this release 8.0, for rolling upgrade support queue name is changed to old style - 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about - ParallelGatewaySenderQueue#convertPathToName - 3> We have to enabled it in next release - 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 - and version prior to 8.0*/ - @Ignore - @Test - public void test_DR_PGS_1Node_Put_ValidateQueue_No_Receiver() throws Exception { - try { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln1", isOffHeap() )); - -/* ExpectedException exp1 = addExpectedException(GatewaySenderException.class - .getName(), vm4); - ExpectedException exp2 = addExpectedException(InterruptedException.class - .getName(), vm4); - try {*/ - vm4.invoke(() -> WANTestBase.createSender( "ln1", 2, - true, 10, 100, false, false, null, true )); - vm4.invoke(() -> WANTestBase.startSender( "ln1" )); - - vm4.invoke(() -> WANTestBase.doPuts( - getTestMethodName() + "_RR", 10000 )); - - vm4.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 10000 )); - vm4.invoke(() -> WANTestBase.validateQueueContents( - "ln1", 10000 )); -/* } - finally { - exp1.remove(); - exp2.remove(); - } - */ - } - catch (Exception e) { - Assert.fail("Unexpected exception", e); - } - } - - /**Below test is disabled intentionally - 1> In this release 8.0, for rolling upgrade support queue name is changed to old style - 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about - ParallelGatewaySenderQueue#convertPathToName - 3> We have to enabled it in next release - 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 - and version prior to 8.0*/ - @Ignore - @Test - public void test_DR_PGS_2Nodes_Put_ValidateQueue_No_Receiver() throws Exception { - try { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln1", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln1", isOffHeap() )); - -/* ExpectedException exp1 = addExpectedException( - GatewaySenderException.class.getName()); - ExpectedException exp2 = addExpectedException( - InterruptedException.class.getName()); - ExpectedException exp3 = addExpectedException( - CacheClosedException.class.getName()); - try { -*/ vm4.invoke(() -> WANTestBase.createSender( "ln1", 2, - true, 10, 100, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln1", 2, - true, 10, 100, false, false, null, true )); - - startSenderInVMs("ln1", vm4, vm5); - - vm4.invoke(() -> WANTestBase.doPuts( - getTestMethodName() + "_RR", 1000 )); - - vm4.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - vm5.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - - vm4.invoke(() -> WANTestBase.validateQueueContents( - "ln1", 1000 )); - vm5.invoke(() -> WANTestBase.validateQueueContents( - "ln1", 1000 )); - -/* } - finally { - exp1.remove(); - exp2.remove(); - exp3.remove(); - } -*/ - } - catch (Exception e) { - Assert.fail("Unexpected exception", e); - } - } - -// public void test_DR_PGS_ORDERPOLICY_PARTITION_EXPECTException(){ -// -// } -// public void test_DR_PGS_DISKSTORE_NAME_PROVIDED_VALIDATE_DISK(){ -// -// } -// public void test_DR_PGS_DISKSTORE_NAME_NOT_PROVIDED_VALIDATE_DISK(){ -// -// } -// -// public void test_DR_PGS_START_STOP_START(){ -// -// } -// -// public void test_DR_PGS_PERSISTENCE_START_STOP_START(){ -// -// } -// -// public void test_DR_PGS_START_PAUSE_STOP(){ -// -// } -// -// public void test_DR_PGS_START_PAUSE_RESUME_VALIDATE_RECEIVER(){ -// -// } - - /**Below test is disabled intentionally - 1> In this release 8.0, for rolling upgrade support queue name is changed to old style - 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about - ParallelGatewaySenderQueue#convertPathToName - 3> We have to enabled it in next release - 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 - and version prior to 8.0*/ - @Ignore - @Test - public void test_DR_PGS_1Nodes_Put_Receiver_2() throws Exception { - try { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2); - vm2.invoke(() -> WANTestBase.createReceiver()); - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - createCacheInVMs(lnPort, vm4); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln1", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.createSender( "ln1", 2, - true, 10, 100, false, false, null, true)); - - vm4.invoke(() -> WANTestBase.startSender( "ln1")); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000)); - - vm4.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR", - 1000)); - - vm4.invoke(() -> WANTestBase.validateQueueContents( "ln1", - 0 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000)); - } - catch (Exception e) { - Assert.fail("Unexpected exception", e); - } - } - - /**Below test is disabled intentionally - 1> In this release 8.0, for rolling upgrade support queue name is changed to old style - 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about - ParallelGatewaySenderQueue#convertPathToName - 3> We have to enabled it in next release - 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 - and version prior to 8.0*/ - @Ignore - @Test - public void test_DR_PGS_2Nodes_Put_Receiver() throws Exception { - try { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2); - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm2.invoke(() -> WANTestBase.createReceiver()); - - createCacheInVMs(lnPort, vm4, vm5); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln1", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln1", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.createSender( "ln1", 2, - true, 10, 100, false, false, null, true)); - vm5.invoke(() -> WANTestBase.createSender( "ln1", 2, - true, 10, 100, false, false, null, true)); - - startSenderInVMs("ln1", vm4, vm5); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - - vm4.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR", - 1000 )); - vm5.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR", - 1000 )); - - vm4.invoke(() -> WANTestBase.validateQueueContents( "ln1", - 0 )); - vm5.invoke(() -> WANTestBase.validateQueueContents( "ln1", - 0 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - } - catch (Exception e) { - Assert.fail("Unexpected exception", e); - } - } - - /**Below test is disabled intentionally - 1> In this release 8.0, for rolling upgrade support queue name is changed to old style - 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about - ParallelGatewaySenderQueue#convertPathToName - 3> We have to enabled it in next release - 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 - and version prior to 8.0*/ - @Ignore - @Test - public void test_DR_PGS_2Nodes_EMPTY_Put_Receiver() throws Exception { - try { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - vm2.invoke(() -> WANTestBase.createCache( nyPort )); - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm2.invoke(() -> WANTestBase.createReceiver()); - - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln1", Scope.DISTRIBUTED_ACK, DataPolicy.EMPTY, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln1", Scope.DISTRIBUTED_ACK, DataPolicy.REPLICATE, isOffHeap() )); - - vm4.invoke(() -> WANTestBase.createSender( "ln1", 2, - true, 10, 100, false, false, null, true)); - vm5.invoke(() -> WANTestBase.createSender( "ln1", 2, - true, 10, 100, false, false, null, true)); - - startSenderInVMs("ln1", vm4, vm5); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - -// vm4.invoke(() -> WANTestBase.validateRegionSize( testName + "_RR", -// 1000 )); - vm5.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR", - 1000 )); - - vm4.invoke(() -> WANTestBase.validateQueueContents( "ln1", - 0 )); - vm5.invoke(() -> WANTestBase.validateQueueContents( "ln1", - 0 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - } - catch (Exception e) { - Assert.fail("Unexpected exception", e); - } - } - - /**Below test is disabled intentionally - 1> In this release 8.0, for rolling upgrade support queue name is changed to old style - 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about - ParallelGatewaySenderQueue#convertPathToName - 3> We have to enabled it in next release - 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 - and version prior to 8.0*/ - @Ignore - @Test - public void test_DR_PR_PGS_4Nodes_Put_Receiver_2Nodes() throws Exception { - try { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - vm2.invoke(() -> WANTestBase.createReceiver()); - vm3.invoke(() -> WANTestBase.createReceiver()); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 10, 100, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 10, 100, false, false, null, true )); - vm6.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 10, 100, false, false, null, true )); - vm7.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 10, 100, false, false, null, true )); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - vm5.invoke(() -> WANTestBase.doNextPuts( getTestMethodName() + "_PR", - 1000, 2000 )); - - vm4.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_PR", - 1000 )); - vm5.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR", - 1000 )); - - vm4.invoke(() -> WANTestBase.validateQueueContents( "ln", - 0 )); - vm5.invoke(() -> WANTestBase.validateQueueContents( "ln", - 0 )); - -/* ExpectedException exp1 = addExpectedException(CacheClosedException.class - .getName()); - try {*/ - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 1000 )); -/* } - finally { - exp1.remove(); - } -*/ } - catch (Exception e) { - Assert.fail("Unexpected exception", e); - } - } - - /**Below test is disabled intentionally - 1> In this release 8.0, for rolling upgrade support queue name is changed to old style - 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about - ParallelGatewaySenderQueue#convertPathToName - 3> We have to enabled it in next release - 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 - and version prior to 8.0*/ - @Ignore - @Test - public void test_DR_PGS_NOMANUALSTART_4Nodes_Put_ValidateReceiver() throws Exception { - try { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2); - vm2.invoke(() -> WANTestBase.createReceiver()); - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln1", 2, - true, 10, 100, false, false, null, false )); - vm5.invoke(() -> WANTestBase.createSender( "ln1", 2, - true, 10, 100, false, false, null, false )); - vm6.invoke(() -> WANTestBase.createSender( "ln1", 2, - true, 10, 100, false, false, null, false )); - vm7.invoke(() -> WANTestBase.createSender( "ln1", 2, - true, 10, 100, false, false, null, false )); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln1", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln1", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln1", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln1", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - - vm4.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR", - 1000 )); - vm5.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR", - 1000 )); - vm6.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR", - 1000 )); - vm7.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR", - 1000 )); - - - vm4.invoke(() -> WANTestBase.validateQueueContents( "ln1", - 0 )); - vm5.invoke(() -> WANTestBase.validateQueueContents( "ln1", - 0 )); - vm6.invoke(() -> WANTestBase.validateQueueContents( "ln1", - 0 )); - vm7.invoke(() -> WANTestBase.validateQueueContents( "ln1", - 0 )); - -/* ExpectedException exp1 = addExpectedException(CacheClosedException.class - .getName()); - try {*/ - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); -/* } - finally { - exp1.remove(); - }*/ - } - catch (Exception e) { - Assert.fail("Unexpected exception", e); - } - } - - /**Below test is disabled intentionally - 1> In this release 8.0, for rolling upgrade support queue name is changed to old style - 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about - ParallelGatewaySenderQueue#convertPathToName - 3> We have to enabled it in next release - 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 - and version prior to 8.0*/ - @Ignore - @Test - public void test_DR_PGS_4Nodes_Put_CLOSE4NODESCACHE_RECREATE_PUT_ValidateReceiver() - throws Exception { - try { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm6.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm7.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - // before doing any puts, let the senders be running in order to ensure - // that - // not a single event will be lost - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - -/* ExpectedException exp1 = addExpectedException(CacheClosedException.class - .getName()); - try {*/ - vm4.invoke(() -> WANTestBase.killSender()); - vm5.invoke(() -> WANTestBase.killSender()); - vm6.invoke(() -> WANTestBase.killSender()); - vm7.invoke(() -> WANTestBase.killSender()); -/* } - finally { - exp1.remove(); - }*/ - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm6.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm7.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - // ------------------------------------------------------------------------------------ - - vm4.invoke(() -> WANTestBase.doNextPuts( - getTestMethodName() + "_RR", 1000, 2000 )); - - // verify all buckets drained on all sender nodes. - vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" )); - vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" )); - vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" )); - vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" )); - -/* exp1 = addExpectedException(CacheClosedException.class.getName()); - try {*/ - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 2000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 2000 )); -/* } - finally { - exp1.remove(); - }*/ - } - catch (Exception e) { - Assert.fail("Unexpected exception", e); - } - - } - - /**Below test is disabled intentionally - 1> In this release 8.0, for rolling upgrade support queue name is changed to old style - 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about - ParallelGatewaySenderQueue#convertPathToName - 3> We have to enabled it in next release - 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 - and version prior to 8.0*/ - @Ignore - @Test - public void test_DR_NO_ACK_PGS_2Nodes_Put_ValidateQueue_Receiver() throws Exception { - try { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2); - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm2.invoke(() -> WANTestBase.createReceiver()); - - createCacheInVMs(lnPort, vm4, vm5); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", "ln1", - Scope.DISTRIBUTED_NO_ACK, DataPolicy.REPLICATE, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", "ln1", - Scope.DISTRIBUTED_NO_ACK, DataPolicy.REPLICATE, isOffHeap() )); - - vm4.invoke(() -> WANTestBase.createSender( "ln1", 2, - true, 10, 100, false, false, null, true)); - vm5.invoke(() -> WANTestBase.createSender( "ln1", 2, - true, 10, 100, false, false, null, true)); - - startSenderInVMs("ln1", vm4, vm5); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - - vm4.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR", - 1000 )); - vm5.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR", - 1000 )); - - vm4.invoke(() -> WANTestBase.validateQueueContents( "ln1", - 0 )); - vm5.invoke(() -> WANTestBase.validateQueueContents( "ln1", - 0 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - } - catch (Exception e) { - Assert.fail("Unexpected exception", e); - } - } - - /**Below test is disabled intentionally - 1> In this release 8.0, for rolling upgrade support queue name is changed to old style - 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about - ParallelGatewaySenderQueue#convertPathToName - 3> We have to enabled it in next release - 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 - and version prior to 8.0*/ - @Ignore - @Test - public void test_DR_PGS_2NODES_1NODESDOWN_Validate_Receiver() throws Exception { - - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, true, - 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, true, - 100, 10, false, false, null, true )); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - - Thread.sleep(60000);; - -/* ExpectedException exp1 = addExpectedException(CacheClosedException.class - .getName()); - try {*/ - AsyncInvocation inv1 = vm4.invokeAsync(() -> ReplicatedRegion_ParallelWANPropagationDUnitTest.doPuts0( - getTestMethodName() + "_RR", 1000 )); - Wait.pause(1000); - AsyncInvocation inv2 = vm5.invokeAsync(() -> WANTestBase.killSender()); - try { - inv1.join(); - inv2.join(); - } - catch (Exception e) { - Assert.fail("UnExpected Exception", e); - } -/* } - finally { - exp1.remove(); - }*/ - - Integer size = (Integer)vm4.invoke(() -> WANTestBase.getQueueContentSize( "ln" )); - LogWriterUtils.getLogWriter().info("The size of the queue is in vm4 " + size); - - - vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" )); - - size = (Integer)vm4.invoke(() -> WANTestBase.getQueueContentSize( "ln" )); - LogWriterUtils.getLogWriter().info("The size of the queue is in vm4 " + size); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - } - - /**Below test is disabled intentionally - 1> In this release 8.0, for rolling upgrade support queue name is changed to old style - 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about - ParallelGatewaySenderQueue#convertPathToName - 3> We have to enabled it in next release - 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 - and version prior to 8.0*/ - @Ignore - @Test - public void test_DR_PGS_4NODES_2NODESDOWN_Validate_Receiver() throws Exception { - - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, true, - 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, true, - 100, 10, false, false, null, true )); - vm6.invoke(() -> WANTestBase.createSender( "ln", 2, true, - 100, 10, false, false, null, true )); - vm7.invoke(() -> WANTestBase.createSender( "ln", 2, true, - 100, 10, false, false, null, true )); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - - Thread.sleep(60000); -/* ExpectedException exp1 = addExpectedException(CacheClosedException.class - .getName()); - try */{ - AsyncInvocation inv1 = vm7.invokeAsync(() -> ReplicatedRegion_ParallelWANPropagationDUnitTest.doPuts0( - getTestMethodName() + "_RR", 10000 )); - Thread.sleep(1000); - AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender()); - Thread.sleep(2000); - AsyncInvocation inv3 = vm6.invokeAsync(() -> ReplicatedRegion_ParallelWANPropagationDUnitTest.doPuts1( - getTestMethodName() + "_RR", 10000 )); - Thread.sleep(1500); - AsyncInvocation inv4 = vm5.invokeAsync(() -> WANTestBase.killSender()); - try { - inv1.join(); - inv2.join(); - inv3.join(); - inv4.join(); - } - catch (Exception e) { - Assert.fail("UnExpected Exception", e); - } - }/* - finally { - exp1.remove(); - }*/ - - vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" )); - vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 10000 )); - - } - - public static void doPuts0(String regionName, int numPuts) { - IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class - .getName()); - IgnoredException exp1 = IgnoredException.addIgnoredException(CacheClosedException.class - .getName()); - try { - - Region r = cache.getRegion(Region.SEPARATOR + regionName); - assertNotNull(r); - for (long i = 0; i < numPuts; i++) { - LogWriterUtils.getLogWriter().info("Put : key : " + i); - r.put(i, "0_" + i); - } - } finally { - exp.remove(); - exp1.remove(); - } - } - - public static void doPuts1(String regionName, int numPuts){ - IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class - .getName()); - IgnoredException exp1 = IgnoredException.addIgnoredException(CacheClosedException.class - .getName()); - try { - - Region r = cache.getRegion(Region.SEPARATOR + regionName); - assertNotNull(r); - for (long i = 0; i < numPuts; i++) { - LogWriterUtils.getLogWriter().info("Put : key : " + i); - r.put(i, "1_" + i); - } - } finally { - exp.remove(); - exp1.remove(); - } - } - - public static void doPuts2(String regionName, int numPuts){ - IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class - .getName()); - IgnoredException exp1 = IgnoredException.addIgnoredException(CacheClosedException.class - .getName()); - try { - Region r = cache.getRegion(Region.SEPARATOR + regionName); - assertNotNull(r); - for (long i = 0; i < numPuts; i++) { - LogWriterUtils.getLogWriter().info("Put : key : " + i); - r.put(i, "2_" + i); - } - } finally { - exp.remove(); - exp1.remove(); - } - } - - /** - * Test to validate that put on DR with no ack on multiple nodes are propagated to parallelQueue on multiple nodes - */ - - /** - * Test to validate that the single put in DR is propagated to remote site through parallelGatewaySender - */ - - -}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/SenderWithTransportFilterDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/SenderWithTransportFilterDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/SenderWithTransportFilterDUnitTest.java deleted file mode 100644 index 0ee78c5..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/SenderWithTransportFilterDUnitTest.java +++ /dev/null @@ -1,228 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan.misc; - -import static com.gemstone.gemfire.distributed.ConfigurationProperties.*; -import static com.gemstone.gemfire.test.dunit.Assert.*; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Properties; -import java.util.zip.Adler32; -import java.util.zip.CheckedInputStream; -import java.util.zip.CheckedOutputStream; - -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import com.gemstone.gemfire.cache.CacheFactory; -import com.gemstone.gemfire.cache.DiskStore; -import com.gemstone.gemfire.cache.DiskStoreFactory; -import com.gemstone.gemfire.cache.wan.GatewayReceiver; -import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory; -import com.gemstone.gemfire.cache.wan.GatewaySenderFactory; -import com.gemstone.gemfire.cache.wan.GatewayTransportFilter; -import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; -import com.gemstone.gemfire.internal.AvailablePortHelper; -import com.gemstone.gemfire.internal.cache.wan.InternalGatewaySenderFactory; -import com.gemstone.gemfire.internal.cache.wan.WANTestBase; -import com.gemstone.gemfire.test.dunit.VM; -import com.gemstone.gemfire.test.junit.categories.DistributedTest; - -@Category(DistributedTest.class) -public class SenderWithTransportFilterDUnitTest extends WANTestBase { - - @Test - public void testSerialSenderWithTransportFilter() { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - vm2.invoke(() -> SenderWithTransportFilterDUnitTest.createReceiverWithTransportFilters( nyPort )); - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - vm3.invoke(() -> WANTestBase.createCache( lnPort )); - - vm3.invoke(() -> SenderWithTransportFilterDUnitTest.createSenderWithTransportFilter( "ln", 2, false, 100, - 1, false, false, true )); - - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - vm3.invoke(() -> WANTestBase.startSender( "ln" )); - - vm3.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", 100 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 100 )); - } - - @Test - public void testParallelSenderWithTransportFilter() { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - vm2.invoke(() -> SenderWithTransportFilterDUnitTest.createReceiverWithTransportFilters( nyPort )); - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 0, 10, isOffHeap() )); - - vm3.invoke(() -> WANTestBase.createCache( lnPort )); - - vm3.invoke(() -> SenderWithTransportFilterDUnitTest.createSenderWithTransportFilter( "ln", 2, true, 100, - 1, false, false, true )); - - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 0, 10, isOffHeap() )); - - vm3.invoke(() -> WANTestBase.startSender( "ln" )); - - vm3.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 100 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 100 )); - } - - public static int createReceiverWithTransportFilters(int locPort) { - WANTestBase test = new WANTestBase(); - Properties props = test.getDistributedSystemProperties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(LOCATORS, "localhost[" + locPort - + "]"); - - InternalDistributedSystem ds = test.getSystem(props); - cache = CacheFactory.create(ds); - GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); - int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); - fact.setStartPort(port); - fact.setEndPort(port); - ArrayList<GatewayTransportFilter> transportFilters = new ArrayList<GatewayTransportFilter>(); - transportFilters.add(new CheckSumTransportFilter("CheckSumTransportFilter")); - if (!transportFilters.isEmpty()) { - for (GatewayTransportFilter filter : transportFilters) { - fact.addGatewayTransportFilter(filter); - } - } - GatewayReceiver receiver = fact.create(); - try { - receiver.start(); - } - catch (IOException e) { - fail("Test " + test.getName() + " failed to start GatewayReceiver on port " + port, e); - } - return port; - } - - public static void createSenderWithTransportFilter(String dsName, - int remoteDsId, boolean isParallel, Integer maxMemory, Integer batchSize, - boolean isConflation, boolean isPersistent, boolean isManualStart) { - File persistentDirectory = new File(dsName + "_disk_" - + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); - persistentDirectory.mkdir(); - DiskStoreFactory dsf = cache.createDiskStoreFactory(); - File[] dirs1 = new File[] { persistentDirectory }; - - if (isParallel) { - GatewaySenderFactory gateway = cache - .createGatewaySenderFactory(); - gateway.setParallel(true); - gateway.setMaximumQueueMemory(maxMemory); - gateway.setBatchSize(batchSize); - ((InternalGatewaySenderFactory)gateway).setLocatorDiscoveryCallback(new MyLocatorCallback()); - ArrayList<GatewayTransportFilter> transportFilters = new ArrayList<GatewayTransportFilter>(); - transportFilters.add(new CheckSumTransportFilter("CheckSumTransportFilter")); - if (!transportFilters.isEmpty()) { - for (GatewayTransportFilter filter : transportFilters) { - gateway.addGatewayTransportFilter(filter); - } - } - if (isPersistent) { - gateway.setPersistenceEnabled(true); - gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName) - .getName()); - } - else { - DiskStore store = dsf.setDiskDirs(dirs1).create(dsName); - gateway.setDiskStoreName(store.getName()); - } - gateway.setBatchConflationEnabled(isConflation); - gateway.create(dsName, remoteDsId); - - } - else { - GatewaySenderFactory gateway = cache - .createGatewaySenderFactory(); - gateway.setMaximumQueueMemory(maxMemory); - gateway.setBatchSize(batchSize); - gateway.setManualStart(isManualStart); - ((InternalGatewaySenderFactory)gateway) - .setLocatorDiscoveryCallback(new MyLocatorCallback()); - ArrayList<GatewayTransportFilter> transportFilters = new ArrayList<GatewayTransportFilter>(); - transportFilters.add(new CheckSumTransportFilter("CheckSumTransportFilter")); - if (!transportFilters.isEmpty()) { - for (GatewayTransportFilter filter : transportFilters) { - gateway.addGatewayTransportFilter(filter); - } - } - gateway.setBatchConflationEnabled(isConflation); - if (isPersistent) { - gateway.setPersistenceEnabled(true); - gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName) - .getName()); - } - else { - DiskStore store = dsf.setDiskDirs(dirs1).create(dsName); - gateway.setDiskStoreName(store.getName()); - } - gateway.create(dsName, remoteDsId); - } - } - - static class CheckSumTransportFilter implements GatewayTransportFilter { - - Adler32 checker = new Adler32(); - - private String name; - - public CheckSumTransportFilter(String name){ - this.name = name; - } - - @Override - public String toString(){ - return this.name; - } - - @Override - public InputStream getInputStream(InputStream stream) { - return new CheckedInputStream(stream, checker); - } - - @Override - public OutputStream getOutputStream(OutputStream stream) { - return new CheckedOutputStream(stream, checker); - } - - @Override - public void close() { - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ShutdownAllPersistentGatewaySenderDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ShutdownAllPersistentGatewaySenderDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ShutdownAllPersistentGatewaySenderDUnitTest.java deleted file mode 100644 index 6e2581e..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ShutdownAllPersistentGatewaySenderDUnitTest.java +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan.misc; - -import static org.junit.Assert.*; - -import java.util.Set; - -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import com.gemstone.gemfire.admin.AdminDistributedSystemFactory; -import com.gemstone.gemfire.admin.AdminException; -import com.gemstone.gemfire.admin.DistributedSystemConfig; -import com.gemstone.gemfire.admin.internal.AdminDistributedSystemImpl; -import com.gemstone.gemfire.cache.Region; -import com.gemstone.gemfire.internal.cache.CacheObserverAdapter; -import com.gemstone.gemfire.internal.cache.CacheObserverHolder; -import com.gemstone.gemfire.internal.cache.LocalRegion; -import com.gemstone.gemfire.internal.cache.wan.WANTestBase; -import com.gemstone.gemfire.test.dunit.AsyncInvocation; -import com.gemstone.gemfire.test.dunit.IgnoredException; -import com.gemstone.gemfire.test.dunit.LogWriterUtils; -import com.gemstone.gemfire.test.dunit.SerializableRunnable; -import com.gemstone.gemfire.test.dunit.VM; -import com.gemstone.gemfire.test.dunit.Wait; -import com.gemstone.gemfire.test.dunit.WaitCriterion; -import com.gemstone.gemfire.test.junit.categories.DistributedTest; - -@Category(DistributedTest.class) -public class ShutdownAllPersistentGatewaySenderDUnitTest extends WANTestBase { - - private static final long MAX_WAIT = 70000; - - private static final int NUM_KEYS = 1000; - - public ShutdownAllPersistentGatewaySenderDUnitTest() { - super(); - } - - @Override - protected final void postSetUpWANTestBase() throws Exception { - IgnoredException.addIgnoredException("Cache is being closed by ShutdownAll"); - } - - private static final long serialVersionUID = 1L; - - @Test - public void testGatewaySender() throws Exception { - IgnoredException.addIgnoredException("Cache is shutting down"); - - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - vm2.invoke(() -> WANTestBase.createCache( nyPort )); - vm3.invoke(() -> WANTestBase.createCache( nyPort )); - vm2.invoke(() -> WANTestBase.createReceiver()); - - vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 400, false, false, null, true )); - - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - - vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - - // set the CacheObserver to block the ShutdownAll - SerializableRunnable waitAtShutdownAll = new SerializableRunnable() { - @Override - public void run() { - LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true; - CacheObserverHolder.setInstance(new CacheObserverAdapter() { - @Override - public void beforeShutdownAll() { - final Region region = cache.getRegion(getTestMethodName() + "_PR"); - Wait.waitForCriterion(new WaitCriterion() { - @Override - public boolean done() { - return region.size() >= 2; - } - - @Override - public String description() { - return "Wait for wan to have processed several events"; - } - }, 30000, 100, true); - } - }); - } - }; - vm2.invoke(waitAtShutdownAll); - vm3.invoke(waitAtShutdownAll); - - AsyncInvocation vm4_future = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", NUM_KEYS )); - - // ShutdownAll will be suspended at observer, so puts will continue - AsyncInvocation future = shutDownAllMembers(vm2, 2, MAX_WAIT); - future.join(MAX_WAIT); - - // now restart vm1 with gatewayHub - LogWriterUtils.getLogWriter().info("restart in VM2"); - vm2.invoke(() -> WANTestBase.createCache( nyPort )); - vm3.invoke(() -> WANTestBase.createCache( nyPort )); - AsyncInvocation vm3_future = vm3.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", - "ln", 1, 100, isOffHeap() )); - vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm3_future.join(MAX_WAIT); - - vm3.invoke(new SerializableRunnable() { - public void run() { - final Region region = cache.getRegion(getTestMethodName() + "_PR"); - cache.getLogger().info( - "vm1's region size before restart gatewayHub is " + region.size()); - } - }); - vm2.invoke(() -> WANTestBase.createReceiver()); - - // wait for vm0 to finish its work - vm4_future.join(MAX_WAIT); - vm4.invoke(new SerializableRunnable() { - public void run() { - Region region = cache.getRegion(getTestMethodName() + "_PR"); - assertEquals(NUM_KEYS, region.size()); - } - }); - - // verify the other side (vm1)'s entries received from gateway - vm2.invoke(new SerializableRunnable() { - public void run() { - final Region region = cache.getRegion(getTestMethodName() + "_PR"); - - cache.getLogger().info( - "vm1's region size after restart gatewayHub is " + region.size()); - Wait.waitForCriterion(new WaitCriterion() { - public boolean done() { - Object lastValue = region.get(NUM_KEYS - 1); - if (lastValue != null && lastValue.equals(NUM_KEYS - 1)) { - region.getCache().getLogger().info( - "Last key has arrived, its value is " + lastValue - + ", end of wait."); - return true; - } - else - return (region.size() == NUM_KEYS); - } - - public String description() { - return "Waiting for destination region to reach size: " + NUM_KEYS - + ", current is " + region.size(); - } - }, MAX_WAIT, 100, true); - assertEquals(NUM_KEYS, region.size()); - } - }); - - } - - private AsyncInvocation shutDownAllMembers(VM vm, final int expectedNumber, final long timeout) { - AsyncInvocation future = vm.invokeAsync(new SerializableRunnable("Shutdown all the members") { - - public void run() { - DistributedSystemConfig config; - AdminDistributedSystemImpl adminDS = null; - try { - config = AdminDistributedSystemFactory.defineDistributedSystem(cache - .getDistributedSystem(), ""); - adminDS = (AdminDistributedSystemImpl)AdminDistributedSystemFactory - .getDistributedSystem(config); - adminDS.connect(); - Set members = adminDS.shutDownAllMembers(timeout); - int num = members == null ? 0 : members.size(); - assertEquals(expectedNumber, num); - } - catch (AdminException e) { - throw new RuntimeException(e); - } - finally { - if (adminDS != null) { - adminDS.disconnect(); - } - } - } - }); - return future; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/WANConfigurationJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/WANConfigurationJUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/WANConfigurationJUnitTest.java deleted file mode 100644 index b504f87..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/WANConfigurationJUnitTest.java +++ /dev/null @@ -1,601 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan.misc; - -import com.gemstone.gemfire.cache.*; -import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener; -import com.gemstone.gemfire.cache.wan.*; -import com.gemstone.gemfire.cache30.MyGatewayEventFilter1; -import com.gemstone.gemfire.cache30.MyGatewayTransportFilter1; -import com.gemstone.gemfire.cache30.MyGatewayTransportFilter2; -import com.gemstone.gemfire.internal.AvailablePortHelper; -import com.gemstone.gemfire.internal.cache.LocalRegion; -import com.gemstone.gemfire.internal.cache.wan.GatewayReceiverException; -import com.gemstone.gemfire.internal.cache.wan.GatewaySenderException; -import com.gemstone.gemfire.internal.cache.wan.InternalGatewaySenderFactory; -import com.gemstone.gemfire.internal.cache.wan.MyGatewaySenderEventListener; -import com.gemstone.gemfire.internal.i18n.LocalizedStrings; -import com.gemstone.gemfire.test.junit.categories.IntegrationTest; -import org.junit.After; -import org.junit.Ignore; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import java.io.IOException; -import java.util.HashSet; -import java.util.Set; - -import static com.gemstone.gemfire.distributed.ConfigurationProperties.MCAST_PORT; -import static org.junit.Assert.*; - -@Category(IntegrationTest.class) -public class WANConfigurationJUnitTest { - - private Cache cache; - - /** - * Test to validate that the sender can not be started without configuring - * locator - * @throws IOException - * - * @throws IOException - */ - @Test - public void test_GatewaySender_without_Locator() throws IOException { - try { - cache = new CacheFactory().set(MCAST_PORT, "0").create(); - GatewaySenderFactory fact = cache.createGatewaySenderFactory(); - fact.setParallel(true); - GatewaySender sender1 = fact.create("NYSender", 2); - sender1.start(); - fail("Expected IllegalStateException but not thrown"); - } - catch (Exception e) { - if ((e instanceof IllegalStateException && e - .getMessage() - .startsWith( - LocalizedStrings.AbstractGatewaySender_LOCATOR_SHOULD_BE_CONFIGURED_BEFORE_STARTING_GATEWAY_SENDER - .toLocalizedString()))) { - } - else { - fail("Expected IllegalStateException but received :" + e); - } - } - } - - /** - * Test to validate that sender with same Id can not be added to cache. - */ - @Test - public void test_SameGatewaySenderCreatedTwice() { - cache = new CacheFactory().set(MCAST_PORT, "0").create(); - try { - GatewaySenderFactory fact = cache.createGatewaySenderFactory(); - fact.setParallel(true); - fact.setManualStart(true); - fact.create("NYSender", 2); - fact.create("NYSender", 2); - fail("Expected IllegalStateException but not thrown"); - } - catch (Exception e) { - if (e instanceof IllegalStateException - && e.getMessage().contains("A GatewaySender with id")) { - - } - else { - fail("Expected IllegalStateException but received :" + e); - } - } - } - - /** - * Test to validate that same gatewaySender Id can not be added to the region attributes. - */ - @Test - public void test_SameGatewaySenderIdAddedTwice() { - try { - cache = new CacheFactory().set(MCAST_PORT, "0").create(); - GatewaySenderFactory fact = cache.createGatewaySenderFactory(); - fact.setParallel(true); - fact.setManualStart(true); - GatewaySender sender1 = fact.create("NYSender", 2); - AttributesFactory factory = new AttributesFactory(); - factory.addGatewaySenderId(sender1.getId()); - factory.addGatewaySenderId(sender1.getId()); - fail("Expected IllegalArgumentException but not thrown"); - } - catch (Exception e) { - if (e instanceof IllegalArgumentException - && e.getMessage().contains("is already added")) { - - } - else { - fail("Expected IllegalStateException but received :" + e); - } - } - } - - @Test - public void test_GatewaySenderIdAndAsyncEventId() { - cache = new CacheFactory().set(MCAST_PORT, "0").create(); - AttributesFactory factory = new AttributesFactory(); - factory.addGatewaySenderId("ln"); - factory.addGatewaySenderId("ny"); - factory.addAsyncEventQueueId("Async_LN"); - RegionAttributes attrs = factory.create(); - - Set<String> senderIds = new HashSet<String>(); - senderIds.add("ln"); - senderIds.add("ny"); - Set<String> attrsSenderIds = attrs.getGatewaySenderIds(); - assertEquals(senderIds, attrsSenderIds); - Region r = cache.createRegion("Customer", attrs); - assertEquals(senderIds, ((LocalRegion)r).getGatewaySenderIds()); - } - - /** - * Test to validate that distributed region can not have the gateway sender - * with parallel distribution policy - * - */ - @Ignore("Bug51491") - @Test - public void test_GatewaySender_Parallel_DistributedRegion() { - cache = new CacheFactory().set(MCAST_PORT, "0").create(); - GatewaySenderFactory fact = cache.createGatewaySenderFactory(); - fact.setParallel(true); - fact.setManualStart(true); - GatewaySender sender1 = fact.create("NYSender", 2); - AttributesFactory factory = new AttributesFactory(); - factory.addGatewaySenderId(sender1.getId()); - factory.setScope(Scope.DISTRIBUTED_ACK); - factory.setDataPolicy(DataPolicy.REPLICATE); - try { - RegionFactory regionFactory = cache.createRegionFactory(factory.create()); - Region region = regionFactory - .create("test_GatewaySender_Parallel_DistributedRegion"); - } - catch (Exception e) { - fail("Unexpected Exception :" + e); - } - } - - @Test - public void test_GatewaySender_Parallel_MultipleDispatcherThread() { - cache = new CacheFactory().set(MCAST_PORT, "0").create(); - GatewaySenderFactory fact = cache.createGatewaySenderFactory(); - fact.setParallel(true); - fact.setManualStart(true); - fact.setDispatcherThreads(4); - try { - GatewaySender sender1 = fact.create("NYSender", 2); - } - catch (GatewaySenderException e) { - fail("UnExpected Exception " + e); - } - } - - @Test - public void test_GatewaySender_Serial_ZERO_DispatcherThread() { - cache = new CacheFactory().set(MCAST_PORT, "0").create(); - GatewaySenderFactory fact = cache.createGatewaySenderFactory(); - fact.setManualStart(true); - fact.setDispatcherThreads(0); - try { - GatewaySender sender1 = fact.create("NYSender", 2); - fail("Expected GatewaySenderException but not thrown"); - } - catch (GatewaySenderException e) { - if (e.getMessage().contains("can not be created with dispatcher threads less than 1")) { - } - else { - fail("Expected IllegalStateException but received :" + e); - } - } - } - - /** - * Test to validate the gateway receiver attributes are correctly set - */ - @Test - public void test_ValidateGatewayReceiverAttributes() { - cache = new CacheFactory().set(MCAST_PORT, "0").create(); - int[] randomAvailableTCPPorts = AvailablePortHelper.getRandomAvailableTCPPorts(2); - int port1 = randomAvailableTCPPorts[0]; - int port2 = randomAvailableTCPPorts[1]; - - GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); - if(port1 < port2){ - fact.setStartPort(port1); - fact.setEndPort(port2); - }else{ - fact.setStartPort(port2); - fact.setEndPort(port1); - } - - fact.setMaximumTimeBetweenPings(2000); - fact.setSocketBufferSize(200); - GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1(); - GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2(); - fact.addGatewayTransportFilter(myStreamFilter2); - fact.addGatewayTransportFilter(myStreamFilter1); - GatewayReceiver receiver1 = fact.create(); - - - Region region = cache.createRegionFactory().create( - "test_ValidateGatewayReceiverAttributes"); - Set<GatewayReceiver> receivers = cache.getGatewayReceivers(); - GatewayReceiver rec = receivers.iterator().next(); - assertEquals(receiver1.getHost(), rec.getHost()); - assertEquals(receiver1.getStartPort(), rec.getStartPort()); - assertEquals(receiver1.getEndPort(), rec.getEndPort()); - assertEquals(receiver1.getBindAddress(), rec.getBindAddress()); - assertEquals(receiver1.getMaximumTimeBetweenPings(), rec - .getMaximumTimeBetweenPings()); - assertEquals(receiver1.getSocketBufferSize(), rec - .getSocketBufferSize()); - assertEquals(receiver1.getGatewayTransportFilters().size(), rec - .getGatewayTransportFilters().size()); - - } - - @Test - public void test_ValidateGatewayReceiverStatus() { - cache = new CacheFactory().set(MCAST_PORT, "0").create(); - int[] randomAvailableTCPPorts = AvailablePortHelper.getRandomAvailableTCPPorts(2); - int port1 = randomAvailableTCPPorts[0]; - int port2 = randomAvailableTCPPorts[1]; - - GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); - if(port1 < port2){ - fact.setStartPort(port1); - fact.setEndPort(port2); - }else{ - fact.setStartPort(port2); - fact.setEndPort(port1); - } - - fact.setMaximumTimeBetweenPings(2000); - fact.setSocketBufferSize(200); - GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1(); - GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2(); - fact.addGatewayTransportFilter(myStreamFilter2); - fact.addGatewayTransportFilter(myStreamFilter1); - GatewayReceiver receiver1 = fact.create(); - assertTrue(receiver1.isRunning()); - } - - /** - * Test to validate that serial gateway sender attributes are correctly set - */ - @Test - public void test_ValidateSerialGatewaySenderAttributes() { - cache = new CacheFactory().set(MCAST_PORT, "0").create(); - GatewaySenderFactory fact = cache.createGatewaySenderFactory(); - fact.setManualStart(true); - fact.setBatchConflationEnabled(true); - fact.setBatchSize(200); - fact.setBatchTimeInterval(300); - fact.setPersistenceEnabled(false); - fact.setDiskStoreName("FORNY"); - fact.setMaximumQueueMemory(200); - fact.setAlertThreshold(1200); - GatewayEventFilter myEventFilter1 = new MyGatewayEventFilter1(); - fact.addGatewayEventFilter(myEventFilter1); - GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1(); - fact.addGatewayTransportFilter(myStreamFilter1); - GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2(); - fact.addGatewayTransportFilter(myStreamFilter2); - GatewaySender sender1 = fact.create("TKSender", 2); - - - AttributesFactory factory = new AttributesFactory(); - factory.addGatewaySenderId(sender1.getId()); - factory.setDataPolicy(DataPolicy.PARTITION); - Region region = cache.createRegionFactory(factory.create()).create( - "test_ValidateGatewaySenderAttributes"); - Set<GatewaySender> senders = cache.getGatewaySenders(); - assertEquals(senders.size(), 1); - GatewaySender gatewaySender = senders.iterator().next(); - assertEquals(sender1.getRemoteDSId(), gatewaySender - .getRemoteDSId()); - assertEquals(sender1.isManualStart(), gatewaySender.isManualStart()); - assertEquals(sender1.isBatchConflationEnabled(), gatewaySender - .isBatchConflationEnabled()); - assertEquals(sender1.getBatchSize(), gatewaySender.getBatchSize()); - assertEquals(sender1.getBatchTimeInterval(), gatewaySender - .getBatchTimeInterval()); - assertEquals(sender1.isPersistenceEnabled(), gatewaySender - .isPersistenceEnabled()); - assertEquals(sender1.getDiskStoreName(), gatewaySender.getDiskStoreName()); - assertEquals(sender1.getMaximumQueueMemory(), gatewaySender - .getMaximumQueueMemory()); - assertEquals(sender1.getAlertThreshold(), gatewaySender - .getAlertThreshold()); - assertEquals(sender1.getGatewayEventFilters().size(), gatewaySender - .getGatewayEventFilters().size()); - assertEquals(sender1.getGatewayTransportFilters().size(), gatewaySender - .getGatewayTransportFilters().size()); - - } - - /** - * Test to validate that parallel gateway sender attributes are correctly set - */ - @Test - public void test_ValidateParallelGatewaySenderAttributes() { - cache = new CacheFactory().set(MCAST_PORT, "0").create(); - GatewaySenderFactory fact = cache.createGatewaySenderFactory(); - fact.setParallel(true); - fact.setManualStart(true); - fact.setBatchConflationEnabled(true); - fact.setBatchSize(200); - fact.setBatchTimeInterval(300); - fact.setPersistenceEnabled(false); - fact.setDiskStoreName("FORNY"); - fact.setMaximumQueueMemory(200); - fact.setAlertThreshold(1200); - GatewayEventFilter myEventFilter1 = new MyGatewayEventFilter1(); - fact.addGatewayEventFilter(myEventFilter1); - GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1(); - fact.addGatewayTransportFilter(myStreamFilter1); - GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2(); - fact.addGatewayTransportFilter(myStreamFilter2); - GatewaySender sender1 = fact.create("TKSender", 2); - - - AttributesFactory factory = new AttributesFactory(); - factory.addGatewaySenderId(sender1.getId()); - factory.setDataPolicy(DataPolicy.PARTITION); - Region region = cache.createRegionFactory(factory.create()).create( - "test_ValidateGatewaySenderAttributes"); - Set<GatewaySender> senders = cache.getGatewaySenders(); - assertEquals(1, senders.size()); - GatewaySender gatewaySender = senders.iterator().next(); - assertEquals(sender1.getRemoteDSId(), gatewaySender - .getRemoteDSId()); - assertEquals(sender1.isManualStart(), gatewaySender.isManualStart()); - assertEquals(sender1.isBatchConflationEnabled(), gatewaySender - .isBatchConflationEnabled()); - assertEquals(sender1.getBatchSize(), gatewaySender.getBatchSize()); - assertEquals(sender1.getBatchTimeInterval(), gatewaySender - .getBatchTimeInterval()); - assertEquals(sender1.isPersistenceEnabled(), gatewaySender - .isPersistenceEnabled()); - assertEquals(sender1.getDiskStoreName(), gatewaySender.getDiskStoreName()); - assertEquals(sender1.getMaximumQueueMemory(), gatewaySender - .getMaximumQueueMemory()); - assertEquals(sender1.getAlertThreshold(), gatewaySender - .getAlertThreshold()); - assertEquals(sender1.getGatewayEventFilters().size(), gatewaySender - .getGatewayEventFilters().size()); - assertEquals(sender1.getGatewayTransportFilters().size(), gatewaySender - .getGatewayTransportFilters().size()); - - } - - @Test - public void test_GatewaySenderWithGatewaySenderEventListener1() { - cache = new CacheFactory().set(MCAST_PORT, "0").create(); - InternalGatewaySenderFactory fact = (InternalGatewaySenderFactory)cache.createGatewaySenderFactory(); - AsyncEventListener listener = new MyGatewaySenderEventListener(); - ((InternalGatewaySenderFactory)fact).addAsyncEventListener(listener); - try { - fact.create("ln", 2); - fail("Expected GatewaySenderException. When a sender is added , remoteDSId should not be provided."); - } catch (Exception e) { - if (e instanceof GatewaySenderException - && e.getMessage() - .contains( - "cannot define a remote site because at least AsyncEventListener is already added.")) { - - } else { - fail("Expected GatewaySenderException but received :" + e); - } - } - } - - @Test - public void test_GatewaySenderWithGatewaySenderEventListener2() { - cache = new CacheFactory().set(MCAST_PORT, "0").create(); - GatewaySenderFactory fact = cache.createGatewaySenderFactory(); - AsyncEventListener listener = new MyGatewaySenderEventListener(); - ((InternalGatewaySenderFactory)fact).addAsyncEventListener(listener); - try { - ((InternalGatewaySenderFactory)fact).create("ln"); - } catch (Exception e) { - fail("Received Exception :" + e); - } - } - - @Test - public void test_ValidateGatewayReceiverAttributes_2() { - cache = new CacheFactory().set(MCAST_PORT, "0").create(); - GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); - fact.setStartPort(50504); - fact.setMaximumTimeBetweenPings(1000); - fact.setSocketBufferSize(4000); - fact.setEndPort(70707); - fact.setManualStart(true); - GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1(); - fact.addGatewayTransportFilter(myStreamFilter1); - - GatewayReceiver receiver = fact.create(); - try { - receiver.start(); - } - catch (IOException e) { - fail("The test failed with IOException"); - } - - assertEquals(50504, receiver.getStartPort()); - assertEquals(1000, receiver.getMaximumTimeBetweenPings()); - assertEquals(4000,receiver.getSocketBufferSize()); - assertEquals(70707, receiver.getEndPort()); - } - - /** - * This test takes a minimum of 120s to execute. It is known to hang on Mac OS - * X Yosemite do to changes in the the message string checked in - * GatewayReceiverImpl around line 167. Expects - * "Cannot assign requested address" but gets - * "Can't assign requested address". Timeout after 150s to safeguard against - * hanging on other platforms that may differ. - */ - @Test(timeout = 150000) - public void test_ValidateGatewayReceiverAttributes_WrongBindAddress() { - if (System.getProperty("os.name").equals("Mac OS X")) { - fail("Failing to avoid known hang on Mac OS X."); - } - cache = new CacheFactory().set(MCAST_PORT, "0").create(); - GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); - fact.setStartPort(50504); - fact.setMaximumTimeBetweenPings(1000); - fact.setSocketBufferSize(4000); - fact.setEndPort(70707); - fact.setManualStart(true); - fact.setBindAddress("200.112.204.10"); - GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1(); - fact.addGatewayTransportFilter(myStreamFilter1); - - GatewayReceiver receiver = fact.create(); - try { - receiver.start(); - fail("Expected GatewayReceiverException"); - } - catch (GatewayReceiverException gRE){ - assertTrue(gRE.getMessage().contains("Failed to create server socket on")); - } - catch (IOException e) { - e.printStackTrace(); - fail("The test failed with IOException"); - } - } - - @Test - public void test_ValidateGatewayReceiverDefaultStartPortAndDefaultEndPort() { - cache = new CacheFactory().set(MCAST_PORT, "0").create(); - GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); - fact.setMaximumTimeBetweenPings(1000); - fact.setSocketBufferSize(4000); - fact.setManualStart(true); - GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1(); - fact.addGatewayTransportFilter(myStreamFilter1); - - GatewayReceiver receiver = fact.create(); - try { - receiver.start(); - } - catch (IOException e) { - fail("The test failed with IOException"); - } - int port = receiver.getPort(); - if((port < 5000) || (port > 5500)) { - fail("GatewayReceiver started on out of range port"); - } - } - - @Test - public void test_ValidateGatewayReceiverDefaultStartPortAndEndPortProvided() { - cache = new CacheFactory().set(MCAST_PORT, "0").create(); - GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); - fact.setMaximumTimeBetweenPings(1000); - fact.setSocketBufferSize(4000); - fact.setEndPort(50707); - fact.setManualStart(true); - GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1(); - fact.addGatewayTransportFilter(myStreamFilter1); - - GatewayReceiver receiver = fact.create(); - try { - receiver.start(); - } - catch (IOException e) { - fail("The test failed with IOException"); - } - int port = receiver.getPort(); - if((port < GatewayReceiver.DEFAULT_START_PORT) || (port > 50707)) { - fail("GatewayReceiver started on out of range port"); - } - } - - @Test - public void test_ValidateGatewayReceiverWithManualStartFALSE() { - cache = new CacheFactory().set(MCAST_PORT, "0").create(); - GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); - fact.setMaximumTimeBetweenPings(1000); - fact.setSocketBufferSize(4000); - fact.setStartPort(5303); - fact.setManualStart(false); - GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1(); - fact.addGatewayTransportFilter(myStreamFilter1); - GatewayReceiver receiver = fact.create(); - int port = receiver.getPort(); - if ((port < 5303) || (port > GatewayReceiver.DEFAULT_END_PORT)) { - fail("GatewayReceiver started on out of range port"); - } - } - - @Test - public void test_ValidateGatewayReceiverWithStartPortAndDefaultEndPort() { - cache = new CacheFactory().set(MCAST_PORT, "0").create(); - GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); - fact.setMaximumTimeBetweenPings(1000); - fact.setSocketBufferSize(4000); - fact.setStartPort(5303); - fact.setManualStart(true); - GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1(); - fact.addGatewayTransportFilter(myStreamFilter1); - - GatewayReceiver receiver = fact.create(); - try { - receiver.start(); - } - catch (IOException e) { - fail("The test failed with IOException"); - } - int port = receiver.getPort(); - if ((port < 5303) || (port > GatewayReceiver.DEFAULT_END_PORT)) { - fail("GatewayReceiver started on out of range port"); - } - } - - @Test - public void test_ValidateGatewayReceiverWithWrongEndPortProvided() { - cache = new CacheFactory().set(MCAST_PORT, "0").create(); - try { - GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); - fact.setMaximumTimeBetweenPings(1000); - fact.setSocketBufferSize(4000); - fact.setEndPort(4999); - GatewayReceiver receiver = fact.create(); - fail("wrong end port set in the GatewayReceiver"); - } catch (IllegalStateException expected) { - if(!expected.getMessage().contains("Please specify either start port a value which is less than end port.")){ - fail("Caught IllegalStateException"); - expected.printStackTrace(); - } - } - } - - @After - public void tearDown() throws Exception { - if (this.cache != null) { - this.cache.close(); - } - } -}
