http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/PDXNewWanDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/PDXNewWanDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/PDXNewWanDUnitTest.java new file mode 100644 index 0000000..17c76ed --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/PDXNewWanDUnitTest.java @@ -0,0 +1,767 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package com.gemstone.gemfire.internal.cache.wan.misc;

import org.junit.Ignore;
import org.junit.experimental.categories.Category;
import org.junit.Test;

import static org.junit.Assert.*;

import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
import com.gemstone.gemfire.test.junit.categories.DistributedTest;

import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
import com.gemstone.gemfire.test.dunit.IgnoredException;
import com.gemstone.gemfire.test.dunit.Wait;

/**
 * Distributed tests that put PDX-serializable values into a region attached to
 * a gateway sender and then check the region size on the receiving site.
 *
 * Conventions used throughout this class:
 * - vm0 hosts the locator of site 1 ("ln", distributed-system id 1) and vm1 the
 *   locator of site 2 ("ny", distributed-system id 2).
 * - The third argument of {@code WANTestBase.createSender} is {@code false} in
 *   the tests documented as using a serial sender and {@code true} in the ones
 *   documented as using a parallel sender.
 * - {@code getTestMethodName()} and {@code isOffHeap()} are deliberately called
 *   inside the lambdas handed to {@code VM.invoke}; do not hoist them out.
 */
@Category(DistributedTest.class)
public class PDXNewWanDUnitTest extends WANTestBase {

  private static final long serialVersionUID = 1L;

  public PDXNewWanDUnitTest() {
    super();
  }

  /**
   * Test
   *   1> Site 1 : 1 locator, 1 member
   *   2> Site 2 : 1 locator, 1 member
   *   3> DR is defined on member 1 on site1
   *   4> Serial GatewaySender is defined on member 1 on site1
   *   5> Same DR is defined on site2 member 1
   *   6> Put is done with value which is PDXSerializable
   *   7> Validate whether other sites member receive this put operation.
   */
  @Test
  public void testWANPDX_RR_SerialSender() {
    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));

    createCacheInVMs(nyPort, vm2);
    vm2.invoke(() -> WANTestBase.createReceiver());

    createCacheInVMs(lnPort, vm3);

    vm3.invoke(() -> WANTestBase.createSender("ln", 2,
        false, 100, 10, false, false, null, true));

    vm2.invoke(() -> WANTestBase.createReplicatedRegion(
        getTestMethodName() + "_RR", null, isOffHeap()));

    vm3.invoke(() -> WANTestBase.startSender("ln"));

    vm3.invoke(() -> WANTestBase.createReplicatedRegion(
        getTestMethodName() + "_RR", "ln", isOffHeap()));

    vm3.invoke(() -> WANTestBase.doPutsPDXSerializable(
        getTestMethodName() + "_RR", 1));

    vm2.invoke(() -> WANTestBase.validateRegionSize_PDX(
        getTestMethodName() + "_RR", 1));
  }

  /**
   * Test
   *   1> Site 1 : 1 locator, 1 member
   *   2> Site 2 : 1 locator, 1 member
   *   3> DR is defined on member 1 on site1
   *   4> Serial GatewaySender is defined on member 1 on site1
   *   5> Same DR is defined on site2 member 1
   *   6> Put is done with value which is PDXSerializable
   *   7> Validate whether other sites member receive this put operation.
   *   8> Bounce the site 2 member (vm2) and delete its PDX directory
   *   9> Make sure that site 2 gets the PDX types along with the entries
   *      and can deserialize the entries.
   */
  @Test
  public void testWANPDX_RemoveRemoteData() {
    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));

    vm2.invoke(() -> WANTestBase.createReceiver_PDX(nyPort));

    vm3.invoke(() -> WANTestBase.createCache_PDX(lnPort));

    vm3.invoke(() -> WANTestBase.createSender("ln", 2,
        false, 100, 10, false, false, null, true));

    vm2.invoke(() -> WANTestBase.createReplicatedRegion(
        getTestMethodName() + "_RR", null, isOffHeap()));

    vm3.invoke(() -> WANTestBase.startSender("ln"));

    vm3.invoke(() -> WANTestBase.createReplicatedRegion(
        getTestMethodName() + "_RR", "ln", isOffHeap()));

    vm3.invoke(() -> WANTestBase.doPutsPDXSerializable(
        getTestMethodName() + "_RR", 1));

    vm2.invoke(() -> WANTestBase.validateRegionSize_PDX(
        getTestMethodName() + "_RR", 1));

    // bounce vm2 and wipe its PDX directory before it rejoins
    vm2.invoke(() -> WANTestBase.closeCache());

    vm2.invoke(() -> WANTestBase.deletePDXDir());

    vm2.invoke(() -> WANTestBase.createReceiver_PDX(nyPort));

    vm2.invoke(() -> WANTestBase.createReplicatedRegion(
        getTestMethodName() + "_RR", null, isOffHeap()));

    vm3.invoke(() -> WANTestBase.doPutsPDXSerializable(
        getTestMethodName() + "_RR", 2));

    vm2.invoke(() -> WANTestBase.validateRegionSize_PDX(
        getTestMethodName() + "_RR", 2));
  }

  /**
   * Test
   *   1> Site 1 : 1 locator, 1 member
   *   2> Site 2 : 1 locator, 1 member
   *   3> DR is defined on member 1 on site1
   *   4> Serial GatewaySender is defined on member 1 on site1
   *   5> Same DR is defined on site2 member 1
   *   6> Put is done with value which is PDXSerializable
   *   7> Validate whether other sites member receive this put operation.
   *   8> Bounce site 1 (vm3) and delete all of its PDX data
   *   9> Make some conflicting PDX registries in site 1 before the reconnect
   *   10> Make sure we flag a warning about the conflicting updates.
   */
  @Test
  public void testWANPDX_ConflictingData() {
    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));

    vm2.invoke(() -> WANTestBase.createReceiver_PDX(nyPort));

    vm3.invoke(() -> WANTestBase.createCache_PDX(lnPort));

    vm3.invoke(() -> WANTestBase.createSender("ln", 2,
        false, 100, 10, false, false, null, true));

    vm2.invoke(() -> WANTestBase.createReplicatedRegion(
        getTestMethodName() + "_RR", null, isOffHeap()));

    vm3.invoke(() -> WANTestBase.startSender("ln"));

    vm3.invoke(() -> WANTestBase.createReplicatedRegion(
        getTestMethodName() + "_RR", "ln", isOffHeap()));

    vm3.invoke(() -> WANTestBase.doPutsPDXSerializable(
        getTestMethodName() + "_RR", 1));

    vm2.invoke(() -> WANTestBase.validateRegionSize_PDX(
        getTestMethodName() + "_RR", 1));

    // bounce vm3
    vm3.invoke(() -> WANTestBase.closeCache());

    // The conflicting type definition is expected to surface these messages;
    // register them as ignored for the duration of the second half of the test.
    IgnoredException ex1 = IgnoredException.addIgnoredException("Trying to add a PDXType with the same id");
    IgnoredException ex2 = IgnoredException.addIgnoredException("CacheWriterException");
    IgnoredException ex3 = IgnoredException.addIgnoredException("does match the existing PDX type");
    IgnoredException ex4 = IgnoredException.addIgnoredException("ServerOperationException");
    IgnoredException ex5 = IgnoredException.addIgnoredException("Stopping the processor");

    try {
      // blow away vm3's PDX data
      vm3.invoke(() -> WANTestBase.deletePDXDir());

      vm3.invoke(() -> WANTestBase.createCache_PDX(lnPort));

      vm3.invoke(() -> WANTestBase.createSender("ln", 2,
          false, 100, 10, false, false, null, true));

      vm3.invoke(() -> WANTestBase.startSender("ln"));

      vm3.invoke(() -> WANTestBase.createReplicatedRegion(
          getTestMethodName() + "_RR", "ln", isOffHeap()));

      // Define a different type from vm3
      vm3.invoke(() -> WANTestBase.doPutsPDXSerializable2(
          getTestMethodName() + "_RR", 2));

      // Give the updates some time to make it over the WAN
      Wait.pause(10000);

      // The receiver is still expected to hold only the single original entry.
      vm2.invoke(() -> WANTestBase.validateRegionSize_PDX(
          getTestMethodName() + "_RR", 1));

      vm3.invoke(() -> WANTestBase.closeCache());
    } finally {
      ex1.remove();
      ex2.remove();
      ex3.remove();
      ex4.remove();
      ex5.remove();
    }
  }

  /**
   * Test
   *   1> Site 1 (ln) : 1 locator, 1 member
   *   2> Site 2 (ny) : 1 locator, 1 member
   *   3> Site 3 (tk) : 1 locator, 1 member
   *   4> The same DR is defined on the member of every site
   *   5> Every member has a serial GatewaySender to each of the other two sites
   *   6> The ln to ny sender is paused, then a PDXSerializable put is done in ln
   *   7> Validate that tk received the put and ny did not
   *   8> A put is done in tk; validate that both ny and ln end up with 2 entries.
   */
  @Test
  public void testWANPDX_RR_SerialSender3Sites() {
    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));

    Integer tkPort = (Integer) vm2.invoke(() -> WANTestBase.createFirstRemoteLocator(3, lnPort));

    createCacheInVMs(lnPort, vm3);
    createCacheInVMs(nyPort, vm4);
    createCacheInVMs(tkPort, vm5);
    vm3.invoke(() -> WANTestBase.createReceiver());
    vm4.invoke(() -> WANTestBase.createReceiver());
    vm5.invoke(() -> WANTestBase.createReceiver());

    // Create all of our gateway senders
    vm3.invoke(() -> WANTestBase.createSender("ny", 2,
        false, 100, 10, false, false, null, true));
    vm3.invoke(() -> WANTestBase.createSender("tk", 3,
        false, 100, 10, false, false, null, true));
    vm4.invoke(() -> WANTestBase.createSender("ln", 1,
        false, 100, 10, false, false, null, true));
    vm4.invoke(() -> WANTestBase.createSender("tk", 3,
        false, 100, 10, false, false, null, true));
    vm5.invoke(() -> WANTestBase.createSender("ln", 1,
        false, 100, 10, false, false, null, true));
    vm5.invoke(() -> WANTestBase.createSender("ny", 2,
        false, 100, 10, false, false, null, true));

    vm3.invoke(() -> WANTestBase.createReplicatedRegion(
        getTestMethodName() + "_RR", "ny,tk", isOffHeap()));
    vm4.invoke(() -> WANTestBase.createReplicatedRegion(
        getTestMethodName() + "_RR", "ln,tk", isOffHeap()));
    vm5.invoke(() -> WANTestBase.createReplicatedRegion(
        getTestMethodName() + "_RR", "ln,ny", isOffHeap()));

    // Start all of the senders
    vm3.invoke(() -> WANTestBase.startSender("ny"));
    vm3.invoke(() -> WANTestBase.startSender("tk"));
    vm4.invoke(() -> WANTestBase.startSender("ln"));
    vm4.invoke(() -> WANTestBase.startSender("tk"));
    vm5.invoke(() -> WANTestBase.startSender("ln"));
    vm5.invoke(() -> WANTestBase.startSender("ny"));

    // Pause ln to ny. This means the PDX type will not be dispatched
    // to ny from ln
    vm3.invoke(() -> WANTestBase.pauseSender("ny"));

    Wait.pause(5000);

    // Do some puts that define a PDX type in ln
    vm3.invoke(() -> WANTestBase.doPutsPDXSerializable(
        getTestMethodName() + "_RR", 1));

    // Make sure that tk received the update
    vm5.invoke(() -> WANTestBase.validateRegionSize_PDX(
        getTestMethodName() + "_RR", 1));

    // Make sure ny didn't receive the update because the sender is paused
    vm4.invoke(() -> WANTestBase.validateRegionSize_PDX(
        getTestMethodName() + "_RR", 0));

    // Now, do a put from tk. This serialized object will be distributed
    // to ny from tk, using the type defined by ln.
    vm5.invoke(() -> WANTestBase.doPutsPDXSerializable(
        getTestMethodName() + "_RR", 2));

    // Verify the ny can read the object
    vm4.invoke(() -> WANTestBase.validateRegionSize_PDX(
        getTestMethodName() + "_RR", 2));

    // Wait for vm3 to receive the update (prevents a broken pipe suspect string)
    vm3.invoke(() -> WANTestBase.validateRegionSize_PDX(
        getTestMethodName() + "_RR", 2));
  }

  /**
   * Same two-site replicated-region setup as
   * {@link #testWANPDX_RR_SerialSender()}, except that the first batch of puts
   * (10) is done before the sender is started and a second batch (40) after it.
   * The receiving region is then expected to report 40 entries.
   */
  @Test
  public void testWANPDX_RR_SerialSender_StartedLater() {
    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));

    createCacheInVMs(nyPort, vm2);
    vm2.invoke(() -> WANTestBase.createReceiver());

    createCacheInVMs(lnPort, vm3);

    vm3.invoke(() -> WANTestBase.createSender("ln", 2,
        false, 100, 10, false, false, null, true));

    vm2.invoke(() -> WANTestBase.createReplicatedRegion(
        getTestMethodName() + "_RR", null, isOffHeap()));

    vm3.invoke(() -> WANTestBase.createReplicatedRegion(
        getTestMethodName() + "_RR", "ln", isOffHeap()));

    // puts issued while the sender has not been started yet
    vm3.invoke(() -> WANTestBase.doPutsPDXSerializable(
        getTestMethodName() + "_RR", 10));

    vm3.invoke(() -> WANTestBase.startSender("ln"));

    vm3.invoke(() -> WANTestBase.doPutsPDXSerializable(
        getTestMethodName() + "_RR", 40));

    vm2.invoke(() -> WANTestBase.validateRegionSize_PDX(
        getTestMethodName() + "_RR", 40));
  }

  /**
   * Test
   *   1> Site 1 : 1 locator, 1 member
   *   2> Site 2 : 1 locator, 1 member
   *   3> PR is defined on member 1 on site1
   *   4> Serial GatewaySender is defined on member 1 on site1
   *   5> Same PR is defined on site2 member 1
   *   6> Put is done with value which is PDXSerializable
   *   7> Validate whether other sites member receive this put operation.
   */
  @Test
  public void testWANPDX_PR_SerialSender() {
    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));

    createCacheInVMs(nyPort, vm2);
    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR", null, 0, 2, isOffHeap()));
    vm2.invoke(() -> WANTestBase.createReceiver());

    createCacheInVMs(lnPort, vm3);

    vm3.invoke(() -> WANTestBase.createSender("ln", 2,
        false, 100, 10, false, false, null, true));

    vm3.invoke(() -> WANTestBase.startSender("ln"));

    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR", "ln", 0, 2, isOffHeap()));

    vm3.invoke(() -> WANTestBase.doPutsPDXSerializable(
        getTestMethodName() + "_PR", 1));

    vm2.invoke(() -> WANTestBase.validateRegionSize_PDX(
        getTestMethodName() + "_PR", 1));
  }

  /**
   * Partitioned-region variant with a serial sender in which 20 puts are done
   * before the sender is started and 40 after it. Both sites are created
   * through the {@code _PDX} cache/receiver helpers. The receiving region is
   * expected to report 40 entries.
   */
  @Test
  public void testWANPDX_PR_SerialSender_StartedLater() {
    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));

    vm2.invoke(() -> WANTestBase.createReceiver_PDX(nyPort));

    vm3.invoke(() -> WANTestBase.createCache_PDX(lnPort));

    vm3.invoke(() -> WANTestBase.createSender("ln", 2,
        false, 100, 10, false, false, null, true));

    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR", null, 0, 2, isOffHeap()));

    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR", "ln", 0, 2, isOffHeap()));

    // puts issued while the sender has not been started yet
    vm3.invoke(() -> WANTestBase.doPutsPDXSerializable(
        getTestMethodName() + "_PR", 20));

    vm3.invoke(() -> WANTestBase.startSender("ln"));

    vm3.invoke(() -> WANTestBase.doPutsPDXSerializable(
        getTestMethodName() + "_PR", 40));

    vm2.invoke(() -> WANTestBase.validateRegionSize_PDX(
        getTestMethodName() + "_PR", 40));
  }

  /**
   * Test
   *   1> Site 1 : 1 locator, 2 members
   *   2> Site 2 : 1 locator, 1 member
   *   3> PR is defined on members 1 and 2 on site1
   *   4> Serial GatewaySender is defined on member 1 on site1
   *   5> Same PR is defined on the site2 member
   *   6> Put is done with value which is PDXSerializable
   *   7> Validate whether other sites member receive this put operation.
   */
  @Test
  public void testWANPDX_PR_MultipleVM_SerialSender() {
    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));

    createCacheInVMs(nyPort, vm2);
    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR", null, 1, 5, isOffHeap()));
    vm2.invoke(() -> WANTestBase.createReceiver());

    createCacheInVMs(lnPort, vm3, vm4);

    vm3.invoke(() -> WANTestBase.createSender("ln", 2,
        false, 100, 10, false, false, null, true));

    vm3.invoke(() -> WANTestBase.startSender("ln"));

    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR", "ln", 1, 5, isOffHeap()));
    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR", "ln", 1, 5, isOffHeap()));

    vm3.invoke(() -> WANTestBase.doPutsPDXSerializable(
        getTestMethodName() + "_PR", 10));

    vm2.invoke(() -> WANTestBase.validateRegionSize_PDX(
        getTestMethodName() + "_PR", 10));
  }

  /**
   * Two-member variant of {@link #testWANPDX_PR_SerialSender_StartedLater()}:
   * vm3 and vm4 both host the PR on site 1, the serial sender exists only on
   * vm3, 10 puts are done from vm3 before the sender is started and 40 puts
   * from vm4 afterwards. The receiving region is expected to report 40 entries.
   */
  @Test
  public void testWANPDX_PR_MultipleVM_SerialSender_StartedLater() {
    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));

    vm2.invoke(() -> WANTestBase.createReceiver_PDX(nyPort));

    vm3.invoke(() -> WANTestBase.createCache_PDX(lnPort));
    vm4.invoke(() -> WANTestBase.createCache_PDX(lnPort));

    vm3.invoke(() -> WANTestBase.createSender("ln", 2,
        false, 100, 10, false, false, null, true));

    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR", null, 1, 5, isOffHeap()));

    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR", "ln", 1, 5, isOffHeap()));
    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR", "ln", 1, 5, isOffHeap()));

    // puts issued while the sender has not been started yet
    vm3.invoke(() -> WANTestBase.doPutsPDXSerializable(
        getTestMethodName() + "_PR", 10));

    vm3.invoke(() -> WANTestBase.startSender("ln"));

    vm4.invoke(() -> WANTestBase.doPutsPDXSerializable(
        getTestMethodName() + "_PR", 40));

    vm2.invoke(() -> WANTestBase.validateRegionSize_PDX(
        getTestMethodName() + "_PR", 40));
  }

  /**
   * Test
   *   1> Site 1 : 1 locator, 1 member
   *   2> Site 2 : 1 locator, 1 member
   *   3> PR is defined on member 1 on site1
   *   4> Parallel GatewaySender is defined on member 1 on site1
   *   5> Same PR is defined on site2 member 1
   *   6> Put is done with value which is PDXSerializable
   *   7> Validate whether other sites member receive this put operation.
   */
  @Test
  public void testWANPDX_PR_ParallelSender() {
    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));

    createCacheInVMs(nyPort, vm2);
    vm2.invoke(() -> WANTestBase.createReceiver());

    vm3.invoke(() -> WANTestBase.createCache(lnPort));

    vm3.invoke(() -> WANTestBase.createSender("ln", 2,
        true, 100, 10, false, false, null, true));

    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR", null, 0, 1, isOffHeap()));

    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR", "ln", 0, 1, isOffHeap()));

    vm3.invoke(() -> WANTestBase.startSender("ln"));

    vm3.invoke(() -> WANTestBase.doPutsPDXSerializable(
        getTestMethodName() + "_PR", 1));

    vm2.invoke(() -> WANTestBase.validateRegionSize_PDX(
        getTestMethodName() + "_PR", 1));
  }

  /**
   * Parallel-sender variant in which the receiving site creates its PR before
   * its receiver. A single put is expected to reach the receiving region.
   * NOTE(review): the "47826" suffix is presumably a ticket number this test
   * guards against regressing; confirm against the issue tracker.
   */
  @Test
  public void testWANPDX_PR_ParallelSender_47826() {
    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));

    createCacheInVMs(nyPort, vm2);
    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR", null, 0, 1, isOffHeap()));
    vm2.invoke(() -> WANTestBase.createReceiver());

    createCacheInVMs(lnPort, vm3);

    vm3.invoke(() -> WANTestBase.createSender("ln", 2, true,
        100, 10, false, false, null, true));

    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR", "ln", 0, 1, isOffHeap()));

    vm3.invoke(() -> WANTestBase.startSender("ln"));

    vm3.invoke(() -> WANTestBase.doPutsPDXSerializable(
        getTestMethodName() + "_PR", 1));

    vm2.invoke(() -> WANTestBase.validateRegionSize_PDX(
        getTestMethodName() + "_PR", 1));
  }

  /**
   * Parallel-sender counterpart of
   * {@link #testWANPDX_PR_SerialSender_StartedLater()}: 10 puts are done before
   * the sender is started and 40 after it. The receiving region is expected to
   * report 40 entries.
   */
  @Test
  public void testWANPDX_PR_ParallelSender_StartedLater() {
    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));

    vm2.invoke(() -> WANTestBase.createReceiver_PDX(nyPort));

    vm3.invoke(() -> WANTestBase.createCache_PDX(lnPort));

    vm3.invoke(() -> WANTestBase.createSender("ln", 2,
        true, 100, 10, false, false, null, true));

    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR", null, 0, 2, isOffHeap()));

    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR", "ln", 0, 2, isOffHeap()));

    // puts issued while the sender has not been started yet
    vm3.invoke(() -> WANTestBase.doPutsPDXSerializable(
        getTestMethodName() + "_PR", 10));

    vm3.invoke(() -> WANTestBase.startSender("ln"));

    vm3.invoke(() -> WANTestBase.doPutsPDXSerializable(
        getTestMethodName() + "_PR", 40));

    vm2.invoke(() -> WANTestBase.validateRegionSize_PDX(
        getTestMethodName() + "_PR", 40));
  }

  /**
   * Parallel sender defined and started on both site 1 members (vm3, vm4),
   * which both host the PR. 10 puts are done from vm3 and the receiving region
   * is expected to report 10 entries.
   */
  @Test
  public void testWANPDX_PR_MultipleVM_ParallelSender() {
    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));

    createCacheInVMs(nyPort, vm2);
    vm2.invoke(() -> WANTestBase.createReceiver());

    createCacheInVMs(lnPort, vm3, vm4);

    vm3.invoke(() -> WANTestBase.createSender("ln", 2,
        true, 100, 10, false, false, null, true));
    vm4.invoke(() -> WANTestBase.createSender("ln", 2,
        true, 100, 10, false, false, null, true));

    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR", null, 0, 2, isOffHeap()));

    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR", "ln", 0, 2, isOffHeap()));
    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR", "ln", 0, 2, isOffHeap()));

    startSenderInVMs("ln", vm3, vm4);

    vm3.invoke(() -> WANTestBase.doPutsPDXSerializable(
        getTestMethodName() + "_PR", 10));

    vm2.invoke(() -> WANTestBase.validateRegionSize_PDX(
        getTestMethodName() + "_PR", 10));
  }

  /**
   * Two-member parallel-sender variant in which 10 puts are done from vm3
   * before the senders are started (via {@code startSenderInVMsAsync}) and 40
   * puts from vm4 afterwards. The receiving region is expected to report 40
   * entries.
   */
  @Test
  public void testWANPDX_PR_MultipleVM_ParallelSender_StartedLater() {
    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));

    vm2.invoke(() -> WANTestBase.createReceiver_PDX(nyPort));

    vm3.invoke(() -> WANTestBase.createCache_PDX(lnPort));
    vm4.invoke(() -> WANTestBase.createCache_PDX(lnPort));

    vm3.invoke(() -> WANTestBase.createSender("ln", 2,
        true, 100, 10, false, false, null, true));
    vm4.invoke(() -> WANTestBase.createSender("ln", 2,
        true, 100, 10, false, false, null, true));

    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR", null, 0, 2, isOffHeap()));

    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR", "ln", 0, 2, isOffHeap()));
    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR", "ln", 0, 2, isOffHeap()));

    // puts issued while the senders have not been started yet
    vm3.invoke(() -> WANTestBase.doPutsPDXSerializable(
        getTestMethodName() + "_PR", 10));

    startSenderInVMsAsync("ln", vm3, vm4);

    vm4.invoke(() -> WANTestBase.doPutsPDXSerializable(
        getTestMethodName() + "_PR", 40));

    vm2.invoke(() -> WANTestBase.validateRegionSize_PDX(
        getTestMethodName() + "_PR", 40));
  }

  /**
   * Same flow as {@link #testWANPDX_RR_SerialSender()} but the sender is
   * created with a {@code PDXGatewayEventFilter}. After the single put has been
   * validated on the receiver, each filter callback counter on vm3 is expected
   * to equal 1.
   */
  @Test
  public void testWANPDX_RR_SerialSenderWithFilter() {
    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));

    createCacheInVMs(nyPort, vm2);
    vm2.invoke(() -> WANTestBase.createReceiver());

    createCacheInVMs(lnPort, vm3);

    vm3.invoke(() -> WANTestBase.createSender("ln", 2,
        false, 100, 10, false, false, new PDXGatewayEventFilter(), true));

    vm2.invoke(() -> WANTestBase.createReplicatedRegion(
        getTestMethodName() + "_RR", null, isOffHeap()));

    vm3.invoke(() -> WANTestBase.startSender("ln"));

    vm3.invoke(() -> WANTestBase.createReplicatedRegion(
        getTestMethodName() + "_RR", "ln", isOffHeap()));

    vm3.invoke(() -> WANTestBase.doPutsPDXSerializable(
        getTestMethodName() + "_RR", 1));

    vm2.invoke(() -> WANTestBase.validateRegionSize_PDX(
        getTestMethodName() + "_RR", 1));

    vm3.invoke(() -> PDXNewWanDUnitTest.verifyFilterInvocation(1));
  }

  /**
   * Same flow as {@link #testWANPDX_PR_MultipleVM_ParallelSender()} but both
   * senders are created with a {@code PDXGatewayEventFilter}. After the 10 puts
   * have been validated on the receiver, each filter callback counter is
   * expected to equal 5 on vm3 and 5 on vm4.
   */
  @Test
  public void testWANPDX_PR_MultipleVM_ParallelSenderWithFilter() {
    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));

    createCacheInVMs(nyPort, vm2);
    vm2.invoke(() -> WANTestBase.createReceiver());

    createCacheInVMs(lnPort, vm3, vm4);

    vm3.invoke(() -> WANTestBase.createSender("ln", 2,
        true, 100, 10, false, false, new PDXGatewayEventFilter(), true));
    vm4.invoke(() -> WANTestBase.createSender("ln", 2,
        true, 100, 10, false, false, new PDXGatewayEventFilter(), true));

    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR", null, 0, 2, isOffHeap()));

    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR", "ln", 0, 2, isOffHeap()));
    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR", "ln", 0, 2, isOffHeap()));

    startSenderInVMs("ln", vm3, vm4);

    vm3.invoke(() -> WANTestBase.doPutsPDXSerializable(
        getTestMethodName() + "_PR", 10));

    vm2.invoke(() -> WANTestBase.validateRegionSize_PDX(
        getTestMethodName() + "_PR", 10));

    vm3.invoke(() -> PDXNewWanDUnitTest.verifyFilterInvocation(5));
    vm4.invoke(() -> PDXNewWanDUnitTest.verifyFilterInvocation(5));
  }

  /**
   * When remote site bounces then we should send pdx event again.
   * Currently disabled via {@code @Ignore}.
   */
  @Ignore
  @Test
  public void testWANPDX_PR_SerialSender_RemoteSite_Bounce() {
    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));

    createCacheInVMs(nyPort, vm2);
    vm2.invoke(() -> WANTestBase.createReceiver());

    createCacheInVMs(lnPort, vm3);

    vm3.invoke(() -> WANTestBase.createSender("ln", 2,
        false, 100, 10, false, false, null, true));

    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR", null, 0, 2, isOffHeap()));

    vm3.invoke(() -> WANTestBase.startSender("ln"));

    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR", "ln", 0, 2, isOffHeap()));

    vm3.invoke(() -> WANTestBase.doPutsPDXSerializable(
        getTestMethodName() + "_PR", 1));

    vm2.invoke(() -> WANTestBase.validateRegionSize_PDX(
        getTestMethodName() + "_PR", 1));

    // NOTE(review): killSender() is invoked on vm2, the receiving member;
    // presumably it is used here simply to take the remote member down.
    vm2.invoke(() -> WANTestBase.killSender());

    createReceiverInVMs(vm2, vm4);

    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR", null, 1, 2, isOffHeap()));
    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR", null, 1, 2, isOffHeap()));

    vm3.invoke(() -> WANTestBase.doPutsPDXSerializable(
        getTestMethodName() + "_PR", 1));

    vm2.invoke(() -> WANTestBase.validateRegionSize_PDX(
        getTestMethodName() + "_PR", 1));
  }

  /**
   * Asserts that the {@code beforeEnqueueInvoked}, {@code beforeTransmitInvoked}
   * and {@code afterAckInvoked} counters of the current VM's
   * {@code PDXGatewayEventFilter} each equal the given value. Intended to be
   * run inside a sender VM through {@code VM.invoke}.
   *
   * @param invocation the expected value of every counter
   */
  public static void verifyFilterInvocation(int invocation) {
    PDXGatewayEventFilter filter = (PDXGatewayEventFilter) eventFilter;
    // JUnit's contract is assertEquals(expected, actual); keeping that order
    // makes a failure message report the right value as "expected".
    assertEquals(invocation, filter.beforeEnqueueInvoked);
    assertEquals(invocation, filter.beforeTransmitInvoked);
    assertEquals(invocation, filter.afterAckInvoked);
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPersistenceDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPersistenceDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPersistenceDUnitTest.java new file mode 100644 index 0000000..1981feb --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPersistenceDUnitTest.java @@ -0,0 +1,670 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.gemstone.gemfire.internal.cache.wan.misc; + +import org.junit.Ignore; +import org.junit.experimental.categories.Category; +import org.junit.Test; + +import static org.junit.Assert.*; + +import com.gemstone.gemfire.test.junit.categories.DistributedTest; + +import com.gemstone.gemfire.cache.DataPolicy; +import com.gemstone.gemfire.cache.Scope; +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; +import com.gemstone.gemfire.test.dunit.Assert; +import com.gemstone.gemfire.test.dunit.AsyncInvocation; +import com.gemstone.gemfire.test.dunit.LogWriterUtils; + +@Category(DistributedTest.class) +public class ReplicatedRegion_ParallelWANPersistenceDUnitTest extends WANTestBase { + + public ReplicatedRegion_ParallelWANPersistenceDUnitTest() { + super(); + // TODO Auto-generated constructor stub + } + + final String expectedExceptions = null; + + /**Below test is disabled intentionally + 1> In this release 8.0, for rolling upgrade support queue name is changed to old style + 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + ParallelGatewaySenderQueue#convertPathToName + 3> We have to enabled it in next release + 4> Version based rolling upgrade support should be provided. 
based on the version of the gemfire QSTRING should be used between 8.0 + and version prior to 8.0*/ + @Ignore + @Test + public void test_DR_PGSPERSISTENCE_VALIDATEQUEUE_Restart_Validate_Receiver() { + //create locator on local site + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + //create locator on remote site + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + //create receiver on remote site + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + //create cache in local site + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + //create senders with disk store + String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true )); + String diskStore2 = (String) vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true )); + String diskStore3 = (String) vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true )); + String diskStore4 = (String) vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true )); + + LogWriterUtils.getLogWriter().info("The DS are: " + diskStore1 + "," + diskStore2 + "," + diskStore3 + "," + diskStore4); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + 
getTestMethodName() + "_RR", "ln", isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + + vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm6.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm7.invoke(() -> WANTestBase.pauseSender( "ln" )); + + //start puts in region on local site + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", 3000 )); + LogWriterUtils.getLogWriter().info("Completed puts in the region"); + + //--------------------close and rebuild local site ------------------------------------------------- + //kill the senders +/* ExpectedException exp1 = addExpectedException(CacheClosedException.class + .getName()); + try { +*/ vm4.invoke(() -> WANTestBase.killSender()); + vm5.invoke(() -> WANTestBase.killSender()); + vm6.invoke(() -> WANTestBase.killSender()); + vm7.invoke(() -> WANTestBase.killSender()); +/* } + finally { + exp1.remove(); + } +*/ + LogWriterUtils.getLogWriter().info("Killed all the senders."); + + //restart the vm + vm4.invoke(() -> WANTestBase.createCache( lnPort )); + vm5.invoke(() -> WANTestBase.createCache( lnPort )); + vm6.invoke(() -> WANTestBase.createCache( lnPort )); + vm7.invoke(() -> WANTestBase.createCache( lnPort )); + + LogWriterUtils.getLogWriter().info("Created back the cache"); + + //create senders with disk store + vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore1, true )); + vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore2, true )); + vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore3, true 
)); + vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore4, true )); + + LogWriterUtils.getLogWriter().info("Created the senders back from the disk store."); + + //create PR on local site + AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + AsyncInvocation inv2 = vm5.invokeAsync(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + AsyncInvocation inv3 = vm6.invokeAsync(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + AsyncInvocation inv4 = vm7.invokeAsync(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + try { + inv1.join(); + inv2.join(); + inv3.join(); + inv4.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + fail(); + } + + //start the senders in async mode. This will ensure that the + //node of shadow PR that went down last will come up first + startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7); + + LogWriterUtils.getLogWriter().info("Waiting for senders running."); + //wait for senders running + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + + LogWriterUtils.getLogWriter().info("All the senders are now running..."); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 3000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 3000 )); + + //---------------------------------------------------------------------------------------------------- + + vm4.invoke(() -> WANTestBase.doNextPuts( getTestMethodName() + "_RR", 3000, 10000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + 
getTestMethodName() + "_RR", 10000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 10000 )); + + } + + /**Below test is disabled intentionally + 1> In this release 8.0, for rolling upgrade support queue name is changed to old style + 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + ParallelGatewaySenderQueue#convertPathToName + 3> We have to enabled it in next release + 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 + and version prior to 8.0*/ + @Ignore + @Test + public void test_DRPERSISTENCE_PGSPERSISTENCE_VALIDATEQUEUE_Restart_Validate_Receiver() { + //create locator on local site + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + //create locator on remote site + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + //create receiver on remote site + createCacheInVMs(nyPort, vm2, vm3); + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + createReceiverInVMs(vm2, vm3); + + //create cache in local site + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + //create senders with disk store + String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true )); + String diskStore2 = (String) vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true )); + String diskStore3 = (String) vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true )); + String diskStore4 = (String) vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, 
true, null, null, true )); + + LogWriterUtils.getLogWriter().info("The DS are: " + diskStore1 + "," + diskStore2 + "," + diskStore3 + "," + diskStore4); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", "ln", + Scope.DISTRIBUTED_ACK, DataPolicy.PERSISTENT_REPLICATE, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", "ln", + Scope.DISTRIBUTED_ACK, DataPolicy.PERSISTENT_REPLICATE, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", "ln", + Scope.DISTRIBUTED_ACK, DataPolicy.PERSISTENT_REPLICATE, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", "ln", + Scope.DISTRIBUTED_ACK, DataPolicy.PERSISTENT_REPLICATE, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + + vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm6.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm7.invoke(() -> WANTestBase.pauseSender( "ln" )); + + //start puts in region on local site + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", 3000 )); + LogWriterUtils.getLogWriter().info("Completed puts in the region"); + + //--------------------close and rebuild local site ------------------------------------------------- + //kill the senders +/* ExpectedException exp1 = addExpectedException(CacheClosedException.class + .getName()); + try {*/ + vm4.invoke(() -> WANTestBase.killSender()); + vm5.invoke(() -> WANTestBase.killSender()); + vm6.invoke(() -> WANTestBase.killSender()); + vm7.invoke(() -> WANTestBase.killSender()); +/* } + finally { + exp1.remove(); + }*/ + + //restart 
the vm + vm4.invoke(() -> WANTestBase.createCache( lnPort )); + vm5.invoke(() -> WANTestBase.createCache( lnPort )); + vm6.invoke(() -> WANTestBase.createCache( lnPort )); + vm7.invoke(() -> WANTestBase.createCache( lnPort )); + + LogWriterUtils.getLogWriter().info("Created back the cache"); + + //create senders with disk store + vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore1, true )); + vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore2, true )); + vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore3, true )); + vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore4, true )); + + LogWriterUtils.getLogWriter().info("Created the senders back from the disk store."); + + //create PR on local site + AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", Scope.DISTRIBUTED_ACK, DataPolicy.PERSISTENT_REPLICATE, isOffHeap() )); + AsyncInvocation inv2 = vm5.invokeAsync(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", Scope.DISTRIBUTED_ACK, DataPolicy.PERSISTENT_REPLICATE, isOffHeap() )); + AsyncInvocation inv3 = vm6.invokeAsync(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", Scope.DISTRIBUTED_ACK, DataPolicy.PERSISTENT_REPLICATE, isOffHeap() )); + AsyncInvocation inv4 = vm7.invokeAsync(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", Scope.DISTRIBUTED_ACK, DataPolicy.PERSISTENT_REPLICATE, isOffHeap() )); + + try { + inv1.join(); + inv2.join(); + inv3.join(); + inv4.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + fail(); + } + + //start the senders in async mode. 
This will ensure that the + //node of shadow PR that went down last will come up first + startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7); + + LogWriterUtils.getLogWriter().info("Waiting for senders running."); + //wait for senders running + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + + LogWriterUtils.getLogWriter().info("All the senders are now running..."); + + //---------------------------------------------------------------------------------------------------- + + vm4.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 3000 )); + vm5.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 3000 )); + vm6.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 3000 )); + vm7.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 3000 )); + +/* exp1 = addExpectedException(CacheClosedException.class.getName()); + try { +*/ vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 3000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 3000 )); + + vm4.invoke(() -> WANTestBase.doNextPuts( + getTestMethodName() + "_RR", 3000, 10000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 10000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 10000 )); +/* } + finally { + exp1.remove(); + } +*/ } + + /**Below test is disabled intentionally + 1> In this release 8.0, for rolling upgrade support queue name is changed to old style + 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + ParallelGatewaySenderQueue#convertPathToName + 3> We have to enabled it in next release + 4> Version 
based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 + and version prior to 8.0*/ + @Ignore + @Test + public void test_DRPERSISTENCE_PRPERSISTENCE_PGSPERSISTENCE_VALIDATEQUEUE_Restart_Validate_Receiver() { + //create locator on local site + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + //create locator on remote site + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + //create receiver on remote site + vm2.invoke(() -> WANTestBase.createCache( nyPort )); + vm3.invoke(() -> WANTestBase.createCache( nyPort )); + vm2.invoke(() -> WANTestBase.createReceiver()); + vm3.invoke(() -> WANTestBase.createReceiver()); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + + //create cache in local site + vm4.invoke(() -> WANTestBase.createCache( lnPort )); + vm5.invoke(() -> WANTestBase.createCache( lnPort )); + vm6.invoke(() -> WANTestBase.createCache( lnPort )); + vm7.invoke(() -> WANTestBase.createCache( lnPort )); + + //create senders with disk store + String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true )); + String diskStore2 = (String) vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true )); + String diskStore3 = (String) vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true )); + String diskStore4 = (String) vm7.invoke(() 
-> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true )); + + LogWriterUtils.getLogWriter().info("The DS are: " + diskStore1 + "," + diskStore2 + "," + diskStore3 + "," + diskStore4); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", Scope.DISTRIBUTED_ACK, + DataPolicy.PERSISTENT_REPLICATE, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", Scope.DISTRIBUTED_ACK, + DataPolicy.PERSISTENT_REPLICATE, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", Scope.DISTRIBUTED_ACK, + DataPolicy.PERSISTENT_REPLICATE, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", Scope.DISTRIBUTED_ACK, + DataPolicy.PERSISTENT_REPLICATE, isOffHeap() )); + + vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + + vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm6.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm7.invoke(() -> WANTestBase.pauseSender( "ln" )); + + //start puts in region on local site + vm4.invoke(() -> 
WANTestBase.doPuts( getTestMethodName() + "_RR", 3000 )); + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 3000 )); + LogWriterUtils.getLogWriter().info("Completed puts in the region"); + + //--------------------close and rebuild local site ------------------------------------------------- + //kill the senders +/* ExpectedException exp1 = addExpectedException(CacheClosedException.class + .getName()); + try { +*/ vm4.invoke(() -> WANTestBase.killSender()); + vm5.invoke(() -> WANTestBase.killSender()); + vm6.invoke(() -> WANTestBase.killSender()); + vm7.invoke(() -> WANTestBase.killSender()); +/* } + finally { + exp1.remove(); + } +*/ + LogWriterUtils.getLogWriter().info("Killed all the senders."); + + //restart the vm + vm4.invoke(() -> WANTestBase.createCache( lnPort )); + vm5.invoke(() -> WANTestBase.createCache( lnPort )); + vm6.invoke(() -> WANTestBase.createCache( lnPort )); + vm7.invoke(() -> WANTestBase.createCache( lnPort )); + + LogWriterUtils.getLogWriter().info("Created back the cache"); + + //create senders with disk store + vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore1, true )); + vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore2, true )); + vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore3, true )); + vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore4, true )); + + LogWriterUtils.getLogWriter().info("Created the senders back from the disk store."); + + //create PR on local site + AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", Scope.DISTRIBUTED_ACK, DataPolicy.PERSISTENT_REPLICATE, isOffHeap() )); + AsyncInvocation inv2 = vm5.invokeAsync(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", 
Scope.DISTRIBUTED_ACK, DataPolicy.PERSISTENT_REPLICATE, isOffHeap() )); + AsyncInvocation inv3 = vm6.invokeAsync(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", Scope.DISTRIBUTED_ACK, DataPolicy.PERSISTENT_REPLICATE, isOffHeap() )); + AsyncInvocation inv4 = vm7.invokeAsync(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", Scope.DISTRIBUTED_ACK, DataPolicy.PERSISTENT_REPLICATE, isOffHeap() )); + + try { + inv1.join(); + inv2.join(); + inv3.join(); + inv4.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + fail(); + } + + inv1 = vm4.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, + 100, isOffHeap() )); + inv2 = vm5.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, + 100, isOffHeap() )); + inv3 = vm6.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, + 100, isOffHeap() )); + inv4 = vm7.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, + 100, isOffHeap() )); + + try { + inv1.join(); + inv2.join(); + inv3.join(); + inv4.join(); + } + catch (InterruptedException e) { + e.printStackTrace(); + fail(); + } + + //start the senders in async mode. 
This will ensure that the + //node of shadow PR that went down last will come up first + startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7); + + LogWriterUtils.getLogWriter().info("Waiting for senders running."); + //wait for senders running + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + + LogWriterUtils.getLogWriter().info("All the senders are now running..."); + + //---------------------------------------------------------------------------------------------------- + + vm4.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 3000 )); + vm5.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 3000 )); + vm6.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 3000 )); + vm7.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 3000 )); + + vm4.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 3000 )); + vm5.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 3000 )); + vm6.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 3000 )); + vm7.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 3000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 3000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 3000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 3000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 3000 )); + + vm4.invoke(() -> WANTestBase.doNextPuts( getTestMethodName() + "_RR", 3000, 10000 )); + vm4.invoke(() -> WANTestBase.doNextPuts( getTestMethodName() + "_PR", 3000, 10000 )); + +/* exp1 = 
addExpectedException(CacheClosedException.class.getName()); + try {*/ + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 10000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 10000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 10000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 10000 )); +/* } + finally { + exp1.remove(); + } +*/ + } + + /**Below test is disabled intentionally + 1> In this release 8.0, for rolling upgrade support queue name is changed to old style + 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + ParallelGatewaySenderQueue#convertPathToName + 3> We have to enabled it in next release + 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 + and version prior to 8.0*/ + @Ignore + @Test + public void test_DRPERSISTENCE_PGSPERSISTENCE_4NODES_2NODESDOWN_Validate_Receiver() + throws Exception { + + Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + // create senders with disk store + String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, + false, true, null, null, true )); + String diskStore2 = (String) vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, + false, true, null, null, true )); + String diskStore3 = (String) vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, + false, true, null, null, true )); + String diskStore4 = (String) vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 
2, true, 100, 10, + false, true, null, null, true )); + + LogWriterUtils.getLogWriter().info( + "The DS are: " + diskStore1 + "," + diskStore2 + "," + diskStore3 + "," + + diskStore4); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", Scope.DISTRIBUTED_ACK, + DataPolicy.PERSISTENT_REPLICATE, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", Scope.DISTRIBUTED_ACK, + DataPolicy.PERSISTENT_REPLICATE, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", Scope.DISTRIBUTED_ACK, + DataPolicy.PERSISTENT_REPLICATE, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", Scope.DISTRIBUTED_ACK, + DataPolicy.PERSISTENT_REPLICATE, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + + Thread.sleep(60000); + { + AsyncInvocation inv1 = vm7.invokeAsync(() -> ReplicatedRegion_ParallelWANPropagationDUnitTest.doPuts0(getTestMethodName() + "_RR", 10000 )); + Thread.sleep(1000); + AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender()); + Thread.sleep(2000); + AsyncInvocation inv3 = vm6.invokeAsync(() -> ReplicatedRegion_ParallelWANPropagationDUnitTest.doPuts1(getTestMethodName() + "_RR", 10000 )); + Thread.sleep(1500); + AsyncInvocation inv4 = vm5.invokeAsync(() -> WANTestBase.killSender()); + try { + inv1.join(); + inv2.join(); + inv3.join(); + inv4.join(); + } catch 
(Exception e) { + Assert.fail("UnExpected Exception", e); + } + } + + vm4.invoke(() -> WANTestBase.createCache( lnPort )); + vm5.invoke(() -> WANTestBase.createCache( lnPort )); + + LogWriterUtils.getLogWriter().info("Created back the cache"); + + // create senders with disk store + vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( + "ln", 2, true, 100, 10, false, true, null, diskStore1, true )); + vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( + "ln", 2, true, 100, 10, false, true, null, diskStore2, true )); + + LogWriterUtils.getLogWriter().info("Created the senders back from the disk store."); + + AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", "ln", Scope.DISTRIBUTED_ACK, + DataPolicy.PERSISTENT_REPLICATE, isOffHeap() )); + AsyncInvocation inv2 = vm5.invokeAsync(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", "ln", Scope.DISTRIBUTED_ACK, + DataPolicy.PERSISTENT_REPLICATE, isOffHeap() )); + AsyncInvocation inv3 = vm6.invokeAsync(() -> ReplicatedRegion_ParallelWANPropagationDUnitTest.doPuts2(getTestMethodName() + "_RR", 15000 )); + try { + inv1.join(); + inv2.join(); + + } catch (InterruptedException e) { + e.printStackTrace(); + fail(); + } + + startSenderInVMsAsync("ln", vm4, vm5); + + LogWriterUtils.getLogWriter().info("Waiting for senders running."); + // wait for senders running + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + + vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" )); + vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 15000 )); + } +}
