http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java deleted file mode 100644 index 9c6cbdd..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java +++ /dev/null @@ -1,460 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan.misc; - -import org.junit.Ignore; -import org.junit.experimental.categories.Category; -import org.junit.Test; - -import static org.junit.Assert.*; - -import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; -import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; -import com.gemstone.gemfire.test.junit.categories.DistributedTest; - -import com.gemstone.gemfire.cache.wan.GatewaySender; -import com.gemstone.gemfire.internal.cache.BucketRegion; -import com.gemstone.gemfire.internal.cache.PartitionedRegion; -import com.gemstone.gemfire.internal.cache.RegionQueue; -import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender; -import com.gemstone.gemfire.internal.cache.wan.WANTestBase; -import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue; -import com.gemstone.gemfire.test.dunit.Assert; -import com.gemstone.gemfire.test.dunit.AsyncInvocation; -import com.gemstone.gemfire.test.dunit.IgnoredException; -import com.gemstone.gemfire.test.dunit.LogWriterUtils; -import com.gemstone.gemfire.test.dunit.Wait; -import com.gemstone.gemfire.test.dunit.WaitCriterion; - -import java.util.Set; - -/** - * - */ -@Category(DistributedTest.class) -public class CommonParallelGatewaySenderDUnitTest extends WANTestBase { - - @Test - public void testSameSenderWithNonColocatedRegions() throws Exception { - IgnoredException.addIgnoredException("cannot have the same parallel"); - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap() )); - try { - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap() )); - fail("Expected IllegalStateException : cannot have the same parallel gateway sender"); - } - catch (Exception e) { - if (!(e.getCause() instanceof IllegalStateException) - || !(e.getCause().getMessage() - .contains("cannot have the same parallel gateway sender id"))) { - Assert.fail("Expected IllegalStateException", e); - } - } - } - - /** - * Simple scenario. Two regions attach the same PGS - * @throws Exception - * Below test is disabled intentionally - 1> In this release 8.0, for rolling upgrade support queue name is changed to old style - 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about - ParallelGatewaySenderQueue#convertPathToName - 3> We have to enabled it in next release - 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 - and version prior to 8.0 - */ - @Test - @Ignore("TODO") - public void testParallelPropagation() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm6.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm7.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap() )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR1", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR1", null, 1, 100, isOffHeap() )); - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR2", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR2", null, 1, 100, isOffHeap() )); - //before doing any puts, let the senders be running in order to ensure that - //not a single event will be lost - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR1", - 1000 )); - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR2", - 1000 )); - - //verify all buckets drained on all sender nodes. - vm4.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln")); - vm5.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln")); - vm6.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln")); - vm7.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln")); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR1", 1000 )); - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR2", 1000 )); - } - - /** - * The PGS is persistence enabled but not the Regions - * Below test is disabled intentionally - 1> In this release 8.0, for rolling upgrade support queue name is changed to old style - 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about - ParallelGatewaySenderQueue#convertPathToName - 3> We have to enabled it in next release - 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 - and version prior to 8.0 - */ - @Test - @Ignore("TODO") - public void testParallelPropagationPersistenceEnabled() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, true, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, true, null, true )); - vm6.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, true, null, true )); - vm7.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, true, null, true )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap() )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR1", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR1", null, 1, 100, isOffHeap() )); - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR2", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR2", null, 1, 100, isOffHeap() )); - //before doing any puts, let the senders be running in order to ensure that - //not a single event will be lost - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR1", - 1000 )); - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR2", - 1000 )); - - //verify all buckets drained on all sender nodes. - vm4.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln")); - vm5.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln")); - vm6.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln")); - vm7.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln")); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR1", 1000 )); - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR2", 1000 )); - } - - - /** - * Enable persistence for GatewaySender. - * Pause the sender and do some puts in local region. - * Close the local site and rebuild the region and sender from disk store. - * Dispatcher should not start dispatching events recovered from persistent sender. - * Check if the remote site receives all the events. - * Below test is disabled intentionally - 1> In this release 8.0, for rolling upgrade support queue name is changed to old style - 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about - ParallelGatewaySenderQueue#convertPathToName - 3> We have to enabled it in next release - 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 - and version prior to 8.0 - */ - @Test - @Ignore("TODO") - public void testPRWithGatewaySenderPersistenceEnabled_Restart() { - //create locator on local site - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - //create locator on remote site - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - //create receiver on remote site - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - //create cache in local site - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - //create senders with disk store - String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true )); - String diskStore2 = (String) vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true )); - String diskStore3 = (String) vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true )); - String diskStore4 = (String) vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true )); - - LogWriterUtils.getLogWriter().info("The DS are: " + diskStore1 + "," + diskStore2 + "," + diskStore3 + "," + diskStore4); - - //create PR on remote site - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName()+"PR1", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName()+"PR1", null, 1, 100, isOffHeap() )); - - //create PR on remote site - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName()+"PR2", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName()+"PR2", null, 1, 100, isOffHeap() )); - - //create PR on local site - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName()+"PR1", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName()+"PR1", "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName()+"PR1", "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName()+"PR1", "ln", 1, 100, isOffHeap() )); - - //create PR on local site - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName()+"PR2", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName()+"PR2", "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName()+"PR2", "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName()+"PR2", "ln", 1, 100, isOffHeap() )); - - - //start the senders on local site - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - //wait for senders to become running - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - - //pause the senders - vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm6.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm7.invoke(() -> WANTestBase.pauseSender( "ln" )); - - //start puts in region on local site - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName()+"PR1", 3000 )); - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName()+"PR2", 5000 )); - LogWriterUtils.getLogWriter().info("Completed puts in the region"); - - //--------------------close and rebuild local site ------------------------------------------------- - //kill the senders - vm4.invoke(() -> WANTestBase.killSender()); - vm5.invoke(() -> WANTestBase.killSender()); - vm6.invoke(() -> WANTestBase.killSender()); - vm7.invoke(() -> WANTestBase.killSender()); - - LogWriterUtils.getLogWriter().info("Killed all the senders."); - - //restart the vm - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); - - LogWriterUtils.getLogWriter().info("Created back the cache"); - - //create senders with disk store - vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore1, true )); - vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore2, true )); - vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore3, true )); - vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore4, true )); - - LogWriterUtils.getLogWriter().info("Created the senders back from the disk store."); - //create PR on local site - AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.createPartitionedRegion( - getTestMethodName()+"PR1", "ln", 1, 100, isOffHeap() )); - AsyncInvocation inv2 = vm5.invokeAsync(() -> WANTestBase.createPartitionedRegion( - getTestMethodName()+"PR1", "ln", 1, 100, isOffHeap() )); - AsyncInvocation inv3 = vm6.invokeAsync(() -> WANTestBase.createPartitionedRegion( - getTestMethodName()+"PR1", "ln", 1, 100, isOffHeap() )); - AsyncInvocation inv4 = vm7.invokeAsync(() -> WANTestBase.createPartitionedRegion( - getTestMethodName()+"PR1", "ln", 1, 100, isOffHeap() )); - - try { - inv1.join(); - inv2.join(); - inv3.join(); - inv4.join(); - } catch (InterruptedException e) { - e.printStackTrace(); - fail(); - } - - inv1 = vm4.invokeAsync(() -> WANTestBase.createPartitionedRegion( - getTestMethodName()+"PR2", "ln", 1, 100, isOffHeap() )); - inv2 = vm5.invokeAsync(() -> WANTestBase.createPartitionedRegion( - getTestMethodName()+"PR2", "ln", 1, 100, isOffHeap() )); - inv3 = vm6.invokeAsync(() -> WANTestBase.createPartitionedRegion( - getTestMethodName()+"PR2", "ln", 1, 100, isOffHeap() )); - inv4 = vm7.invokeAsync(() -> WANTestBase.createPartitionedRegion( - getTestMethodName()+"PR2", "ln", 1, 100, isOffHeap() )); - - try { - inv1.join(); - inv2.join(); - inv3.join(); - inv4.join(); - } catch (InterruptedException e) { - e.printStackTrace(); - fail(); - } - - LogWriterUtils.getLogWriter().info("Created back the partitioned regions"); - - //start the senders in async mode. This will ensure that the - //node of shadow PR that went down last will come up first - startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7); - - LogWriterUtils.getLogWriter().info("Waiting for senders running."); - //wait for senders running - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - - LogWriterUtils.getLogWriter().info("All the senders are now running..."); - - //---------------------------------------------------------------------------------------------------- - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName()+"PR1", 3000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName()+"PR1", 3000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName()+"PR2", 5000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName()+"PR2", 5000 )); - } - - public static void validateParallelSenderQueueAllBucketsDrained(final String senderId) { - Set<GatewaySender> senders = cache.getGatewaySenders(); - GatewaySender sender = null; - for (GatewaySender s : senders) { - if (s.getId().equals(senderId)) { - sender = s; - break; - } - } - ConcurrentParallelGatewaySenderQueue regionQueue = (ConcurrentParallelGatewaySenderQueue)((AbstractGatewaySender)sender).getQueues().toArray(new RegionQueue[1])[0]; - - Set<PartitionedRegion> shadowPRs = (Set<PartitionedRegion>)regionQueue.getRegions(); - - for(PartitionedRegion shadowPR: shadowPRs) { - Set<BucketRegion> buckets = shadowPR.getDataStore().getAllLocalBucketRegions(); - - for (final BucketRegion bucket : buckets) { - WaitCriterion wc = new WaitCriterion() { - public boolean done() { - if (bucket.keySet().size() == 0) { - LogWriterUtils.getLogWriter().info("Bucket " + bucket.getId() + " is empty"); - return true; - } - return false; - } - - public String description() { - return "Expected bucket entries for bucket: " + bucket.getId() + " is: 0 but actual entries: " - + bucket.keySet().size() + " This bucket isPrimary: " + bucket.getBucketAdvisor().isPrimary() + " KEYSET: " + bucket.keySet(); - } - }; - Wait.waitForCriterion(wc, 180000, 50, true); - - }//for loop ends - } - - - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderOffHeapDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderOffHeapDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderOffHeapDUnitTest.java deleted file mode 100644 index 2fb77ff..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderOffHeapDUnitTest.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan.misc; - -import org.junit.experimental.categories.Category; -import org.junit.Test; - -import static org.junit.Assert.*; - -import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; -import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; -import com.gemstone.gemfire.test.junit.categories.DistributedTest; - -@SuppressWarnings("serial") -@Category(DistributedTest.class) -public class CommonParallelGatewaySenderOffHeapDUnitTest extends - CommonParallelGatewaySenderDUnitTest { - - public CommonParallelGatewaySenderOffHeapDUnitTest() { - super(); - } - - @Override - public boolean isOffHeap() { - return true; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java deleted file mode 100644 index 8b8c624..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java +++ /dev/null @@ -1,532 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan.misc; - -import org.junit.experimental.categories.Category; -import org.junit.Test; - -import static org.junit.Assert.*; - -import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; -import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; -import com.gemstone.gemfire.test.junit.categories.DistributedTest; - -import java.util.HashMap; -import java.util.Map; - -import com.gemstone.gemfire.cache.CacheException; -import com.gemstone.gemfire.cache.Region; -import com.gemstone.gemfire.cache30.CacheSerializableRunnable; -import com.gemstone.gemfire.internal.cache.DistributedCacheOperation; -import com.gemstone.gemfire.internal.cache.EntrySnapshot; -import com.gemstone.gemfire.internal.cache.LocalRegion; -import com.gemstone.gemfire.internal.cache.LocalRegion.NonTXEntry; -import com.gemstone.gemfire.internal.cache.RegionEntry; -import com.gemstone.gemfire.internal.cache.Token.Tombstone; -import com.gemstone.gemfire.internal.cache.versions.VersionTag; -import com.gemstone.gemfire.internal.cache.wan.WANTestBase; -import com.gemstone.gemfire.test.dunit.AsyncInvocation; -import com.gemstone.gemfire.test.dunit.LogWriterUtils; -import com.gemstone.gemfire.test.dunit.Wait; - -/** - * - * Test verifies that version tag for destroyed entry is propagated back to - * origin distributed system if the version tag is applied and replaces old - * version information in destination distributed system. - * - * Version tag information which is relevant between multiple distributed - * systems consistency check is basically dsid and timestamp. - */ -@Category(DistributedTest.class) -public class NewWANConcurrencyCheckForDestroyDUnitTest extends WANTestBase { - - public NewWANConcurrencyCheckForDestroyDUnitTest() { - super(); - } - - @Test - public void testVersionTagTimestampForDestroy() { - - - // create three distributed systems with each having a cache containing - // a Replicated Region with one entry and concurrency checks enabled. - - // Site 2 and Site 3 only know about Site 1 but Site 1 knows about both - // Site 2 and Site 3. - - // Site 1 - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - createCacheInVMs(lnPort, vm1); - Integer lnRecPort = (Integer) vm1.invoke(() -> WANTestBase.createReceiver()); - - //Site 2 - Integer nyPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - createCacheInVMs(nyPort, vm3); - Integer nyRecPort = (Integer) vm3.invoke(() -> WANTestBase.createReceiver()); - - //Site 3 - Integer tkPort = (Integer)vm4.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort )); - createCacheInVMs(tkPort, vm5); - Integer tkRecPort = (Integer) vm5.invoke(() -> WANTestBase.createReceiver()); - - LogWriterUtils.getLogWriter().info("Created locators and receivers in 3 distributed systems"); - - //Site 1 - vm1.invoke(() -> WANTestBase.createSender( "ln1", 2, - true, 10, 1, false, false, null, true )); - vm1.invoke(() -> WANTestBase.createSender( "ln2", 3, - true, 10, 1, false, false, null, true )); - - vm1.invoke(() -> WANTestBase.createPartitionedRegion("repRegion", "ln1,ln2", 0, 1, isOffHeap() )); - vm1.invoke(() -> WANTestBase.startSender( "ln1" )); - vm1.invoke(() -> WANTestBase.startSender( "ln2" )); - vm1.invoke(() -> WANTestBase.waitForSenderRunningState( "ln1" )); - vm1.invoke(() -> WANTestBase.waitForSenderRunningState( "ln2" )); - - //Site 2 - vm3.invoke(() -> WANTestBase.createSender( "ny1", 1, - true, 10, 1, false, false, null, true )); - - vm3.invoke(() -> WANTestBase.createPartitionedRegion("repRegion", "ny1", 0, 1, isOffHeap() )); - vm3.invoke(() -> WANTestBase.startSender( "ny1" )); - vm3.invoke(() -> WANTestBase.waitForSenderRunningState( "ny1" )); - - //Site 3 which only knows about Site 1. - vm5.invoke(() -> WANTestBase.createSender( "tk1", 1, - true, 10, 1, false, false, null, true )); - - vm5.invoke(() -> WANTestBase.createPartitionedRegion("repRegion", "tk1", 0, 1, isOffHeap() )); - vm5.invoke(() -> WANTestBase.startSender( "tk1" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "tk1" )); - - Wait.pause(2000); - - // Perform a put in vm1 - vm1.invoke(new CacheSerializableRunnable("Putting an entry in ds1") { - - @Override - public void run2() throws CacheException { - assertNotNull(cache); - - Region region = cache.getRegion("/repRegion"); - region.put("testKey", "testValue"); - - assertEquals(1, region.size()); - } - }); - - //wait for vm1 to propagate put to vm3 and vm5 - Wait.pause(2000); - - long destroyTimeStamp = (Long) vm3.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.getVersionTimestampAfterOp()); - - //wait for vm1 to propagate destroyed entry's new version tag to vm5 - Wait.pause(2000); - - vm5.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.verifyTimestampAfterOp(destroyTimeStamp, 1 /* ds 3 receives gateway event only from ds 1*/)); - } - - /** - * Test creates two sites and one Replicated Region on each with Serial - * GatewaySender on each. Test checks for sequence of events being sent from - * site1 to site2 for PUTALL and PUT and finally checks for final timestamp in - * version for RegionEntry with key "testKey". If timestamp on both site is - * same that means events were transferred in correct sequence. - */ - @Test - public void testPutAllEventSequenceOnSerialGatewaySenderWithRR() { - - // create two distributed systems with each having a cache containing - // a Replicated Region with one entry and concurrency checks enabled. - - // Site 1 - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - vm1.invoke(() -> WANTestBase.createCache(lnPort)); - Integer lnRecPort = (Integer) vm1.invoke(() -> WANTestBase.createReceiver()); - - //Site 2 - Integer nyPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm3.invoke(() -> WANTestBase.createCache(nyPort)); - Integer nyRecPort = (Integer) vm3.invoke(() -> WANTestBase.createReceiver()); - - LogWriterUtils.getLogWriter().info("Created locators and receivers in 2 distributed systems"); - - //Site 1 - vm1.invoke(() -> WANTestBase.createSender( "ln1", 2, - false, 10, 1, false, false, null, true )); - - vm1.invoke(() -> WANTestBase.createReplicatedRegion("repRegion", "ln1", isOffHeap() )); - vm1.invoke(() -> WANTestBase.startSender( "ln1" )); - vm1.invoke(() -> WANTestBase.waitForSenderRunningState( "ln1" )); - - //Site 2 - vm3.invoke(() -> WANTestBase.createSender( "ny1", 1, - false, 10, 1, false, false, null, true )); - - vm3.invoke(() -> WANTestBase.createReplicatedRegion("repRegion", "ny1", isOffHeap() )); - vm3.invoke(() -> WANTestBase.startSender( "ny1" )); - vm3.invoke(() -> WANTestBase.waitForSenderRunningState( "ny1" )); - - Wait.pause(2000); - - // Perform a put in vm1 - AsyncInvocation asynch1 = vm1.invokeAsync(new CacheSerializableRunnable("Putting an entry in ds1") { - - @Override - public void run2() throws CacheException { - assertNotNull(cache); - // Test hook to make put wait after RE lock is released but before Gateway events are sent. - DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 2000; - - Region region = cache.getRegion("/repRegion"); - Map testMap = new HashMap(); - testMap.put("testKey", "testValue1"); - region.putAll(testMap); - - assertEquals(1, region.size()); - assertEquals("testValue2", region.get("testKey")); - } - }); - - //wait for vm1 to propagate put to vm3 - Wait.pause(1000); - - AsyncInvocation asynch2 = vm1.invokeAsync(new CacheSerializableRunnable("Putting an entry in ds1") { - - @Override - public void run2() throws CacheException { - assertNotNull(cache); - Region region = cache.getRegion("/repRegion"); - - while (!region.containsKey("testKey")) { - Wait.pause(10); - } - // Test hook to make put wait after RE lock is released but before Gateway events are sent. - DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 0; - - region.put("testKey", "testValue2"); - - assertEquals(1, region.size()); - assertEquals("testValue2", region.get("testKey")); - } - }); - - try { - asynch1.join(5000); - asynch2.join(5000); - } catch (InterruptedException e) { - e.printStackTrace(); - } finally { - vm1.invoke(new CacheSerializableRunnable("Reset Test Hook") { - - @Override - public void run2() throws CacheException { - DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 0; - } - }); - } - - //Wait for all Gateway events be received by vm3. - Wait.pause(1000); - - long putAllTimeStampVm1 = (Long) vm1.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.getVersionTimestampAfterPutAllOp()); - - long putAllTimeStampVm3 = (Long) vm3.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.getVersionTimestampAfterPutAllOp()); - - assertEquals(putAllTimeStampVm1, putAllTimeStampVm3); - } - -/** - * This is similar to above test but for PartitionedRegion. - */ - @Test - public void testPutAllEventSequenceOnSerialGatewaySenderWithPR() { - - // create two distributed systems with each having a cache containing - // a Replicated Region with one entry and concurrency checks enabled. - - // Site 1 - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - createCacheInVMs(lnPort, vm1); - Integer lnRecPort = (Integer) vm1.invoke(() -> WANTestBase.createReceiver()); - - //Site 2 - Integer nyPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - createCacheInVMs(nyPort, vm3); - Integer nyRecPort = (Integer) vm3.invoke(() -> WANTestBase.createReceiver()); - - LogWriterUtils.getLogWriter().info("Created locators and receivers in 2 distributed systems"); - - //Site 1 - vm1.invoke(() -> WANTestBase.createSender( "ln1", 2, - false, 10, 1, false, false, null, true )); - - vm1.invoke(() -> WANTestBase.createPartitionedRegion("repRegion", "ln1", 0, 1, isOffHeap() )); - vm1.invoke(() -> WANTestBase.startSender( "ln1" )); - vm1.invoke(() -> WANTestBase.waitForSenderRunningState( "ln1" )); - - //Site 2 - vm3.invoke(() -> WANTestBase.createSender( "ny1", 1, - false, 10, 1, false, false, null, true )); - - vm3.invoke(() -> WANTestBase.createPartitionedRegion("repRegion", "ny1", 0, 1, isOffHeap() )); - vm3.invoke(() -> WANTestBase.startSender( "ny1" )); - vm3.invoke(() -> WANTestBase.waitForSenderRunningState( "ny1" )); - - Wait.pause(2000); - - // Perform a put in vm1 - AsyncInvocation asynch1 = vm1.invokeAsync(new CacheSerializableRunnable("Putting an entry in ds1") { - - @Override - public void run2() throws CacheException { - assertNotNull(cache); - // Test hook to make put wait after RE lock is released but before Gateway events are sent. - DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 2000; - - Region region = cache.getRegion("/repRegion"); - Map testMap = new HashMap(); - testMap.put("testKey", "testValue1"); - region.putAll(testMap); - - assertEquals(1, region.size()); - assertEquals("testValue2", region.get("testKey")); - } - }); - - //wait for vm1 to propagate put to vm3 - Wait.pause(1000); - - AsyncInvocation asynch2 = vm1.invokeAsync(new CacheSerializableRunnable("Putting an entry in ds1") { - - @Override - public void run2() throws CacheException { - assertNotNull(cache); - Region region = cache.getRegion("/repRegion"); - - while (!region.containsKey("testKey")) { - Wait.pause(10); - } - // Test hook to make put wait after RE lock is released but before Gateway events are sent. - DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 0; - - region.put("testKey", "testValue2"); - - assertEquals(1, region.size()); - assertEquals("testValue2", region.get("testKey")); - } - }); - - try { - asynch1.join(5000); - asynch2.join(5000); - } catch (InterruptedException e) { - e.printStackTrace(); - } finally { - vm1.invoke(new CacheSerializableRunnable("Reset Test Hook") { - - @Override - public void run2() throws CacheException { - DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 0; - } - }); - } - - //Wait for all Gateway events be received by vm3. - Wait.pause(1000); - - long putAllTimeStampVm1 = (Long) vm1.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.getVersionTimestampAfterPutAllOp()); - - long putAllTimeStampVm3 = (Long) vm3.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.getVersionTimestampAfterPutAllOp()); - - assertEquals(putAllTimeStampVm1, putAllTimeStampVm3); - } - - /** - * Tests if conflict checks are happening based on DSID and timestamp even if - * version tag is generated in local distributed system. - */ - @Test - public void testConflictChecksBasedOnDsidAndTimeStamp() { - - - // create two distributed systems with each having a cache containing - // a Replicated Region with one entry and concurrency checks enabled. - - // Site 1 - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - createCacheInVMs(lnPort, vm1); - Integer lnRecPort = (Integer) vm1.invoke(() -> WANTestBase.createReceiver()); - - //Site 2 - Integer nyPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - createCacheInVMs(nyPort, vm3); - Integer nyRecPort = (Integer) vm3.invoke(() -> WANTestBase.createReceiver()); - - LogWriterUtils.getLogWriter().info("Created locators and receivers in 2 distributed systems"); - - //Site 1 - vm1.invoke(() -> WANTestBase.createSender( "ln1", 2, - false, 10, 1, false, false, null, true )); - - vm1.invoke(() -> WANTestBase.createReplicatedRegion("repRegion", "ln1", isOffHeap() )); - vm1.invoke(() -> WANTestBase.startSender( "ln1" )); - vm1.invoke(() -> WANTestBase.waitForSenderRunningState( "ln1" )); - - //Site 2 - vm3.invoke(() -> WANTestBase.createReplicatedRegion("repRegion", "ny1", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.createCache( nyPort )); - vm4.invoke(() -> WANTestBase.createSender( "ny1", 1, - false, 10, 1, false, false, null, true )); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion("repRegion", "ny1", isOffHeap() )); - vm4.invoke(() -> WANTestBase.startSender( "ny1" )); - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ny1" )); - - Wait.pause(2000); - - // Perform a put in vm1 - vm1.invoke(new CacheSerializableRunnable("Putting an entry in ds1") { - - @Override - public void run2() throws CacheException { - assertNotNull(cache); - - Region region = cache.getRegion("/repRegion"); - region.put("testKey", "testValue1"); - - assertEquals(1, region.size()); - } - }); - - //wait for vm4 to have later timestamp before sending operation to vm1 - Wait.pause(300); - - AsyncInvocation asynch = vm4.invokeAsync(new CacheSerializableRunnable("Putting an entry in ds2 in vm4") { - - @Override - public void run2() throws CacheException { - assertNotNull(cache); - Region region = cache.getRegion("/repRegion"); - - region.put("testKey", "testValue2"); - - assertEquals(1, region.size()); - assertEquals("testValue2", region.get("testKey")); - } - }); - - try { - asynch.join(2000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - //Wait for all local ds events be received by vm3. - Wait.pause(1000); - - vm3.invoke(new CacheSerializableRunnable("Check dsid") { - - @Override - public void run2() throws CacheException { - Region region = cache.getRegion("repRegion"); - - Region.Entry entry = ((LocalRegion)region).getEntry("testKey", /*null,*/ - true); //commented while merging revision 43582 - RegionEntry re = null; - if (entry instanceof EntrySnapshot) { - re = ((EntrySnapshot)entry).getRegionEntry(); - } else if (entry instanceof NonTXEntry) { - re = ((NonTXEntry)entry).getRegionEntry(); - } - VersionTag tag = re.getVersionStamp().asVersionTag(); - assertEquals(2, tag.getDistributedSystemId()); - } - }); - - // Check vm3 has latest timestamp from vm4. - long putAllTimeStampVm1 = (Long) vm4.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.getVersionTimestampAfterPutAllOp()); - - long putAllTimeStampVm3 = (Long) vm3.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.getVersionTimestampAfterPutAllOp()); - - assertEquals(putAllTimeStampVm1, putAllTimeStampVm3); - } - - /* - * For VM1 in ds 1. Used in testPutAllEventSequenceOnSerialGatewaySender. - */ - public static long getVersionTimestampAfterPutAllOp() { - Region region = cache.getRegion("repRegion"); - - while (!(region.containsKey("testKey") /*&& region.get("testKey").equals("testValue2") */)) { - Wait.pause(10); - } - assertEquals(1, region.size()); - - Region.Entry entry = ((LocalRegion)region).getEntry("testKey", /*null,*/ true); - RegionEntry re = null; - if (entry instanceof EntrySnapshot) { - re = ((EntrySnapshot)entry).getRegionEntry(); - } else if (entry instanceof NonTXEntry) { - re = ((NonTXEntry)entry).getRegionEntry(); - } - if (re != null) { - LogWriterUtils.getLogWriter().fine("RegionEntry for testKey: " + re.getKey() + " " + re.getValueInVM((LocalRegion) region)); - - VersionTag tag = re.getVersionStamp().asVersionTag(); - return tag.getVersionTimeStamp(); - } else { - return -1; - } - } - - /* - * For VM3 in ds 2. - */ - public static long getVersionTimestampAfterOp() { - Region region = cache.getRegion("repRegion"); - assertEquals(1, region.size()); - - region.destroy("testKey"); - - Region.Entry entry = ((LocalRegion)region).getEntry("testKey", /*null,*/ true); - RegionEntry re = ((EntrySnapshot)entry).getRegionEntry(); - LogWriterUtils.getLogWriter().fine("RegionEntry for testKey: " + re.getKey() + " " + re.getValueInVM((LocalRegion) region)); - assertTrue(re.getValueInVM((LocalRegion) region) instanceof Tombstone); - - VersionTag tag = re.getVersionStamp().asVersionTag(); - return tag.getVersionTimeStamp(); - } - - /* - * For VM 5 in ds 3. - */ - public static void verifyTimestampAfterOp(long timestamp, int memberid) { - Region region = cache.getRegion("repRegion"); - assertEquals(0, region.size()); - - Region.Entry entry = ((LocalRegion)region).getEntry("testKey", /*null,*/ true); - RegionEntry re = ((EntrySnapshot)entry).getRegionEntry(); - assertTrue(re.getValueInVM((LocalRegion) region) instanceof Tombstone); - - VersionTag tag = re.getVersionStamp().asVersionTag(); - assertEquals(timestamp, tag.getVersionTimeStamp()); - assertEquals(memberid, tag.getDistributedSystemId()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java deleted file mode 100644 index 76940ea..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java +++ /dev/null @@ -1,469 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan.misc; - -import static com.gemstone.gemfire.distributed.ConfigurationProperties.*; -import static com.gemstone.gemfire.test.dunit.Assert.*; - -import java.util.Properties; -import java.util.concurrent.TimeUnit; - -import com.jayway.awaitility.Awaitility; -import org.apache.geode.security.templates.SampleSecurityManager; -import org.apache.logging.log4j.Logger; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import com.gemstone.gemfire.cache.CacheFactory; -import com.gemstone.gemfire.cache.Region; -import com.gemstone.gemfire.distributed.DistributedMember; -import com.gemstone.gemfire.distributed.DistributedSystem; -import com.gemstone.gemfire.internal.Assert; -import com.gemstone.gemfire.internal.cache.wan.WANTestBase; -import com.gemstone.gemfire.internal.logging.LogService; -import com.gemstone.gemfire.security.AuthInitialize; -import com.gemstone.gemfire.security.AuthenticationFailedException; -import com.gemstone.gemfire.security.SecurityTestUtils; -import com.gemstone.gemfire.security.generator.CredentialGenerator; -import com.gemstone.gemfire.security.generator.DummyCredentialGenerator; -import com.gemstone.gemfire.security.templates.UserPasswordAuthInit; -import com.gemstone.gemfire.test.junit.categories.DistributedTest; - -@Category(DistributedTest.class) -public class NewWanAuthenticationDUnitTest extends WANTestBase { - - public static final Logger logger = LogService.getLogger(); - - public static boolean isDifferentServerInGetCredentialCall = false; - - /** - * Authentication test for new WAN with valid credentials. Although, nothing - * related to authentication has been changed in new WAN, this test case is - * added on request from QA for defect 44650. - */ - @Test - public void testWanAuthValidCredentials() { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - logger.info("Created locator on local site"); - - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - logger.info("Created locator on remote site"); - - - CredentialGenerator gen = new DummyCredentialGenerator(); - Properties extraProps = gen.getSystemProperties(); - - String clientauthenticator = gen.getAuthenticator(); - String clientauthInit = gen.getAuthInit(); - - Properties credentials1 = gen.getValidCredentials(1); - if (extraProps != null) { - credentials1.putAll(extraProps); - } - Properties javaProps1 = gen.getJavaProperties(); - - // vm3's invalid credentials - Properties credentials2 = gen.getInvalidCredentials(1); - if (extraProps != null) { - credentials2.putAll(extraProps); - } - Properties javaProps2 = gen.getJavaProperties(); - - Properties props1 = buildProperties(clientauthenticator, clientauthInit, - null, credentials1, null); - - // have vm 3 start a cache with invalid credentails - Properties props2 = buildProperties(clientauthenticator, clientauthInit, - null, credentials2, null); - - vm2.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache( - props1, javaProps1, lnPort )); - logger.info("Created secured cache in vm2"); - - vm3.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache( - props2, javaProps2, nyPort )); - logger.info("Created secured cache in vm3"); - - vm2.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - logger.info("Created sender in vm2"); - - vm3.invoke(() -> createReceiverInSecuredCache()); - logger.info("Created receiver in vm3"); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - logger.info("Created RR in vm2"); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - logger.info("Created RR in vm3"); - - // this tests verifies that even though vm3 has invalid credentials, vm2 can still send data to vm3 because - // vm2 has valid credentials - vm2.invoke(() -> WANTestBase.startSender( "ln" )); - vm2.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - - vm2.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1)); - vm3.invoke(() -> { - Region r = cache.getRegion(Region.SEPARATOR + getTestMethodName() + "_RR"); - Awaitility.waitAtMost(20, TimeUnit.SECONDS).until(() -> assertTrue(r.size() > 0)); - }); - logger.info("Done successfully."); - } - - @Test - public void testWanIntegratedSecurityWithValidCredentials() { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - logger.info("Created locator on local site"); - - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - logger.info("Created locator on remote site"); - - - Properties props1 = buildSecurityProperties("admin", "secret"); - Properties props2 = buildSecurityProperties("guest", "guest"); - - vm2.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache( - props1, null, lnPort )); - logger.info("Created secured cache in vm2"); - - vm3.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache( - props2, null, nyPort )); - logger.info("Created secured cache in vm3"); - - vm2.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - logger.info("Created sender in vm2"); - - vm3.invoke(() -> createReceiverInSecuredCache()); - logger.info("Created receiver in vm3"); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - logger.info("Created RR in vm2"); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - logger.info("Created RR in vm3"); - - vm2.invoke(() -> WANTestBase.startSender( "ln" )); - vm2.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm2.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1)); - vm3.invoke(() -> { - Region r = cache.getRegion(Region.SEPARATOR + getTestMethodName() + "_RR"); - Awaitility.waitAtMost(20, TimeUnit.SECONDS).until(() -> assertTrue(r.size() > 0)); - - }); - logger.info("Done successfully."); - } - - /** - * Test authentication with new WAN with invalid credentials. Although, - * nothing related to authentication has been changed in new WAN, this test - * case is added on request from QA for defect 44650. - */ - @Test - public void testWanAuthInvalidCredentials() { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - logger.info("Created locator on local site"); - - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - logger.info("Created locator on remote site"); - - - CredentialGenerator gen = new DummyCredentialGenerator(); - logger.info("Picked up credential: " + gen); - - Properties extraProps = gen.getSystemProperties(); - - String clientauthenticator = gen.getAuthenticator(); - String clientauthInit = gen.getAuthInit(); - - Properties credentials1 = gen.getInvalidCredentials(1); - if (extraProps != null) { - credentials1.putAll(extraProps); - } - Properties javaProps1 = gen.getJavaProperties(); - Properties credentials2 = gen.getInvalidCredentials(2); - if (extraProps != null) { - credentials2.putAll(extraProps); - } - Properties javaProps2 = gen.getJavaProperties(); - - Properties props1 = buildProperties(clientauthenticator, clientauthInit, - null, credentials1, null); - Properties props2 = buildProperties(clientauthenticator, clientauthInit, - null, credentials2, null); - - logger.info("Done building auth properties"); - - vm2.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache( - props1, javaProps1, lnPort )); - logger.info("Created secured cache in vm2"); - - vm3.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache( - props2, javaProps2, nyPort )); - logger.info("Created secured cache in vm3"); - - vm2.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - logger.info("Created sender in vm2"); - - vm3.invoke(() -> createReceiverInSecuredCache()); - logger.info("Created receiver in vm3"); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - logger.info("Created RR in vm2"); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - logger.info("Created RR in vm3"); - - try { - vm2.invoke(() -> WANTestBase.startSender( "ln" )); - fail("Authentication Failed: While starting the sender, an exception should have been thrown"); - } catch (Exception e) { - if (!(e.getCause().getCause() instanceof AuthenticationFailedException)) { - fail("Authentication is not working as expected", e); - } - } - } - - /** - * Test authentication with new WAN with invalid credentials. Although, - * nothing related to authentication has been changed in new WAN, this test - * case is added on request from QA for defect 44650. - */ - @Test - public void testWanSecurityManagerWithInvalidCredentials() { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - logger.info("Created locator on local site"); - - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - logger.info("Created locator on remote site"); - - Properties props1 = buildSecurityProperties("admin", "wrongPswd"); - Properties props2 = buildSecurityProperties("guest", "wrongPswd"); - - logger.info("Done building auth properties"); - - vm2.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache( - props1, null, lnPort )); - logger.info("Created secured cache in vm2"); - - vm3.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache( - props2, null, nyPort )); - logger.info("Created secured cache in vm3"); - - vm2.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - logger.info("Created sender in vm2"); - - vm3.invoke(() -> createReceiverInSecuredCache()); - logger.info("Created receiver in vm3"); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - logger.info("Created RR in vm2"); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - logger.info("Created RR in vm3"); - - try { - vm2.invoke(() -> WANTestBase.startSender( "ln" )); - fail("Authentication Failed: While starting the sender, an exception should have been thrown"); - } catch (Exception e) { - if (!(e.getCause().getCause() instanceof AuthenticationFailedException)) { - fail("Authentication is not working as expected", e); - } - } - } - - private static Properties buildProperties(String clientauthenticator, - String clientAuthInit, String accessor, Properties extraAuthProps, - Properties extraAuthzProps) { - Properties authProps = new Properties(); - if (clientauthenticator != null) { - authProps.setProperty( - SECURITY_CLIENT_AUTHENTICATOR, - clientauthenticator); - } - if (accessor != null) { - authProps.setProperty(SECURITY_CLIENT_ACCESSOR, - accessor); - } - if (clientAuthInit != null) { - authProps.setProperty(SECURITY_CLIENT_AUTH_INIT, clientAuthInit); - } - if (extraAuthProps != null) { - authProps.putAll(extraAuthProps); - } - if (extraAuthzProps != null) { - authProps.putAll(extraAuthzProps); - } - return authProps; - } - - private static Properties buildSecurityProperties(String username, String password){ - Properties props = new Properties(); - props.put(SECURITY_MANAGER, SampleSecurityManager.class.getName()); - props.put("security-json", "org/apache/geode/security/templates/security.json"); - props.put(SECURITY_CLIENT_AUTH_INIT, UserPasswdAI.class.getName()); - props.put("security-username", username); - props.put("security-password", password); - return props; - } - - public static void createSecuredCache(Properties authProps, Object javaProps, Integer locPort) { - authProps.setProperty(MCAST_PORT, "0"); - authProps.setProperty(LOCATORS, "localhost[" + locPort + "]"); - - logger.info("Set the server properties to: " + authProps); - logger.info("Set the java properties to: " + javaProps); - - SecurityTestUtils tmpInstance = new SecurityTestUtils("temp"); - DistributedSystem ds = tmpInstance.createSystem(authProps, (Properties)javaProps); - assertNotNull(ds); - assertTrue(ds.isConnected()); - cache = CacheFactory.create(ds); - assertNotNull(cache); - } - - public static class UserPasswdAI extends UserPasswordAuthInit { - - public static AuthInitialize createAI() { - return new UserPasswdAI(); - } - - @Override - public Properties getCredentials(Properties props, - DistributedMember server, boolean isPeer) - throws AuthenticationFailedException { - boolean val = ( CacheFactory.getAnyInstance().getDistributedSystem().getDistributedMember().getProcessId() != server.getProcessId()); - Assert.assertTrue(val, "getCredentials: Server should be different"); - Properties p = super.getCredentials(props, server, isPeer); - if(val) { - isDifferentServerInGetCredentialCall = true; - CacheFactory.getAnyInstance().getLoggerI18n().convertToLogWriter().config("setting isDifferentServerInGetCredentialCall " + isDifferentServerInGetCredentialCall); - } else { - CacheFactory.getAnyInstance().getLoggerI18n().convertToLogWriter().config("setting22 isDifferentServerInGetCredentialCall " + isDifferentServerInGetCredentialCall); - } - return p; - } - } - - public static void verifyDifferentServerInGetCredentialCall(){ - Assert.assertTrue(isDifferentServerInGetCredentialCall, "verifyDifferentServerInGetCredentialCall: Server should be different"); - isDifferentServerInGetCredentialCall = false; - } - - @Test - public void testWanAuthValidCredentialsWithServer() { - disconnectAllFromDS(); - { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - logger.info("Created locator on local site"); - - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - logger.info("Created locator on remote site"); - - DummyCredentialGenerator gen = new DummyCredentialGenerator(); - gen.init(); - Properties extraProps = gen.getSystemProperties(); - - String clientauthenticator = gen.getAuthenticator(); - String clientauthInit = UserPasswdAI.class.getName() + ".createAI"; - - Properties credentials1 = gen.getValidCredentials(1); - if (extraProps != null) { - credentials1.putAll(extraProps); - } - Properties javaProps1 = gen.getJavaProperties(); - - Properties credentials2 = gen.getInvalidCredentials(2); - if (extraProps != null) { - credentials2.putAll(extraProps); - } - Properties javaProps2 = gen.getJavaProperties(); - - Properties props1 = buildProperties(clientauthenticator, clientauthInit, - null, credentials1, null); - Properties props2 = buildProperties(clientauthenticator, clientauthInit, - null, credentials2, null); - - vm2.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache( - props1, javaProps1, lnPort )); - logger.info("Created secured cache in vm2"); - - vm3.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache( - props2, javaProps2, nyPort )); - logger.info("Created secured cache in vm3"); - - vm2.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - logger.info("Created sender in vm2"); - - vm3.invoke(() -> createReceiverInSecuredCache()); - logger.info("Created receiver in vm3"); - - vm2.invoke(() -> WANTestBase.startSender( "ln" )); - vm2.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - - vm2.invoke(() -> verifyDifferentServerInGetCredentialCall()); - vm3.invoke(() -> verifyDifferentServerInGetCredentialCall()); - } - } - - @Test - public void testWanSecurityManagerAuthValidCredentialsWithServer() { - disconnectAllFromDS(); - { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - logger.info("Created locator on local site"); - - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - logger.info("Created locator on remote site"); - - Properties props1 = buildSecurityProperties("admin", "secret"); - Properties props2 = buildSecurityProperties("guest", "guest"); - - vm2.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache( - props1, null, lnPort )); - logger.info("Created secured cache in vm2"); - - vm3.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache( - props2, null, nyPort )); - logger.info("Created secured cache in vm3"); - - vm2.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - logger.info("Created sender in vm2"); - - vm3.invoke(() -> createReceiverInSecuredCache()); - logger.info("Created receiver in vm3"); - - vm2.invoke(() -> WANTestBase.startSender( "ln" )); - vm2.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - - vm2.invoke(() -> verifyDifferentServerInGetCredentialCall()); - - // this would fail for now because for integrated security, we are not sending the receiver's credentials back - // to the sender. Because in the old security implementation, even though the receiver's credentials are sent back to the sender - // the sender is not checking it. - //vm3.invoke(() -> verifyDifferentServerInGetCredentialCall()); - } - } -}
