// http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java
// ----------------------------------------------------------------------
// diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java
// new file mode 100644
// index 0000000..9c6cbdd
// --- /dev/null
// +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java
// @@ -0,0 +1,460 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.gemstone.gemfire.internal.cache.wan.misc;

import org.junit.Ignore;
import org.junit.experimental.categories.Category;
import org.junit.Test;

import static org.junit.Assert.*;

import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
import com.gemstone.gemfire.test.junit.categories.DistributedTest;

import com.gemstone.gemfire.cache.wan.GatewaySender;
import com.gemstone.gemfire.internal.cache.BucketRegion;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.RegionQueue;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
import com.gemstone.gemfire.test.dunit.Assert;
import com.gemstone.gemfire.test.dunit.AsyncInvocation;
import com.gemstone.gemfire.test.dunit.IgnoredException;
import com.gemstone.gemfire.test.dunit.LogWriterUtils;
import com.gemstone.gemfire.test.dunit.Wait;
import com.gemstone.gemfire.test.dunit.WaitCriterion;

import java.util.Set;

/**
 * Distributed tests for one parallel gateway sender ("ln") attached to more
 * than one partitioned region.
 *
 * <p>Only {@link #testSameSenderWithNonColocatedRegions()} is active; it checks
 * that attaching the same parallel sender to a second, non-colocated region is
 * rejected. The propagation and restart scenarios are marked {@code @Ignore};
 * the reasons recorded by the original authors are kept on each test.
 */
@Category(DistributedTest.class)
public class CommonParallelGatewaySenderDUnitTest extends WANTestBase {

  /**
   * Creates sender "ln" and a first partitioned region using it in vm4, then
   * expects creation of a second region with the same sender to fail with an
   * {@link IllegalStateException} whose message contains
   * "cannot have the same parallel gateway sender id".
   */
  @Test
  public void testSameSenderWithNonColocatedRegions() throws Exception {
    IgnoredException.addIgnoredException("cannot have the same parallel");
    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    vm4.invoke(() -> WANTestBase.createCache(lnPort));
    vm4.invoke(() -> WANTestBase.createSender("ln", 2,
        true, 100, 10, false, false, null, true));
    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap()));
    try {
      vm4.invoke(() -> WANTestBase.createPartitionedRegion(
          getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap()));
      // fail() throws an AssertionError, which the catch (Exception) below
      // does not intercept, so a missing exception is reported as a failure.
      fail("Expected IllegalStateException : cannot have the same parallel gateway sender");
    } catch (Exception e) {
      Throwable cause = e.getCause();
      // Guard against a null message so an unexpected exception is reported
      // as a test failure carrying its cause instead of a NullPointerException.
      boolean isExpectedFailure = cause instanceof IllegalStateException
          && cause.getMessage() != null
          && cause.getMessage().contains("cannot have the same parallel gateway sender id");
      if (!isExpectedFailure) {
        Assert.fail("Expected IllegalStateException", e);
      }
    }
  }

  /**
   * Simple scenario: two regions attach the same parallel gateway sender.
   *
   * <p>This test is disabled intentionally:
   * <ol>
   * <li>In release 8.0 the queue name was changed back to the old style for
   * rolling upgrade support.</li>
   * <li>A common parallel sender for different non-colocated regions is not
   * supported in 8.0, so there is no need to bother about
   * ParallelGatewaySenderQueue#convertPathToName.</li>
   * <li>It has to be enabled in the next release.</li>
   * <li>Version based rolling upgrade support should be provided: based on the
   * version of GemFire, QSTRING should be used between 8.0 and versions prior
   * to 8.0.</li>
   * </ol>
   *
   * @throws Exception if any remote invocation fails
   */
  @Test
  @Ignore("TODO")
  public void testParallelPropagation() throws Exception {
    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));

    createCacheInVMs(nyPort, vm2, vm3);
    createReceiverInVMs(vm2, vm3);

    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);

    vm4.invoke(() -> WANTestBase.createSender("ln", 2,
        true, 100, 10, false, false, null, true));
    vm5.invoke(() -> WANTestBase.createSender("ln", 2,
        true, 100, 10, false, false, null, true));
    vm6.invoke(() -> WANTestBase.createSender("ln", 2,
        true, 100, 10, false, false, null, true));
    vm7.invoke(() -> WANTestBase.createSender("ln", 2,
        true, 100, 10, false, false, null, true));

    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap()));
    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap()));
    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap()));
    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap()));

    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap()));
    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap()));
    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap()));
    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap()));

    startSenderInVMs("ln", vm4, vm5, vm6, vm7);

    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR1", null, 1, 100, isOffHeap()));
    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR1", null, 1, 100, isOffHeap()));
    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR2", null, 1, 100, isOffHeap()));
    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR2", null, 1, 100, isOffHeap()));

    // Before doing any puts, let the senders be running in order to ensure
    // that not a single event will be lost.
    vm4.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
    vm5.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
    vm6.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
    vm7.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));

    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR1",
        1000));
    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR2",
        1000));

    // Verify all buckets drained on all sender nodes.
    vm4.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln"));
    vm5.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln"));
    vm6.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln"));
    vm7.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln"));

    vm2.invoke(() -> WANTestBase.validateRegionSize(
        getTestMethodName() + "_PR1", 1000));
    vm2.invoke(() -> WANTestBase.validateRegionSize(
        getTestMethodName() + "_PR2", 1000));
  }

  /**
   * The parallel gateway sender is persistence enabled but the regions are not.
   *
   * <p>This test is disabled intentionally:
   * <ol>
   * <li>In release 8.0 the queue name was changed back to the old style for
   * rolling upgrade support.</li>
   * <li>A common parallel sender for different non-colocated regions is not
   * supported in 8.0, so there is no need to bother about
   * ParallelGatewaySenderQueue#convertPathToName.</li>
   * <li>It has to be enabled in the next release.</li>
   * <li>Version based rolling upgrade support should be provided: based on the
   * version of GemFire, QSTRING should be used between 8.0 and versions prior
   * to 8.0.</li>
   * </ol>
   *
   * @throws Exception if any remote invocation fails
   */
  @Test
  @Ignore("TODO")
  public void testParallelPropagationPersistenceEnabled() throws Exception {
    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));

    createCacheInVMs(nyPort, vm2, vm3);
    createReceiverInVMs(vm2, vm3);

    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);

    vm4.invoke(() -> WANTestBase.createSender("ln", 2,
        true, 100, 10, false, true, null, true));
    vm5.invoke(() -> WANTestBase.createSender("ln", 2,
        true, 100, 10, false, true, null, true));
    vm6.invoke(() -> WANTestBase.createSender("ln", 2,
        true, 100, 10, false, true, null, true));
    vm7.invoke(() -> WANTestBase.createSender("ln", 2,
        true, 100, 10, false, true, null, true));

    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap()));
    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap()));
    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap()));
    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap()));

    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap()));
    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap()));
    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap()));
    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap()));

    startSenderInVMs("ln", vm4, vm5, vm6, vm7);

    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR1", null, 1, 100, isOffHeap()));
    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR1", null, 1, 100, isOffHeap()));
    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR2", null, 1, 100, isOffHeap()));
    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR2", null, 1, 100, isOffHeap()));

    // Before doing any puts, let the senders be running in order to ensure
    // that not a single event will be lost.
    vm4.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
    vm5.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
    vm6.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
    vm7.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));

    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR1",
        1000));
    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR2",
        1000));

    // Verify all buckets drained on all sender nodes.
    vm4.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln"));
    vm5.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln"));
    vm6.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln"));
    vm7.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln"));

    vm2.invoke(() -> WANTestBase.validateRegionSize(
        getTestMethodName() + "_PR1", 1000));
    vm2.invoke(() -> WANTestBase.validateRegionSize(
        getTestMethodName() + "_PR2", 1000));
  }

  /**
   * Enable persistence for GatewaySender.
   * Pause the sender and do some puts in local region.
   * Close the local site and rebuild the region and sender from disk store.
   * Dispatcher should not start dispatching events recovered from persistent sender.
   * Check if the remote site receives all the events.
   *
   * <p>This test is disabled intentionally:
   * <ol>
   * <li>In release 8.0 the queue name was changed back to the old style for
   * rolling upgrade support.</li>
   * <li>A common parallel sender for different non-colocated regions is not
   * supported in 8.0, so there is no need to bother about
   * ParallelGatewaySenderQueue#convertPathToName.</li>
   * <li>It has to be enabled in the next release.</li>
   * <li>Version based rolling upgrade support should be provided: based on the
   * version of GemFire, QSTRING should be used between 8.0 and versions prior
   * to 8.0.</li>
   * </ol>
   */
  @Test
  @Ignore("TODO")
  public void testPRWithGatewaySenderPersistenceEnabled_Restart() {
    // create locator on local site
    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    // create locator on remote site
    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));

    // create receiver on remote site
    createCacheInVMs(nyPort, vm2, vm3);
    createReceiverInVMs(vm2, vm3);

    // create cache in local site
    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);

    // create senders with disk store; the returned names are reused after the restart
    String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true, null, null, true));
    String diskStore2 = (String) vm5.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true, null, null, true));
    String diskStore3 = (String) vm6.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true, null, null, true));
    String diskStore4 = (String) vm7.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true, null, null, true));

    LogWriterUtils.getLogWriter().info("The DS are: " + diskStore1 + "," + diskStore2 + "," + diskStore3 + "," + diskStore4);

    // create PR1 on remote site
    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "PR1", null, 1, 100, isOffHeap()));
    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "PR1", null, 1, 100, isOffHeap()));

    // create PR2 on remote site
    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "PR2", null, 1, 100, isOffHeap()));
    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "PR2", null, 1, 100, isOffHeap()));

    // create PR1 on local site
    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "PR1", "ln", 1, 100, isOffHeap()));
    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "PR1", "ln", 1, 100, isOffHeap()));
    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "PR1", "ln", 1, 100, isOffHeap()));
    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "PR1", "ln", 1, 100, isOffHeap()));

    // create PR2 on local site
    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "PR2", "ln", 1, 100, isOffHeap()));
    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "PR2", "ln", 1, 100, isOffHeap()));
    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "PR2", "ln", 1, 100, isOffHeap()));
    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "PR2", "ln", 1, 100, isOffHeap()));

    // start the senders on local site
    startSenderInVMs("ln", vm4, vm5, vm6, vm7);

    // wait for senders to become running
    vm4.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
    vm5.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
    vm6.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
    vm7.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));

    // pause the senders
    vm4.invoke(() -> WANTestBase.pauseSender("ln"));
    vm5.invoke(() -> WANTestBase.pauseSender("ln"));
    vm6.invoke(() -> WANTestBase.pauseSender("ln"));
    vm7.invoke(() -> WANTestBase.pauseSender("ln"));

    // start puts in region on local site
    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "PR1", 3000));
    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "PR2", 5000));
    LogWriterUtils.getLogWriter().info("Completed puts in the region");

    // -------------------- close and rebuild local site --------------------
    // kill the senders
    vm4.invoke(() -> WANTestBase.killSender());
    vm5.invoke(() -> WANTestBase.killSender());
    vm6.invoke(() -> WANTestBase.killSender());
    vm7.invoke(() -> WANTestBase.killSender());

    LogWriterUtils.getLogWriter().info("Killed all the senders.");

    // restart the vm
    vm4.invoke(() -> WANTestBase.createCache(lnPort));
    vm5.invoke(() -> WANTestBase.createCache(lnPort));
    vm6.invoke(() -> WANTestBase.createCache(lnPort));
    vm7.invoke(() -> WANTestBase.createCache(lnPort));

    LogWriterUtils.getLogWriter().info("Created back the cache");

    // create senders with the disk stores recorded before the restart
    vm4.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true, null, diskStore1, true));
    vm5.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true, null, diskStore2, true));
    vm6.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true, null, diskStore3, true));
    vm7.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true, null, diskStore4, true));

    LogWriterUtils.getLogWriter().info("Created the senders back from the disk store.");

    // create PR1 on local site, concurrently on all four members
    AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "PR1", "ln", 1, 100, isOffHeap()));
    AsyncInvocation inv2 = vm5.invokeAsync(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "PR1", "ln", 1, 100, isOffHeap()));
    AsyncInvocation inv3 = vm6.invokeAsync(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "PR1", "ln", 1, 100, isOffHeap()));
    AsyncInvocation inv4 = vm7.invokeAsync(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "PR1", "ln", 1, 100, isOffHeap()));

    joinAll("re-creation of PR1", inv1, inv2, inv3, inv4);

    // create PR2 on local site, concurrently on all four members
    inv1 = vm4.invokeAsync(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "PR2", "ln", 1, 100, isOffHeap()));
    inv2 = vm5.invokeAsync(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "PR2", "ln", 1, 100, isOffHeap()));
    inv3 = vm6.invokeAsync(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "PR2", "ln", 1, 100, isOffHeap()));
    inv4 = vm7.invokeAsync(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "PR2", "ln", 1, 100, isOffHeap()));

    joinAll("re-creation of PR2", inv1, inv2, inv3, inv4);

    LogWriterUtils.getLogWriter().info("Created back the partitioned regions");

    // Start the senders in async mode. This will ensure that the
    // node of shadow PR that went down last will come up first.
    startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);

    LogWriterUtils.getLogWriter().info("Waiting for senders running.");
    // wait for senders running
    vm4.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
    vm5.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
    vm6.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
    vm7.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));

    LogWriterUtils.getLogWriter().info("All the senders are now running...");

    // ----------------------------------------------------------------------

    vm2.invoke(() -> WANTestBase.validateRegionSize(
        getTestMethodName() + "PR1", 3000));
    vm3.invoke(() -> WANTestBase.validateRegionSize(
        getTestMethodName() + "PR1", 3000));

    vm2.invoke(() -> WANTestBase.validateRegionSize(
        getTestMethodName() + "PR2", 5000));
    vm3.invoke(() -> WANTestBase.validateRegionSize(
        getTestMethodName() + "PR2", 5000));
  }

  /**
   * Joins every given asynchronous invocation in order.
   *
   * <p>If the calling thread is interrupted while waiting, the interrupt flag
   * is restored and the test fails with the {@link InterruptedException} as
   * the cause.
   *
   * @param what short description of the awaited work, used in the failure message
   * @param invocations the invocations to wait for
   */
  private static void joinAll(String what, AsyncInvocation... invocations) {
    try {
      for (AsyncInvocation invocation : invocations) {
        invocation.join();
      }
      // NOTE(review): only completion is awaited here, exactly as before; a
      // failure inside an invocation is not inspected. Confirm whether the
      // AsyncInvocation API should be used to surface remote exceptions.
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      Assert.fail("Interrupted while waiting for " + what, e);
    }
  }

  /**
   * Waits until every local bucket of every region backing the queue of the
   * given gateway sender is empty.
   *
   * <p>Each bucket is polled through {@link Wait#waitForCriterion} with a
   * 180000 ms limit and a 50 ms interval. The last argument ({@code true})
   * presumably makes the wait fail on timeout - TODO confirm against
   * {@code Wait}.
   *
   * @param senderId id of the gateway sender whose queue must drain; a sender
   *        with this id must exist in the cache of the invoking member
   */
  public static void validateParallelSenderQueueAllBucketsDrained(final String senderId) {
    Set<GatewaySender> senders = cache.getGatewaySenders();
    GatewaySender sender = null;
    for (GatewaySender s : senders) {
      if (s.getId().equals(senderId)) {
        sender = s;
        break;
      }
    }
    // Fail with a clear message instead of a NullPointerException when the id is unknown.
    assertNotNull("No gateway sender found with id: " + senderId, sender);

    ConcurrentParallelGatewaySenderQueue regionQueue =
        (ConcurrentParallelGatewaySenderQueue) ((AbstractGatewaySender) sender)
            .getQueues().toArray(new RegionQueue[1])[0];

    // The cast is kept from the original code; every element is used as a PartitionedRegion below.
    @SuppressWarnings("unchecked")
    Set<PartitionedRegion> shadowPRs = (Set<PartitionedRegion>) regionQueue.getRegions();

    for (PartitionedRegion shadowPR : shadowPRs) {
      Set<BucketRegion> buckets = shadowPR.getDataStore().getAllLocalBucketRegions();

      for (final BucketRegion bucket : buckets) {
        WaitCriterion wc = new WaitCriterion() {
          @Override
          public boolean done() {
            if (bucket.keySet().size() == 0) {
              LogWriterUtils.getLogWriter().info("Bucket " + bucket.getId() + " is empty");
              return true;
            }
            return false;
          }

          @Override
          public String description() {
            return "Expected bucket entries for bucket: " + bucket.getId() + " is: 0 but actual entries: "
                + bucket.keySet().size() + " This bucket isPrimary: " + bucket.getBucketAdvisor().isPrimary() + " KEYSET: " + bucket.keySet();
          }
        };
        Wait.waitForCriterion(wc, 180000, 50, true);
      }
    }
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/CommonParallelGatewaySenderOffHeapDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/CommonParallelGatewaySenderOffHeapDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/CommonParallelGatewaySenderOffHeapDUnitTest.java new file mode 100644 index 0000000..2fb77ff --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/CommonParallelGatewaySenderOffHeapDUnitTest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.gemstone.gemfire.internal.cache.wan.misc; + +import org.junit.experimental.categories.Category; +import org.junit.Test; + +import static org.junit.Assert.*; + +import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; +import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; + +@SuppressWarnings("serial") +@Category(DistributedTest.class) +public class CommonParallelGatewaySenderOffHeapDUnitTest extends + CommonParallelGatewaySenderDUnitTest { + + public CommonParallelGatewaySenderOffHeapDUnitTest() { + super(); + } + + @Override + public boolean isOffHeap() { + return true; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java new file mode 100644 index 0000000..8b8c624 --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java @@ -0,0 +1,532 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan.misc; + +import org.junit.experimental.categories.Category; +import org.junit.Test; + +import static org.junit.Assert.*; + +import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; +import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; + +import java.util.HashMap; +import java.util.Map; + +import com.gemstone.gemfire.cache.CacheException; +import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.cache30.CacheSerializableRunnable; +import com.gemstone.gemfire.internal.cache.DistributedCacheOperation; +import com.gemstone.gemfire.internal.cache.EntrySnapshot; +import com.gemstone.gemfire.internal.cache.LocalRegion; +import com.gemstone.gemfire.internal.cache.LocalRegion.NonTXEntry; +import com.gemstone.gemfire.internal.cache.RegionEntry; +import com.gemstone.gemfire.internal.cache.Token.Tombstone; +import com.gemstone.gemfire.internal.cache.versions.VersionTag; +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; +import com.gemstone.gemfire.test.dunit.AsyncInvocation; +import com.gemstone.gemfire.test.dunit.LogWriterUtils; +import com.gemstone.gemfire.test.dunit.Wait; + +/** + * + * Test verifies that version tag for destroyed entry is propagated back to + * origin distributed system if the version tag is applied and replaces old + * version information in destination distributed system. 
+ * + * Version tag information which is relevant between multiple distributed + * systems consistency check is basically dsid and timestamp. + */ +@Category(DistributedTest.class) +public class NewWANConcurrencyCheckForDestroyDUnitTest extends WANTestBase { + + public NewWANConcurrencyCheckForDestroyDUnitTest() { + super(); + } + + @Test + public void testVersionTagTimestampForDestroy() { + + + // create three distributed systems with each having a cache containing + // a Replicated Region with one entry and concurrency checks enabled. + + // Site 2 and Site 3 only know about Site 1 but Site 1 knows about both + // Site 2 and Site 3. + + // Site 1 + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + createCacheInVMs(lnPort, vm1); + Integer lnRecPort = (Integer) vm1.invoke(() -> WANTestBase.createReceiver()); + + //Site 2 + Integer nyPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + createCacheInVMs(nyPort, vm3); + Integer nyRecPort = (Integer) vm3.invoke(() -> WANTestBase.createReceiver()); + + //Site 3 + Integer tkPort = (Integer)vm4.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort )); + createCacheInVMs(tkPort, vm5); + Integer tkRecPort = (Integer) vm5.invoke(() -> WANTestBase.createReceiver()); + + LogWriterUtils.getLogWriter().info("Created locators and receivers in 3 distributed systems"); + + //Site 1 + vm1.invoke(() -> WANTestBase.createSender( "ln1", 2, + true, 10, 1, false, false, null, true )); + vm1.invoke(() -> WANTestBase.createSender( "ln2", 3, + true, 10, 1, false, false, null, true )); + + vm1.invoke(() -> WANTestBase.createPartitionedRegion("repRegion", "ln1,ln2", 0, 1, isOffHeap() )); + vm1.invoke(() -> WANTestBase.startSender( "ln1" )); + vm1.invoke(() -> WANTestBase.startSender( "ln2" )); + vm1.invoke(() -> WANTestBase.waitForSenderRunningState( "ln1" )); + vm1.invoke(() -> WANTestBase.waitForSenderRunningState( "ln2" )); + + //Site 2 + vm3.invoke(() -> 
WANTestBase.createSender( "ny1", 1, + true, 10, 1, false, false, null, true )); + + vm3.invoke(() -> WANTestBase.createPartitionedRegion("repRegion", "ny1", 0, 1, isOffHeap() )); + vm3.invoke(() -> WANTestBase.startSender( "ny1" )); + vm3.invoke(() -> WANTestBase.waitForSenderRunningState( "ny1" )); + + //Site 3 which only knows about Site 1. + vm5.invoke(() -> WANTestBase.createSender( "tk1", 1, + true, 10, 1, false, false, null, true )); + + vm5.invoke(() -> WANTestBase.createPartitionedRegion("repRegion", "tk1", 0, 1, isOffHeap() )); + vm5.invoke(() -> WANTestBase.startSender( "tk1" )); + vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "tk1" )); + + Wait.pause(2000); + + // Perform a put in vm1 + vm1.invoke(new CacheSerializableRunnable("Putting an entry in ds1") { + + @Override + public void run2() throws CacheException { + assertNotNull(cache); + + Region region = cache.getRegion("/repRegion"); + region.put("testKey", "testValue"); + + assertEquals(1, region.size()); + } + }); + + //wait for vm1 to propagate put to vm3 and vm5 + Wait.pause(2000); + + long destroyTimeStamp = (Long) vm3.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.getVersionTimestampAfterOp()); + + //wait for vm1 to propagate destroyed entry's new version tag to vm5 + Wait.pause(2000); + + vm5.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.verifyTimestampAfterOp(destroyTimeStamp, 1 /* ds 3 receives gateway event only from ds 1*/)); + } + + /** + * Test creates two sites and one Replicated Region on each with Serial + * GatewaySender on each. Test checks for sequence of events being sent from + * site1 to site2 for PUTALL and PUT and finally checks for final timestamp in + * version for RegionEntry with key "testKey". If timestamp on both site is + * same that means events were transferred in correct sequence. 
+ */ + @Test + public void testPutAllEventSequenceOnSerialGatewaySenderWithRR() { + + // create two distributed systems with each having a cache containing + // a Replicated Region with one entry and concurrency checks enabled. + + // Site 1 + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + vm1.invoke(() -> WANTestBase.createCache(lnPort)); + Integer lnRecPort = (Integer) vm1.invoke(() -> WANTestBase.createReceiver()); + + //Site 2 + Integer nyPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + vm3.invoke(() -> WANTestBase.createCache(nyPort)); + Integer nyRecPort = (Integer) vm3.invoke(() -> WANTestBase.createReceiver()); + + LogWriterUtils.getLogWriter().info("Created locators and receivers in 2 distributed systems"); + + //Site 1 + vm1.invoke(() -> WANTestBase.createSender( "ln1", 2, + false, 10, 1, false, false, null, true )); + + vm1.invoke(() -> WANTestBase.createReplicatedRegion("repRegion", "ln1", isOffHeap() )); + vm1.invoke(() -> WANTestBase.startSender( "ln1" )); + vm1.invoke(() -> WANTestBase.waitForSenderRunningState( "ln1" )); + + //Site 2 + vm3.invoke(() -> WANTestBase.createSender( "ny1", 1, + false, 10, 1, false, false, null, true )); + + vm3.invoke(() -> WANTestBase.createReplicatedRegion("repRegion", "ny1", isOffHeap() )); + vm3.invoke(() -> WANTestBase.startSender( "ny1" )); + vm3.invoke(() -> WANTestBase.waitForSenderRunningState( "ny1" )); + + Wait.pause(2000); + + // Perform a put in vm1 + AsyncInvocation asynch1 = vm1.invokeAsync(new CacheSerializableRunnable("Putting an entry in ds1") { + + @Override + public void run2() throws CacheException { + assertNotNull(cache); + // Test hook to make put wait after RE lock is released but before Gateway events are sent. 
+ DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 2000; + + Region region = cache.getRegion("/repRegion"); + Map testMap = new HashMap(); + testMap.put("testKey", "testValue1"); + region.putAll(testMap); + + assertEquals(1, region.size()); + assertEquals("testValue2", region.get("testKey")); + } + }); + + //wait for vm1 to propagate put to vm3 + Wait.pause(1000); + + AsyncInvocation asynch2 = vm1.invokeAsync(new CacheSerializableRunnable("Putting an entry in ds1") { + + @Override + public void run2() throws CacheException { + assertNotNull(cache); + Region region = cache.getRegion("/repRegion"); + + while (!region.containsKey("testKey")) { + Wait.pause(10); + } + // Test hook to make put wait after RE lock is released but before Gateway events are sent. + DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 0; + + region.put("testKey", "testValue2"); + + assertEquals(1, region.size()); + assertEquals("testValue2", region.get("testKey")); + } + }); + + try { + asynch1.join(5000); + asynch2.join(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + vm1.invoke(new CacheSerializableRunnable("Reset Test Hook") { + + @Override + public void run2() throws CacheException { + DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 0; + } + }); + } + + //Wait for all Gateway events be received by vm3. + Wait.pause(1000); + + long putAllTimeStampVm1 = (Long) vm1.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.getVersionTimestampAfterPutAllOp()); + + long putAllTimeStampVm3 = (Long) vm3.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.getVersionTimestampAfterPutAllOp()); + + assertEquals(putAllTimeStampVm1, putAllTimeStampVm3); + } + +/** + * This is similar to above test but for PartitionedRegion. + */ + @Test + public void testPutAllEventSequenceOnSerialGatewaySenderWithPR() { + + // create two distributed systems with each having a cache containing + // a Replicated Region with one entry and concurrency checks enabled. 
+ + // Site 1 + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + createCacheInVMs(lnPort, vm1); + Integer lnRecPort = (Integer) vm1.invoke(() -> WANTestBase.createReceiver()); + + //Site 2 + Integer nyPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + createCacheInVMs(nyPort, vm3); + Integer nyRecPort = (Integer) vm3.invoke(() -> WANTestBase.createReceiver()); + + LogWriterUtils.getLogWriter().info("Created locators and receivers in 2 distributed systems"); + + //Site 1 + vm1.invoke(() -> WANTestBase.createSender( "ln1", 2, + false, 10, 1, false, false, null, true )); + + vm1.invoke(() -> WANTestBase.createPartitionedRegion("repRegion", "ln1", 0, 1, isOffHeap() )); + vm1.invoke(() -> WANTestBase.startSender( "ln1" )); + vm1.invoke(() -> WANTestBase.waitForSenderRunningState( "ln1" )); + + //Site 2 + vm3.invoke(() -> WANTestBase.createSender( "ny1", 1, + false, 10, 1, false, false, null, true )); + + vm3.invoke(() -> WANTestBase.createPartitionedRegion("repRegion", "ny1", 0, 1, isOffHeap() )); + vm3.invoke(() -> WANTestBase.startSender( "ny1" )); + vm3.invoke(() -> WANTestBase.waitForSenderRunningState( "ny1" )); + + Wait.pause(2000); + + // Perform a put in vm1 + AsyncInvocation asynch1 = vm1.invokeAsync(new CacheSerializableRunnable("Putting an entry in ds1") { + + @Override + public void run2() throws CacheException { + assertNotNull(cache); + // Test hook to make put wait after RE lock is released but before Gateway events are sent. 
+ DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 2000; + + Region region = cache.getRegion("/repRegion"); + Map testMap = new HashMap(); + testMap.put("testKey", "testValue1"); + region.putAll(testMap); + + assertEquals(1, region.size()); + assertEquals("testValue2", region.get("testKey")); + } + }); + + //wait for vm1 to propagate put to vm3 + Wait.pause(1000); + + AsyncInvocation asynch2 = vm1.invokeAsync(new CacheSerializableRunnable("Putting an entry in ds1") { + + @Override + public void run2() throws CacheException { + assertNotNull(cache); + Region region = cache.getRegion("/repRegion"); + + while (!region.containsKey("testKey")) { + Wait.pause(10); + } + // Test hook to make put wait after RE lock is released but before Gateway events are sent. + DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 0; + + region.put("testKey", "testValue2"); + + assertEquals(1, region.size()); + assertEquals("testValue2", region.get("testKey")); + } + }); + + try { + asynch1.join(5000); + asynch2.join(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + vm1.invoke(new CacheSerializableRunnable("Reset Test Hook") { + + @Override + public void run2() throws CacheException { + DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 0; + } + }); + } + + //Wait for all Gateway events be received by vm3. + Wait.pause(1000); + + long putAllTimeStampVm1 = (Long) vm1.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.getVersionTimestampAfterPutAllOp()); + + long putAllTimeStampVm3 = (Long) vm3.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.getVersionTimestampAfterPutAllOp()); + + assertEquals(putAllTimeStampVm1, putAllTimeStampVm3); + } + + /** + * Tests if conflict checks are happening based on DSID and timestamp even if + * version tag is generated in local distributed system. 
+ */ + @Test + public void testConflictChecksBasedOnDsidAndTimeStamp() { + + + // create two distributed systems with each having a cache containing + // a Replicated Region with one entry and concurrency checks enabled. + + // Site 1 + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + createCacheInVMs(lnPort, vm1); + Integer lnRecPort = (Integer) vm1.invoke(() -> WANTestBase.createReceiver()); + + //Site 2 + Integer nyPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + createCacheInVMs(nyPort, vm3); + Integer nyRecPort = (Integer) vm3.invoke(() -> WANTestBase.createReceiver()); + + LogWriterUtils.getLogWriter().info("Created locators and receivers in 2 distributed systems"); + + //Site 1 + vm1.invoke(() -> WANTestBase.createSender( "ln1", 2, + false, 10, 1, false, false, null, true )); + + vm1.invoke(() -> WANTestBase.createReplicatedRegion("repRegion", "ln1", isOffHeap() )); + vm1.invoke(() -> WANTestBase.startSender( "ln1" )); + vm1.invoke(() -> WANTestBase.waitForSenderRunningState( "ln1" )); + + //Site 2 + vm3.invoke(() -> WANTestBase.createReplicatedRegion("repRegion", "ny1", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.createCache( nyPort )); + vm4.invoke(() -> WANTestBase.createSender( "ny1", 1, + false, 10, 1, false, false, null, true )); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion("repRegion", "ny1", isOffHeap() )); + vm4.invoke(() -> WANTestBase.startSender( "ny1" )); + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ny1" )); + + Wait.pause(2000); + + // Perform a put in vm1 + vm1.invoke(new CacheSerializableRunnable("Putting an entry in ds1") { + + @Override + public void run2() throws CacheException { + assertNotNull(cache); + + Region region = cache.getRegion("/repRegion"); + region.put("testKey", "testValue1"); + + assertEquals(1, region.size()); + } + }); + + //wait for vm4 to have later timestamp before sending operation to vm1 + Wait.pause(300); + 
+ AsyncInvocation asynch = vm4.invokeAsync(new CacheSerializableRunnable("Putting an entry in ds2 in vm4") { + + @Override + public void run2() throws CacheException { + assertNotNull(cache); + Region region = cache.getRegion("/repRegion"); + + region.put("testKey", "testValue2"); + + assertEquals(1, region.size()); + assertEquals("testValue2", region.get("testKey")); + } + }); + + try { + asynch.join(2000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + //Wait for all local ds events be received by vm3. + Wait.pause(1000); + + vm3.invoke(new CacheSerializableRunnable("Check dsid") { + + @Override + public void run2() throws CacheException { + Region region = cache.getRegion("repRegion"); + + Region.Entry entry = ((LocalRegion)region).getEntry("testKey", /*null,*/ + true); //commented while merging revision 43582 + RegionEntry re = null; + if (entry instanceof EntrySnapshot) { + re = ((EntrySnapshot)entry).getRegionEntry(); + } else if (entry instanceof NonTXEntry) { + re = ((NonTXEntry)entry).getRegionEntry(); + } + VersionTag tag = re.getVersionStamp().asVersionTag(); + assertEquals(2, tag.getDistributedSystemId()); + } + }); + + // Check vm3 has latest timestamp from vm4. + long putAllTimeStampVm1 = (Long) vm4.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.getVersionTimestampAfterPutAllOp()); + + long putAllTimeStampVm3 = (Long) vm3.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.getVersionTimestampAfterPutAllOp()); + + assertEquals(putAllTimeStampVm1, putAllTimeStampVm3); + } + + /* + * For VM1 in ds 1. Used in testPutAllEventSequenceOnSerialGatewaySender. 
+ */ + public static long getVersionTimestampAfterPutAllOp() { + Region region = cache.getRegion("repRegion"); + + while (!(region.containsKey("testKey") /*&& region.get("testKey").equals("testValue2") */)) { + Wait.pause(10); + } + assertEquals(1, region.size()); + + Region.Entry entry = ((LocalRegion)region).getEntry("testKey", /*null,*/ true); + RegionEntry re = null; + if (entry instanceof EntrySnapshot) { + re = ((EntrySnapshot)entry).getRegionEntry(); + } else if (entry instanceof NonTXEntry) { + re = ((NonTXEntry)entry).getRegionEntry(); + } + if (re != null) { + LogWriterUtils.getLogWriter().fine("RegionEntry for testKey: " + re.getKey() + " " + re.getValueInVM((LocalRegion) region)); + + VersionTag tag = re.getVersionStamp().asVersionTag(); + return tag.getVersionTimeStamp(); + } else { + return -1; + } + } + + /* + * For VM3 in ds 2. + */ + public static long getVersionTimestampAfterOp() { + Region region = cache.getRegion("repRegion"); + assertEquals(1, region.size()); + + region.destroy("testKey"); + + Region.Entry entry = ((LocalRegion)region).getEntry("testKey", /*null,*/ true); + RegionEntry re = ((EntrySnapshot)entry).getRegionEntry(); + LogWriterUtils.getLogWriter().fine("RegionEntry for testKey: " + re.getKey() + " " + re.getValueInVM((LocalRegion) region)); + assertTrue(re.getValueInVM((LocalRegion) region) instanceof Tombstone); + + VersionTag tag = re.getVersionStamp().asVersionTag(); + return tag.getVersionTimeStamp(); + } + + /* + * For VM 5 in ds 3. 
+ */ + public static void verifyTimestampAfterOp(long timestamp, int memberid) { + Region region = cache.getRegion("repRegion"); + assertEquals(0, region.size()); + + Region.Entry entry = ((LocalRegion)region).getEntry("testKey", /*null,*/ true); + RegionEntry re = ((EntrySnapshot)entry).getRegionEntry(); + assertTrue(re.getValueInVM((LocalRegion) region) instanceof Tombstone); + + VersionTag tag = re.getVersionStamp().asVersionTag(); + assertEquals(timestamp, tag.getVersionTimeStamp()); + assertEquals(memberid, tag.getDistributedSystemId()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java new file mode 100644 index 0000000..76940ea --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java @@ -0,0 +1,469 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.gemstone.gemfire.internal.cache.wan.misc; + +import static com.gemstone.gemfire.distributed.ConfigurationProperties.*; +import static com.gemstone.gemfire.test.dunit.Assert.*; + +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import com.jayway.awaitility.Awaitility; +import org.apache.geode.security.templates.SampleSecurityManager; +import org.apache.logging.log4j.Logger; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.gemstone.gemfire.cache.CacheFactory; +import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.distributed.DistributedMember; +import com.gemstone.gemfire.distributed.DistributedSystem; +import com.gemstone.gemfire.internal.Assert; +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; +import com.gemstone.gemfire.internal.logging.LogService; +import com.gemstone.gemfire.security.AuthInitialize; +import com.gemstone.gemfire.security.AuthenticationFailedException; +import com.gemstone.gemfire.security.SecurityTestUtils; +import com.gemstone.gemfire.security.generator.CredentialGenerator; +import com.gemstone.gemfire.security.generator.DummyCredentialGenerator; +import com.gemstone.gemfire.security.templates.UserPasswordAuthInit; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; + +@Category(DistributedTest.class) +public class NewWanAuthenticationDUnitTest extends WANTestBase { + + public static final Logger logger = LogService.getLogger(); + + public static boolean isDifferentServerInGetCredentialCall = false; + + /** + * Authentication test for new WAN with valid credentials. Although, nothing + * related to authentication has been changed in new WAN, this test case is + * added on request from QA for defect 44650. 
+ */ + @Test + public void testWanAuthValidCredentials() { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + logger.info("Created locator on local site"); + + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + logger.info("Created locator on remote site"); + + + CredentialGenerator gen = new DummyCredentialGenerator(); + Properties extraProps = gen.getSystemProperties(); + + String clientauthenticator = gen.getAuthenticator(); + String clientauthInit = gen.getAuthInit(); + + Properties credentials1 = gen.getValidCredentials(1); + if (extraProps != null) { + credentials1.putAll(extraProps); + } + Properties javaProps1 = gen.getJavaProperties(); + + // vm3's invalid credentials + Properties credentials2 = gen.getInvalidCredentials(1); + if (extraProps != null) { + credentials2.putAll(extraProps); + } + Properties javaProps2 = gen.getJavaProperties(); + + Properties props1 = buildProperties(clientauthenticator, clientauthInit, + null, credentials1, null); + + // have vm 3 start a cache with invalid credentails + Properties props2 = buildProperties(clientauthenticator, clientauthInit, + null, credentials2, null); + + vm2.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache( + props1, javaProps1, lnPort )); + logger.info("Created secured cache in vm2"); + + vm3.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache( + props2, javaProps2, nyPort )); + logger.info("Created secured cache in vm3"); + + vm2.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + logger.info("Created sender in vm2"); + + vm3.invoke(() -> createReceiverInSecuredCache()); + logger.info("Created receiver in vm3"); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + logger.info("Created RR in vm2"); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, 
isOffHeap() )); + logger.info("Created RR in vm3"); + + // this tests verifies that even though vm3 has invalid credentials, vm2 can still send data to vm3 because + // vm2 has valid credentials + vm2.invoke(() -> WANTestBase.startSender( "ln" )); + vm2.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + + vm2.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1)); + vm3.invoke(() -> { + Region r = cache.getRegion(Region.SEPARATOR + getTestMethodName() + "_RR"); + Awaitility.waitAtMost(20, TimeUnit.SECONDS).until(() -> assertTrue(r.size() > 0)); + }); + logger.info("Done successfully."); + } + + @Test + public void testWanIntegratedSecurityWithValidCredentials() { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + logger.info("Created locator on local site"); + + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + logger.info("Created locator on remote site"); + + + Properties props1 = buildSecurityProperties("admin", "secret"); + Properties props2 = buildSecurityProperties("guest", "guest"); + + vm2.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache( + props1, null, lnPort )); + logger.info("Created secured cache in vm2"); + + vm3.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache( + props2, null, nyPort )); + logger.info("Created secured cache in vm3"); + + vm2.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + logger.info("Created sender in vm2"); + + vm3.invoke(() -> createReceiverInSecuredCache()); + logger.info("Created receiver in vm3"); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + logger.info("Created RR in vm2"); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + logger.info("Created RR in vm3"); + + vm2.invoke(() -> WANTestBase.startSender( "ln" )); + 
vm2.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm2.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1)); + vm3.invoke(() -> { + Region r = cache.getRegion(Region.SEPARATOR + getTestMethodName() + "_RR"); + Awaitility.waitAtMost(20, TimeUnit.SECONDS).until(() -> assertTrue(r.size() > 0)); + + }); + logger.info("Done successfully."); + } + + /** + * Test authentication with new WAN with invalid credentials. Although, + * nothing related to authentication has been changed in new WAN, this test + * case is added on request from QA for defect 44650. + */ + @Test + public void testWanAuthInvalidCredentials() { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + logger.info("Created locator on local site"); + + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + logger.info("Created locator on remote site"); + + + CredentialGenerator gen = new DummyCredentialGenerator(); + logger.info("Picked up credential: " + gen); + + Properties extraProps = gen.getSystemProperties(); + + String clientauthenticator = gen.getAuthenticator(); + String clientauthInit = gen.getAuthInit(); + + Properties credentials1 = gen.getInvalidCredentials(1); + if (extraProps != null) { + credentials1.putAll(extraProps); + } + Properties javaProps1 = gen.getJavaProperties(); + Properties credentials2 = gen.getInvalidCredentials(2); + if (extraProps != null) { + credentials2.putAll(extraProps); + } + Properties javaProps2 = gen.getJavaProperties(); + + Properties props1 = buildProperties(clientauthenticator, clientauthInit, + null, credentials1, null); + Properties props2 = buildProperties(clientauthenticator, clientauthInit, + null, credentials2, null); + + logger.info("Done building auth properties"); + + vm2.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache( + props1, javaProps1, lnPort )); + logger.info("Created secured cache in vm2"); + + vm3.invoke(() -> 
NewWanAuthenticationDUnitTest.createSecuredCache( + props2, javaProps2, nyPort )); + logger.info("Created secured cache in vm3"); + + vm2.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + logger.info("Created sender in vm2"); + + vm3.invoke(() -> createReceiverInSecuredCache()); + logger.info("Created receiver in vm3"); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + logger.info("Created RR in vm2"); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + logger.info("Created RR in vm3"); + + try { + vm2.invoke(() -> WANTestBase.startSender( "ln" )); + fail("Authentication Failed: While starting the sender, an exception should have been thrown"); + } catch (Exception e) { + if (!(e.getCause().getCause() instanceof AuthenticationFailedException)) { + fail("Authentication is not working as expected", e); + } + } + } + + /** + * Test authentication with new WAN with invalid credentials. Although, + * nothing related to authentication has been changed in new WAN, this test + * case is added on request from QA for defect 44650. 
+ */ + @Test + public void testWanSecurityManagerWithInvalidCredentials() { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + logger.info("Created locator on local site"); + + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + logger.info("Created locator on remote site"); + + Properties props1 = buildSecurityProperties("admin", "wrongPswd"); + Properties props2 = buildSecurityProperties("guest", "wrongPswd"); + + logger.info("Done building auth properties"); + + vm2.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache( + props1, null, lnPort )); + logger.info("Created secured cache in vm2"); + + vm3.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache( + props2, null, nyPort )); + logger.info("Created secured cache in vm3"); + + vm2.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + logger.info("Created sender in vm2"); + + vm3.invoke(() -> createReceiverInSecuredCache()); + logger.info("Created receiver in vm3"); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + logger.info("Created RR in vm2"); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + logger.info("Created RR in vm3"); + + try { + vm2.invoke(() -> WANTestBase.startSender( "ln" )); + fail("Authentication Failed: While starting the sender, an exception should have been thrown"); + } catch (Exception e) { + if (!(e.getCause().getCause() instanceof AuthenticationFailedException)) { + fail("Authentication is not working as expected", e); + } + } + } + + private static Properties buildProperties(String clientauthenticator, + String clientAuthInit, String accessor, Properties extraAuthProps, + Properties extraAuthzProps) { + Properties authProps = new Properties(); + if (clientauthenticator != null) { + authProps.setProperty( + 
SECURITY_CLIENT_AUTHENTICATOR, + clientauthenticator); + } + if (accessor != null) { + authProps.setProperty(SECURITY_CLIENT_ACCESSOR, + accessor); + } + if (clientAuthInit != null) { + authProps.setProperty(SECURITY_CLIENT_AUTH_INIT, clientAuthInit); + } + if (extraAuthProps != null) { + authProps.putAll(extraAuthProps); + } + if (extraAuthzProps != null) { + authProps.putAll(extraAuthzProps); + } + return authProps; + } + + private static Properties buildSecurityProperties(String username, String password){ + Properties props = new Properties(); + props.put(SECURITY_MANAGER, SampleSecurityManager.class.getName()); + props.put("security-json", "org/apache/geode/security/templates/security.json"); + props.put(SECURITY_CLIENT_AUTH_INIT, UserPasswdAI.class.getName()); + props.put("security-username", username); + props.put("security-password", password); + return props; + } + + public static void createSecuredCache(Properties authProps, Object javaProps, Integer locPort) { + authProps.setProperty(MCAST_PORT, "0"); + authProps.setProperty(LOCATORS, "localhost[" + locPort + "]"); + + logger.info("Set the server properties to: " + authProps); + logger.info("Set the java properties to: " + javaProps); + + SecurityTestUtils tmpInstance = new SecurityTestUtils("temp"); + DistributedSystem ds = tmpInstance.createSystem(authProps, (Properties)javaProps); + assertNotNull(ds); + assertTrue(ds.isConnected()); + cache = CacheFactory.create(ds); + assertNotNull(cache); + } + + public static class UserPasswdAI extends UserPasswordAuthInit { + + public static AuthInitialize createAI() { + return new UserPasswdAI(); + } + + @Override + public Properties getCredentials(Properties props, + DistributedMember server, boolean isPeer) + throws AuthenticationFailedException { + boolean val = ( CacheFactory.getAnyInstance().getDistributedSystem().getDistributedMember().getProcessId() != server.getProcessId()); + Assert.assertTrue(val, "getCredentials: Server should be different"); + 
Properties p = super.getCredentials(props, server, isPeer); + if(val) { + isDifferentServerInGetCredentialCall = true; + CacheFactory.getAnyInstance().getLoggerI18n().convertToLogWriter().config("setting isDifferentServerInGetCredentialCall " + isDifferentServerInGetCredentialCall); + } else { + CacheFactory.getAnyInstance().getLoggerI18n().convertToLogWriter().config("setting22 isDifferentServerInGetCredentialCall " + isDifferentServerInGetCredentialCall); + } + return p; + } + } + + public static void verifyDifferentServerInGetCredentialCall(){ + Assert.assertTrue(isDifferentServerInGetCredentialCall, "verifyDifferentServerInGetCredentialCall: Server should be different"); + isDifferentServerInGetCredentialCall = false; + } + + @Test + public void testWanAuthValidCredentialsWithServer() { + disconnectAllFromDS(); + { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + logger.info("Created locator on local site"); + + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + logger.info("Created locator on remote site"); + + DummyCredentialGenerator gen = new DummyCredentialGenerator(); + gen.init(); + Properties extraProps = gen.getSystemProperties(); + + String clientauthenticator = gen.getAuthenticator(); + String clientauthInit = UserPasswdAI.class.getName() + ".createAI"; + + Properties credentials1 = gen.getValidCredentials(1); + if (extraProps != null) { + credentials1.putAll(extraProps); + } + Properties javaProps1 = gen.getJavaProperties(); + + Properties credentials2 = gen.getInvalidCredentials(2); + if (extraProps != null) { + credentials2.putAll(extraProps); + } + Properties javaProps2 = gen.getJavaProperties(); + + Properties props1 = buildProperties(clientauthenticator, clientauthInit, + null, credentials1, null); + Properties props2 = buildProperties(clientauthenticator, clientauthInit, + null, credentials2, null); + + vm2.invoke(() -> 
NewWanAuthenticationDUnitTest.createSecuredCache( + props1, javaProps1, lnPort )); + logger.info("Created secured cache in vm2"); + + vm3.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache( + props2, javaProps2, nyPort )); + logger.info("Created secured cache in vm3"); + + vm2.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + logger.info("Created sender in vm2"); + + vm3.invoke(() -> createReceiverInSecuredCache()); + logger.info("Created receiver in vm3"); + + vm2.invoke(() -> WANTestBase.startSender( "ln" )); + vm2.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + + vm2.invoke(() -> verifyDifferentServerInGetCredentialCall()); + vm3.invoke(() -> verifyDifferentServerInGetCredentialCall()); + } + } + + @Test + public void testWanSecurityManagerAuthValidCredentialsWithServer() { + disconnectAllFromDS(); + { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + logger.info("Created locator on local site"); + + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + logger.info("Created locator on remote site"); + + Properties props1 = buildSecurityProperties("admin", "secret"); + Properties props2 = buildSecurityProperties("guest", "guest"); + + vm2.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache( + props1, null, lnPort )); + logger.info("Created secured cache in vm2"); + + vm3.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache( + props2, null, nyPort )); + logger.info("Created secured cache in vm3"); + + vm2.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + logger.info("Created sender in vm2"); + + vm3.invoke(() -> createReceiverInSecuredCache()); + logger.info("Created receiver in vm3"); + + vm2.invoke(() -> WANTestBase.startSender( "ln" )); + vm2.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + + vm2.invoke(() -> 
verifyDifferentServerInGetCredentialCall()); + + // this would fail for now because for integrated security, we are not sending the receiver's credentials back + // to the sender. Because in the old security implementation, even though the receiver's credentials are sent back to the sender + // the sender is not checking it. + //vm3.invoke(() -> verifyDifferentServerInGetCredentialCall()); + } + } +}
