GEODE-37 change package name from com.gemstone.gemfire (for ./geode-wan/src/test/java/com/gemstone/gemfire)to org.apache.geode for(to ./geode-wan/src/test/java/org/apache/geode)
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/f39e2394 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/f39e2394 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/f39e2394 Branch: refs/heads/feature/GEODE-37_2 Commit: f39e2394ba9af9b5ceb0268281dcc339ecb0b1ef Parents: 701c686 Author: Hitesh Khamesra <[email protected]> Authored: Tue Sep 13 15:43:20 2016 -0700 Committer: Hitesh Khamesra <[email protected]> Committed: Tue Sep 13 15:43:20 2016 -0700 ---------------------------------------------------------------------- .../cache/CacheXml70GatewayDUnitTest.java | 255 -- .../cache/CacheXml80GatewayDUnitTest.java | 150 - .../AnalyzeWANSerializablesJUnitTest.java | 90 - .../internal/cache/UpdateVersionDUnitTest.java | 962 ----- .../cache/wan/CacheClientNotifierDUnitTest.java | 276 -- .../cache/wan/Simple2CacheServerDUnitTest.java | 185 - .../gemfire/internal/cache/wan/WANTestBase.java | 3765 ------------------ ...oncurrentParallelGatewaySenderDUnitTest.java | 737 ---- ...ntParallelGatewaySenderOffHeapDUnitTest.java | 42 - ...allelGatewaySenderOperation_1_DUnitTest.java | 796 ---- ...allelGatewaySenderOperation_2_DUnitTest.java | 625 --- ...tSerialGatewaySenderOperationsDUnitTest.java | 120 - ...GatewaySenderOperationsOffHeapDUnitTest.java | 42 - .../ConcurrentWANPropagation_1_DUnitTest.java | 568 --- .../ConcurrentWANPropagation_2_DUnitTest.java | 448 --- .../cache/wan/disttx/DistTXWANDUnitTest.java | 182 - .../CommonParallelGatewaySenderDUnitTest.java | 460 --- ...onParallelGatewaySenderOffHeapDUnitTest.java | 42 - ...wWANConcurrencyCheckForDestroyDUnitTest.java | 532 --- .../wan/misc/NewWanAuthenticationDUnitTest.java | 469 --- .../cache/wan/misc/PDXNewWanDUnitTest.java | 767 ---- ...dRegion_ParallelWANPersistenceDUnitTest.java | 670 ---- ...dRegion_ParallelWANPropagationDUnitTest.java | 1063 ----- .../SenderWithTransportFilterDUnitTest.java | 228 -- ...downAllPersistentGatewaySenderDUnitTest.java | 206 - .../wan/misc/WANConfigurationJUnitTest.java | 601 --- .../wan/misc/WANLocatorServerDUnitTest.java | 192 - .../cache/wan/misc/WANSSLDUnitTest.java | 160 - .../wan/misc/WanAutoDiscoveryDUnitTest.java | 561 --- .../cache/wan/misc/WanValidationsDUnitTest.java | 1507 ------- ...tewaySenderOperation_2_OffHeapDUnitTest.java | 42 - ...tewaySenderOperation_2_OffHeapDUnitTest.java | 42 - ...GatewaySenderOperationsOffHeapDUnitTest.java | 44 - ...ewaySenderQueueOverflowOffHeapDUnitTest.java | 44 - .../ParallelWANConflationOffHeapDUnitTest.java | 44 - ...nceEnabledGatewaySenderOffHeapDUnitTest.java | 44 - ...ropagationConcurrentOpsOffHeapDUnitTest.java | 44 - .../ParallelWANPropagationOffHeapDUnitTest.java | 43 - ...erialGatewaySenderQueueOffHeapDUnitTest.java | 44 - ...nceEnabledGatewaySenderOffHeapDUnitTest.java | 44 - .../SerialWANPropagationOffHeapDUnitTest.java | 38 - ...ation_PartitionedRegionOffHeapDUnitTest.java | 39 - ...allelGatewaySenderOperation_2_DUnitTest.java | 48 - ...arallelGatewaySenderOperationsDUnitTest.java | 692 ---- ...llelGatewaySenderQueueOverflowDUnitTest.java | 500 --- .../ParallelWANConflationDUnitTest.java | 497 --- ...ersistenceEnabledGatewaySenderDUnitTest.java | 1593 -------- ...llelWANPropagationClientServerDUnitTest.java | 97 - ...lelWANPropagationConcurrentOpsDUnitTest.java | 285 -- .../ParallelWANPropagationDUnitTest.java | 1234 ------ ...ParallelWANPropagationLoopBackDUnitTest.java | 415 -- .../wan/parallel/ParallelWANStatsDUnitTest.java | 499 --- ...tewaySenderDistributedDeadlockDUnitTest.java | 405 -- ...rialGatewaySenderEventListenerDUnitTest.java | 383 -- .../SerialGatewaySenderOperationsDUnitTest.java | 665 ---- .../SerialGatewaySenderQueueDUnitTest.java | 327 -- ...ersistenceEnabledGatewaySenderDUnitTest.java | 547 --- .../serial/SerialWANPropagationDUnitTest.java | 1336 ------- .../SerialWANPropagationLoopBackDUnitTest.java | 513 --- ...NPropagation_PartitionedRegionDUnitTest.java | 412 -- .../SerialWANPropagationsFeatureDUnitTest.java | 338 -- .../wan/serial/SerialWANStatsDUnitTest.java | 588 --- .../wan/wancommand/WANCommandTestBase.java | 490 --- ...anCommandCreateGatewayReceiverDUnitTest.java | 630 --- .../WanCommandCreateGatewaySenderDUnitTest.java | 706 ---- ...WanCommandGatewayReceiverStartDUnitTest.java | 276 -- .../WanCommandGatewayReceiverStopDUnitTest.java | 281 -- .../WanCommandGatewaySenderStartDUnitTest.java | 400 -- .../WanCommandGatewaySenderStopDUnitTest.java | 352 -- .../wan/wancommand/WanCommandListDUnitTest.java | 381 -- .../WanCommandPauseResumeDUnitTest.java | 688 ---- .../wancommand/WanCommandStatusDUnitTest.java | 546 --- .../management/WANManagementDUnitTest.java | 513 --- .../ClusterConfigurationDUnitTest.java | 1013 ----- .../pulse/TestRemoteClusterDUnitTest.java | 272 -- .../geode/cache/CacheXml70GatewayDUnitTest.java | 255 ++ .../geode/cache/CacheXml80GatewayDUnitTest.java | 150 + .../AnalyzeWANSerializablesJUnitTest.java | 90 + .../internal/cache/UpdateVersionDUnitTest.java | 962 +++++ .../cache/wan/CacheClientNotifierDUnitTest.java | 276 ++ .../cache/wan/Simple2CacheServerDUnitTest.java | 185 + .../geode/internal/cache/wan/WANTestBase.java | 3765 ++++++++++++++++++ ...oncurrentParallelGatewaySenderDUnitTest.java | 737 ++++ ...ntParallelGatewaySenderOffHeapDUnitTest.java | 42 + ...allelGatewaySenderOperation_1_DUnitTest.java | 796 ++++ ...allelGatewaySenderOperation_2_DUnitTest.java | 625 +++ ...tSerialGatewaySenderOperationsDUnitTest.java | 120 + ...GatewaySenderOperationsOffHeapDUnitTest.java | 42 + .../ConcurrentWANPropagation_1_DUnitTest.java | 568 +++ .../ConcurrentWANPropagation_2_DUnitTest.java | 448 +++ .../cache/wan/disttx/DistTXWANDUnitTest.java | 182 + .../CommonParallelGatewaySenderDUnitTest.java | 460 +++ ...onParallelGatewaySenderOffHeapDUnitTest.java | 42 + ...wWANConcurrencyCheckForDestroyDUnitTest.java | 532 +++ .../wan/misc/NewWanAuthenticationDUnitTest.java | 469 +++ .../cache/wan/misc/PDXNewWanDUnitTest.java | 767 ++++ ...dRegion_ParallelWANPersistenceDUnitTest.java | 670 ++++ ...dRegion_ParallelWANPropagationDUnitTest.java | 1063 +++++ .../SenderWithTransportFilterDUnitTest.java | 228 ++ ...downAllPersistentGatewaySenderDUnitTest.java | 206 + .../wan/misc/WANConfigurationJUnitTest.java | 601 +++ .../wan/misc/WANLocatorServerDUnitTest.java | 192 + .../cache/wan/misc/WANSSLDUnitTest.java | 160 + .../wan/misc/WanAutoDiscoveryDUnitTest.java | 561 +++ .../cache/wan/misc/WanValidationsDUnitTest.java | 1507 +++++++ ...tewaySenderOperation_2_OffHeapDUnitTest.java | 42 + ...tewaySenderOperation_2_OffHeapDUnitTest.java | 42 + ...GatewaySenderOperationsOffHeapDUnitTest.java | 44 + ...ewaySenderQueueOverflowOffHeapDUnitTest.java | 44 + .../ParallelWANConflationOffHeapDUnitTest.java | 44 + ...nceEnabledGatewaySenderOffHeapDUnitTest.java | 44 + ...ropagationConcurrentOpsOffHeapDUnitTest.java | 44 + .../ParallelWANPropagationOffHeapDUnitTest.java | 43 + ...erialGatewaySenderQueueOffHeapDUnitTest.java | 44 + ...nceEnabledGatewaySenderOffHeapDUnitTest.java | 44 + .../SerialWANPropagationOffHeapDUnitTest.java | 38 + ...ation_PartitionedRegionOffHeapDUnitTest.java | 39 + ...allelGatewaySenderOperation_2_DUnitTest.java | 48 + ...arallelGatewaySenderOperationsDUnitTest.java | 692 ++++ ...llelGatewaySenderQueueOverflowDUnitTest.java | 500 +++ .../ParallelWANConflationDUnitTest.java | 497 +++ ...ersistenceEnabledGatewaySenderDUnitTest.java | 1593 ++++++++ ...llelWANPropagationClientServerDUnitTest.java | 97 + ...lelWANPropagationConcurrentOpsDUnitTest.java | 285 ++ .../ParallelWANPropagationDUnitTest.java | 1234 ++++++ ...ParallelWANPropagationLoopBackDUnitTest.java | 415 ++ .../wan/parallel/ParallelWANStatsDUnitTest.java | 499 +++ ...tewaySenderDistributedDeadlockDUnitTest.java | 405 ++ ...rialGatewaySenderEventListenerDUnitTest.java | 383 ++ .../SerialGatewaySenderOperationsDUnitTest.java | 665 ++++ .../SerialGatewaySenderQueueDUnitTest.java | 327 ++ ...ersistenceEnabledGatewaySenderDUnitTest.java | 547 +++ .../serial/SerialWANPropagationDUnitTest.java | 1336 +++++++ .../SerialWANPropagationLoopBackDUnitTest.java | 513 +++ ...NPropagation_PartitionedRegionDUnitTest.java | 412 ++ .../SerialWANPropagationsFeatureDUnitTest.java | 338 ++ .../wan/serial/SerialWANStatsDUnitTest.java | 588 +++ .../wan/wancommand/WANCommandTestBase.java | 490 +++ ...anCommandCreateGatewayReceiverDUnitTest.java | 630 +++ .../WanCommandCreateGatewaySenderDUnitTest.java | 706 ++++ ...WanCommandGatewayReceiverStartDUnitTest.java | 276 ++ .../WanCommandGatewayReceiverStopDUnitTest.java | 281 ++ .../WanCommandGatewaySenderStartDUnitTest.java | 400 ++ .../WanCommandGatewaySenderStopDUnitTest.java | 352 ++ .../wan/wancommand/WanCommandListDUnitTest.java | 381 ++ .../WanCommandPauseResumeDUnitTest.java | 688 ++++ .../wancommand/WanCommandStatusDUnitTest.java | 546 +++ .../management/WANManagementDUnitTest.java | 513 +++ .../ClusterConfigurationDUnitTest.java | 1013 +++++ .../pulse/TestRemoteClusterDUnitTest.java | 272 ++ 150 files changed, 35135 insertions(+), 35135 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml70GatewayDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml70GatewayDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml70GatewayDUnitTest.java deleted file mode 100644 index 3014b1b..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml70GatewayDUnitTest.java +++ /dev/null @@ -1,255 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.cache; - -import org.junit.experimental.categories.Category; -import org.junit.Test; - -import static org.junit.Assert.*; - -import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; -import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; -import com.gemstone.gemfire.test.junit.categories.DistributedTest; - -import java.util.Properties; -import java.util.Set; - -import org.junit.experimental.categories.Category; - -import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener; -import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue; -import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory; -import com.gemstone.gemfire.cache.wan.GatewayEventFilter; -import com.gemstone.gemfire.cache.wan.GatewayQueueEvent; -import com.gemstone.gemfire.cache.wan.GatewayReceiver; -import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory; -import com.gemstone.gemfire.cache.wan.GatewaySender; -import com.gemstone.gemfire.cache.wan.GatewaySenderFactory; -import com.gemstone.gemfire.cache.wan.GatewayTransportFilter; -import com.gemstone.gemfire.cache30.CacheXml70DUnitTest; -import com.gemstone.gemfire.cache30.CacheXmlTestCase; -import com.gemstone.gemfire.cache30.MyGatewayEventFilter1; -import com.gemstone.gemfire.cache30.MyGatewayTransportFilter1; -import com.gemstone.gemfire.cache30.MyGatewayTransportFilter2; -import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation; -import com.gemstone.gemfire.internal.cache.xmlcache.CacheXml; -import com.gemstone.gemfire.internal.cache.xmlcache.ParallelGatewaySenderCreation; -import com.gemstone.gemfire.internal.cache.xmlcache.RegionAttributesCreation; -import com.gemstone.gemfire.internal.cache.xmlcache.SerialGatewaySenderCreation; - -@Category(DistributedTest.class) -public class CacheXml70GatewayDUnitTest extends CacheXmlTestCase { - - public CacheXml70GatewayDUnitTest() { - super(); - } - - protected String getGemFireVersion() { - return CacheXml.VERSION_7_0; - } - - /** - * Added to test the scenario of defect #50600. - */ - @Test - public void testAsyncEventQueueWithGatewayEventFilter() { - getSystem(); - CacheCreation cache = new CacheCreation(); - - String id = "WBCLChannel"; - AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory(); - factory.setBatchSize(100); - factory.setBatchTimeInterval(500); - factory.setBatchConflationEnabled(true); - factory.setMaximumQueueMemory(200); - factory.setDiskSynchronous(true); - factory.setParallel(false); - factory.setDispatcherThreads(33); - factory.addGatewayEventFilter(new MyGatewayEventFilter()); - - AsyncEventListener eventListener = new CacheXml70DUnitTest.MyAsyncEventListener(); - AsyncEventQueue asyncEventQueue = factory.create(id, eventListener); - - RegionAttributesCreation attrs = new RegionAttributesCreation(); - attrs.addAsyncEventQueueId(asyncEventQueue.getId()); - cache.createRegion("UserRegion", attrs); - - testXml(cache); - Cache c = getCache(); - assertNotNull(c); - - Set<AsyncEventQueue> asyncEventQueuesOnCache = c.getAsyncEventQueues(); - assertTrue("Size of asyncEventQueues should be greater than 0", asyncEventQueuesOnCache.size() > 0); - - for (AsyncEventQueue asyncEventQueueOnCache : asyncEventQueuesOnCache) { - CacheXml70DUnitTest.validateAsyncEventQueue(asyncEventQueue, asyncEventQueueOnCache); - } - } - - @Test - public void testGatewayReceiver() throws Exception{ - getSystem(); - CacheCreation cache = new CacheCreation(); - - GatewayReceiverFactory gatewayReceiverFactory = cache.createGatewayReceiverFactory(); - gatewayReceiverFactory.setBindAddress(""); - gatewayReceiverFactory.setStartPort(20000); - gatewayReceiverFactory.setEndPort(29999); - gatewayReceiverFactory.setMaximumTimeBetweenPings(2000); - gatewayReceiverFactory.setSocketBufferSize(1500); - GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1(); - gatewayReceiverFactory.addGatewayTransportFilter(myStreamFilter1); - GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2(); - gatewayReceiverFactory.addGatewayTransportFilter(myStreamFilter2); - GatewayReceiver receiver1 = gatewayReceiverFactory.create(); - - receiver1.start(); - - testXml(cache); - Cache c = getCache(); - assertNotNull(c); - Set<GatewayReceiver> receivers = c.getGatewayReceivers(); - for(GatewayReceiver receiver : receivers){ - validateGatewayReceiver(receiver1, receiver); - } - } - - @Test - public void testParallelGatewaySender() throws CacheException{ - getSystem(); - CacheCreation cache = new CacheCreation(); - - GatewaySenderFactory gatewaySenderFactory = cache.createGatewaySenderFactory(); - gatewaySenderFactory.setParallel(true); - gatewaySenderFactory.setDispatcherThreads(13); - gatewaySenderFactory.setManualStart(true); - gatewaySenderFactory.setSocketBufferSize(1234); - gatewaySenderFactory.setSocketReadTimeout(1050); - gatewaySenderFactory.setBatchConflationEnabled(false); - gatewaySenderFactory.setBatchSize(88); - gatewaySenderFactory.setBatchTimeInterval(9); - gatewaySenderFactory.setPersistenceEnabled(true); - gatewaySenderFactory.setDiskStoreName("LNSender"); - gatewaySenderFactory.setDiskSynchronous(true); - gatewaySenderFactory.setMaximumQueueMemory(211); - gatewaySenderFactory.setAlertThreshold(35); - - GatewayEventFilter myEventFilter1 = new MyGatewayEventFilter1(); - gatewaySenderFactory.addGatewayEventFilter(myEventFilter1); - GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1(); - gatewaySenderFactory.addGatewayTransportFilter(myStreamFilter1); - GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2(); - gatewaySenderFactory.addGatewayTransportFilter(myStreamFilter2); - GatewaySender parallelGatewaySender = gatewaySenderFactory.create("LN", 2); - - testXml(cache); - Cache c = getCache(); - assertNotNull(c); - Set<GatewaySender> sendersOnCache = c.getGatewaySenders(); - for(GatewaySender sender : sendersOnCache){ - assertEquals(true, sender.isParallel()); - validateGatewaySender(parallelGatewaySender, sender); - } - } - - @Test - public void testSerialGatewaySender() throws CacheException{ - getSystem(); - CacheCreation cache = new CacheCreation(); - GatewaySenderFactory gatewaySenderFactory = cache.createGatewaySenderFactory(); - gatewaySenderFactory.setParallel(false); - gatewaySenderFactory.setManualStart(true); - gatewaySenderFactory.setSocketBufferSize(124); - gatewaySenderFactory.setSocketReadTimeout(1000); - gatewaySenderFactory.setBatchConflationEnabled(false); - gatewaySenderFactory.setBatchSize(100); - gatewaySenderFactory.setBatchTimeInterval(10); - gatewaySenderFactory.setPersistenceEnabled(true); - gatewaySenderFactory.setDiskStoreName("LNSender"); - gatewaySenderFactory.setDiskSynchronous(true); - gatewaySenderFactory.setMaximumQueueMemory(200); - gatewaySenderFactory.setAlertThreshold(30); - - GatewayEventFilter myEventFilter1 = new MyGatewayEventFilter1(); - gatewaySenderFactory.addGatewayEventFilter(myEventFilter1); - GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1(); - gatewaySenderFactory.addGatewayTransportFilter(myStreamFilter1); - GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2(); - gatewaySenderFactory.addGatewayTransportFilter(myStreamFilter2); - GatewaySender serialGatewaySender = gatewaySenderFactory.create("LN", 2); - - RegionAttributesCreation attrs = new RegionAttributesCreation(); - attrs.addGatewaySenderId(serialGatewaySender.getId()); - cache.createRegion("UserRegion", attrs); - - testXml(cache); - Cache c = getCache(); - assertNotNull(c); - Set<GatewaySender> sendersOnCache = c.getGatewaySenders(); - for(GatewaySender sender : sendersOnCache){ - assertEquals(false, sender.isParallel()); - validateGatewaySender(serialGatewaySender, sender); - } - } - - public static class MyGatewayEventFilter implements GatewayEventFilter, Declarable { - public void afterAcknowledgement(GatewayQueueEvent event) { - } - public boolean beforeEnqueue(GatewayQueueEvent event) { - return true; - } - public boolean beforeTransmit(GatewayQueueEvent event) { - return true; - } - public void close() { - } - public void init(Properties properties) { - } - } - - static void validateGatewayReceiver(GatewayReceiver receiver1, GatewayReceiver gatewayReceiver) { - assertEquals(receiver1.getHost(), gatewayReceiver.getHost()); - assertEquals(receiver1.getStartPort(), gatewayReceiver.getStartPort()); - assertEquals(receiver1.getEndPort(), gatewayReceiver.getEndPort()); - assertEquals(receiver1.getMaximumTimeBetweenPings(), gatewayReceiver.getMaximumTimeBetweenPings()); - assertEquals(receiver1.getSocketBufferSize(), gatewayReceiver.getSocketBufferSize()); - assertEquals(receiver1.getGatewayTransportFilters().size(), gatewayReceiver.getGatewayTransportFilters().size()); - } - - static void validateGatewaySender(GatewaySender sender1, GatewaySender gatewaySender) { - assertEquals(sender1.getId(), gatewaySender.getId()); - assertEquals(sender1.getRemoteDSId(), gatewaySender.getRemoteDSId()); - assertEquals(sender1.isParallel(), gatewaySender.isParallel()); - assertEquals(sender1.isBatchConflationEnabled(), gatewaySender.isBatchConflationEnabled()); - assertEquals(sender1.getBatchSize(), gatewaySender.getBatchSize()); - assertEquals(sender1.getBatchTimeInterval(), gatewaySender.getBatchTimeInterval()); - assertEquals(sender1.isPersistenceEnabled(), gatewaySender.isPersistenceEnabled()); - assertEquals(sender1.getDiskStoreName(),gatewaySender.getDiskStoreName()); - assertEquals(sender1.isDiskSynchronous(),gatewaySender.isDiskSynchronous()); - assertEquals(sender1.getMaximumQueueMemory(), gatewaySender.getMaximumQueueMemory()); - assertEquals(sender1.getAlertThreshold(), gatewaySender.getAlertThreshold()); - assertEquals(sender1.getGatewayEventFilters().size(), gatewaySender.getGatewayEventFilters().size()); - assertEquals(sender1.getGatewayTransportFilters().size(), gatewaySender.getGatewayTransportFilters().size()); - - boolean isParallel = sender1.isParallel(); - if (isParallel) { - assertTrue("sender should be instanceof Creation", sender1 instanceof ParallelGatewaySenderCreation); - } else { - assertTrue("sender should be instanceof Creation", sender1 instanceof SerialGatewaySenderCreation); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml80GatewayDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml80GatewayDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml80GatewayDUnitTest.java deleted file mode 100644 index c140ebc..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml80GatewayDUnitTest.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.cache; - -import org.junit.experimental.categories.Category; -import org.junit.Test; - -import static org.junit.Assert.*; - -import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; -import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; -import com.gemstone.gemfire.test.junit.categories.DistributedTest; - -import java.io.IOException; -import java.util.Properties; -import java.util.Set; - -import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue; -import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory; -import com.gemstone.gemfire.cache.wan.*; -import com.gemstone.gemfire.cache30.*; -import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation; -import com.gemstone.gemfire.internal.cache.xmlcache.CacheXml; - -@Category(DistributedTest.class) -public class CacheXml80GatewayDUnitTest extends CacheXmlTestCase { - - public CacheXml80GatewayDUnitTest() { - super(); - } - - protected String getGemFireVersion() { - return CacheXml.VERSION_8_0; - } - - @Test - public void testGatewayReceiverWithManualStartTRUE() throws CacheException{ - //getSystem(); - CacheCreation cache = new CacheCreation(); - - GatewayReceiverFactory gatewayReceiverFactory = cache.createGatewayReceiverFactory(); - gatewayReceiverFactory.setBindAddress(""); - gatewayReceiverFactory.setStartPort(20000); - gatewayReceiverFactory.setEndPort(29999); - gatewayReceiverFactory.setMaximumTimeBetweenPings(2000); - gatewayReceiverFactory.setSocketBufferSize(1500); - gatewayReceiverFactory.setManualStart(true); - GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1(); - gatewayReceiverFactory.addGatewayTransportFilter(myStreamFilter1); - GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2(); - gatewayReceiverFactory.addGatewayTransportFilter(myStreamFilter2); - GatewayReceiver receiver1 = gatewayReceiverFactory.create(); - try { - receiver1.start(); - } - catch (IOException e) { - fail("Could not start GatewayReceiver"); - } - testXml(cache); - Cache c = getCache(); - assertNotNull(c); - Set<GatewayReceiver> receivers = c.getGatewayReceivers(); - for(GatewayReceiver receiver : receivers){ - validateGatewayReceiver(receiver1, receiver); - } - } - - @Test - public void testAsyncEventQueueWithSubstitutionFilter() { - getSystem(); - CacheCreation cache = new CacheCreation(); - - // Create an AsyncEventQueue with GatewayEventSubstitutionFilter. - String id = getName(); - AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory(); - factory.setGatewayEventSubstitutionListener(new MyGatewayEventSubstitutionFilter()); - AsyncEventQueue queue = factory.create(id, new CacheXml70DUnitTest.MyAsyncEventListener()); - - // Verify the GatewayEventSubstitutionFilter is set on the AsyncEventQueue. - assertNotNull(queue.getGatewayEventSubstitutionFilter()); - - testXml(cache); - Cache c = getCache(); - assertNotNull(c); - - // Get the AsyncEventQueue. Verify the GatewayEventSubstitutionFilter is not null. - AsyncEventQueue queueOnCache = c.getAsyncEventQueue(id); - assertNotNull(queueOnCache); - assertNotNull(queueOnCache.getGatewayEventSubstitutionFilter()); - } - - @Test - public void testGatewaySenderWithSubstitutionFilter() { - getSystem(); - CacheCreation cache = new CacheCreation(); - - // Create a GatewaySender with GatewayEventSubstitutionFilter. - // Don't start the sender to avoid 'Locators must be configured before starting gateway-sender' exception. - String id = getName(); - GatewaySenderFactory factory = cache.createGatewaySenderFactory(); - factory.setManualStart(true); - factory.setGatewayEventSubstitutionFilter(new MyGatewayEventSubstitutionFilter()); - GatewaySender sender = factory.create(id, 2); - - // Verify the GatewayEventSubstitutionFilter is set on the GatewaySender. - assertNotNull(sender.getGatewayEventSubstitutionFilter()); - - testXml(cache); - Cache c = getCache(); - assertNotNull(c); - - // Get the GatewaySender. Verify the GatewayEventSubstitutionFilter is not null. - GatewaySender senderOnCache = c.getGatewaySender(id); - assertNotNull(senderOnCache); - assertNotNull(senderOnCache.getGatewayEventSubstitutionFilter()); - } - - protected void validateGatewayReceiver(GatewayReceiver receiver1, - GatewayReceiver gatewayReceiver){ - CacheXml70GatewayDUnitTest.validateGatewayReceiver(receiver1, gatewayReceiver); - assertEquals(receiver1.isManualStart(), gatewayReceiver.isManualStart()); - } - - public static class MyGatewayEventSubstitutionFilter implements GatewayEventSubstitutionFilter, Declarable { - - public Object getSubstituteValue(EntryEvent event) { - return event.getKey(); - } - - public void close() { - } - - public void init(Properties properties) { - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/codeAnalysis/AnalyzeWANSerializablesJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/codeAnalysis/AnalyzeWANSerializablesJUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/codeAnalysis/AnalyzeWANSerializablesJUnitTest.java deleted file mode 100755 index d94920b..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/codeAnalysis/AnalyzeWANSerializablesJUnitTest.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.codeAnalysis; - -import static org.junit.Assert.fail; - -import java.io.File; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.junit.AfterClass; -import org.junit.Assume; -import org.junit.Before; -import org.junit.experimental.categories.Category; - -import com.gemstone.gemfire.codeAnalysis.decode.CompiledClass; -import com.gemstone.gemfire.test.junit.categories.IntegrationTest; -import com.gemstone.gemfire.util.test.TestUtil; - -/** - * - */ -@Category(IntegrationTest.class) -public class AnalyzeWANSerializablesJUnitTest extends AnalyzeSerializablesJUnitTest { - - @Before - public void loadClasses() throws Exception { - if (classes.size() > 0) { - return; - } - System.out.println("loadClasses starting"); - List<String> excludedClasses = loadExcludedClasses(new File(TestUtil.getResourcePath(AnalyzeWANSerializablesJUnitTest.class, "excludedClasses.txt"))); - List<String> openBugs = loadOpenBugs(new File(TestUtil.getResourcePath(AnalyzeWANSerializablesJUnitTest.class, "openBugs.txt"))); - excludedClasses.addAll(openBugs); - - String cp = System.getProperty("java.class.path"); - System.out.println("java classpath is " + cp); - System.out.flush(); - String[] entries = cp.split(File.pathSeparator); - String buildDirName = - "geode-wan"+File.separatorChar - +"build"+File.separatorChar - +"classes"+File.separatorChar - +"main"; - String buildDir = null; - - for (int i=0; i<entries.length && buildDir==null; i++) { - System.out.println("examining '" + entries[i] + "'"); - System.out.flush(); - if (entries[i].endsWith(buildDirName)) { - buildDir = entries[i]; - } - } - if (buildDir != null) { - System.out.println("loading class files from " + buildDir); - System.out.flush(); - long start = System.currentTimeMillis(); - loadClassesFromBuild(new File(buildDir), excludedClasses); - long finish = System.currentTimeMillis(); - System.out.println("done loading " + classes.size() + " classes. elapsed time = " - + (finish-start)/1000 + " seconds"); - } - else { - fail("unable to find WAN classes"); - } - } - - @AfterClass - public static void cleanup() { - if (classes != null) { - classes.clear(); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/UpdateVersionDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/UpdateVersionDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/UpdateVersionDUnitTest.java deleted file mode 100644 index a1aec80..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/UpdateVersionDUnitTest.java +++ /dev/null @@ -1,962 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache; - -import static com.gemstone.gemfire.distributed.ConfigurationProperties.*; -import static org.junit.Assert.*; - -import java.io.File; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.HashSet; -import java.util.List; -import java.util.Properties; -import java.util.Set; -import java.util.StringTokenizer; - -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import com.gemstone.gemfire.cache.AttributesFactory; -import com.gemstone.gemfire.cache.Cache; -import com.gemstone.gemfire.cache.CacheException; -import com.gemstone.gemfire.cache.CacheFactory; -import com.gemstone.gemfire.cache.DataPolicy; -import com.gemstone.gemfire.cache.DiskStore; -import com.gemstone.gemfire.cache.DiskStoreFactory; -import com.gemstone.gemfire.cache.EntryNotFoundException; -import com.gemstone.gemfire.cache.Operation; -import com.gemstone.gemfire.cache.PartitionAttributesFactory; -import com.gemstone.gemfire.cache.Region; -import com.gemstone.gemfire.cache.Region.Entry; -import com.gemstone.gemfire.cache.Scope; -import com.gemstone.gemfire.cache.client.internal.LocatorDiscoveryCallbackAdapter; -import com.gemstone.gemfire.cache.wan.GatewayEventFilter; -import com.gemstone.gemfire.cache.wan.GatewayReceiver; -import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory; -import com.gemstone.gemfire.cache.wan.GatewaySender; -import com.gemstone.gemfire.cache.wan.GatewaySenderFactory; -import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; -import com.gemstone.gemfire.internal.AvailablePortHelper; -import com.gemstone.gemfire.internal.cache.LocalRegion.NonTXEntry; -import com.gemstone.gemfire.internal.cache.partitioned.PRLocallyDestroyedException; -import com.gemstone.gemfire.internal.cache.versions.VersionSource; -import com.gemstone.gemfire.internal.cache.versions.VersionStamp; -import com.gemstone.gemfire.internal.cache.versions.VersionTag; -import com.gemstone.gemfire.internal.cache.wan.InternalGatewaySenderFactory; -import com.gemstone.gemfire.test.dunit.Host; -import com.gemstone.gemfire.test.dunit.IgnoredException; -import com.gemstone.gemfire.test.dunit.Invoke; -import com.gemstone.gemfire.test.dunit.LogWriterUtils; -import com.gemstone.gemfire.test.dunit.SerializableCallable; -import com.gemstone.gemfire.test.dunit.SerializableRunnable; -import com.gemstone.gemfire.test.dunit.VM; -import com.gemstone.gemfire.test.dunit.Wait; -import com.gemstone.gemfire.test.dunit.WaitCriterion; -import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; -import com.gemstone.gemfire.test.junit.categories.DistributedTest; - -/** - * @since GemFire 7.0.1 - */ -@Category(DistributedTest.class) -public class UpdateVersionDUnitTest extends JUnit4DistributedTestCase { - - protected static final String regionName = "testRegion"; - protected static Cache cache; - private static Set<IgnoredException>expectedExceptions = new HashSet<IgnoredException>(); - - @Override - public final void preTearDown() throws Exception { - closeCache(); - Invoke.invokeInEveryVM(new SerializableRunnable() { public void run() { - closeCache(); - } }); - } - - @Test - public void testUpdateVersionAfterCreateWithSerialSender() { - Host host = Host.getHost(0); - VM vm0 = host.getVM(0); // server1 site1 - VM vm1 = host.getVM(1); // server2 site1 - - VM vm2 = host.getVM(2); // server1 site2 - VM vm3 = host.getVM(3); // server2 site2 - - final String key = "key-1"; - - // Site 1 - Integer lnPort = (Integer)vm0.invoke(() -> UpdateVersionDUnitTest.createFirstLocatorWithDSId( 1 )); - - vm0.invoke(() -> UpdateVersionDUnitTest.createCache( lnPort)); - vm0.invoke(() -> UpdateVersionDUnitTest.createSender( "ln1", 2, false, 10, 1, false, false, null, true )); - - vm0.invoke(() -> UpdateVersionDUnitTest.createPartitionedRegion(regionName, "ln1", 1, 1)); - vm0.invoke(() -> UpdateVersionDUnitTest.startSender( "ln1" )); - vm0.invoke(() -> UpdateVersionDUnitTest.waitForSenderRunningState( "ln1" )); - - //Site 2 - Integer nyPort = (Integer)vm2.invoke(() -> UpdateVersionDUnitTest.createFirstRemoteLocator( 2, lnPort )); - Integer nyRecPort = (Integer) vm2.invoke(() -> UpdateVersionDUnitTest.createReceiver( nyPort )); - - vm2.invoke(() -> UpdateVersionDUnitTest.createPartitionedRegion(regionName, "", 1, 1)); - vm3.invoke(() -> UpdateVersionDUnitTest.createCache( nyPort)); - vm3.invoke(() -> UpdateVersionDUnitTest.createPartitionedRegion(regionName, "", 1, 1)); - - final VersionTag tag = (VersionTag) vm0.invoke(new SerializableCallable("Update a single entry and get its version") { - - @Override - public Object call() throws CacheException { - Cache cache = CacheFactory.getAnyInstance(); - Region region = cache.getRegion(regionName); - assertTrue(region instanceof PartitionedRegion); - - region.put(key, "value-1"); - Entry entry = region.getEntry(key); - assertTrue(entry instanceof EntrySnapshot); - RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry(); - - VersionStamp stamp = regionEntry.getVersionStamp(); - - // Create a duplicate entry version tag from stamp with newer - // time-stamp. - VersionSource memberId = (VersionSource) cache.getDistributedSystem().getDistributedMember(); - VersionTag tag = VersionTag.create(memberId); - - int entryVersion = stamp.getEntryVersion()-1; - int dsid = stamp.getDistributedSystemId(); - long time = System.currentTimeMillis(); - - tag.setEntryVersion(entryVersion); - tag.setDistributedSystemId(dsid); - tag.setVersionTimeStamp(time); - tag.setIsRemoteForTesting(); - - EntryEventImpl event = createNewEvent((PartitionedRegion) region, tag, - entry.getKey(), "value-2"); - - ((LocalRegion) region).basicUpdate(event, false, true, 0L, false); - - // Verify the new stamp - entry = region.getEntry(key); - assertTrue(entry instanceof EntrySnapshot); - regionEntry = ((EntrySnapshot) entry).getRegionEntry(); - - stamp = regionEntry.getVersionStamp(); - assertEquals( - "Time stamp did NOT get updated by UPDATE_VERSION operation on LocalRegion", - time, stamp.getVersionTimeStamp()); - assertEquals(++entryVersion, stamp.getEntryVersion()); - assertEquals(dsid, stamp.getDistributedSystemId()); - - return stamp.asVersionTag(); - } - }); - - VersionTag remoteTag = (VersionTag) vm3.invoke(new SerializableCallable("Get timestamp from remote site") { - - @Override - public Object call() throws Exception { - - Cache cache = CacheFactory.getAnyInstance(); - final PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName); - - // wait for entry to be received - WaitCriterion wc = new WaitCriterion() { - public boolean done() { - Entry<?,?> entry = null; - try { - entry = region.getDataStore().getEntryLocally(0, key, false, false); - } catch (EntryNotFoundException e) { - // expected - } catch (ForceReattemptException e) { - // expected - } catch (PRLocallyDestroyedException e) { - throw new RuntimeException("unexpected exception", e); - } - if (entry != null) { - LogWriterUtils.getLogWriter().info("found entry " + entry); - } - return (entry != null); - } - - public String description() { - return "Expected "+key+" to be received on remote WAN site"; - } - }; - Wait.waitForCriterion(wc, 30000, 500, true); - - wc = new WaitCriterion() { - public boolean done() { - Entry entry = region.getEntry(key); - assertTrue(entry instanceof EntrySnapshot); - RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry(); - return regionEntry.getVersionStamp().getVersionTimeStamp() == tag.getVersionTimeStamp(); - } - public String description() { - return "waiting for timestamp to be updated"; - } - }; - Wait.waitForCriterion(wc, 30000, 500, true); - - Entry entry = region.getEntry(key); - assertTrue("entry class is wrong: " + entry, entry instanceof EntrySnapshot); - RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry(); - - VersionStamp stamp = regionEntry.getVersionStamp(); - - return stamp.asVersionTag(); - } - }); - - assertEquals("Local and remote site have different timestamps", tag.getVersionTimeStamp(), remoteTag.getVersionTimeStamp()); - } - - @Test - public void testUpdateVersionAfterCreateWithSerialSenderOnDR() { - Host host = Host.getHost(0); - VM vm0 = host.getVM(0); // server1 site1 - VM vm1 = host.getVM(1); // server2 site1 - - VM vm2 = host.getVM(2); // server1 site2 - VM vm3 = host.getVM(3); // server2 site2 - - final String key = "key-1"; - - // Site 1 - Integer lnPort = (Integer)vm0.invoke(() -> UpdateVersionDUnitTest.createFirstLocatorWithDSId( 1 )); - - vm0.invoke(() -> UpdateVersionDUnitTest.createCache( lnPort)); - vm0.invoke(() -> UpdateVersionDUnitTest.createSender( "ln1", 2, false, 10, 1, false, false, null, true )); - - vm0.invoke(() -> UpdateVersionDUnitTest.createReplicatedRegion(regionName, "ln1")); - vm0.invoke(() -> UpdateVersionDUnitTest.startSender( "ln1" )); - vm0.invoke(() -> UpdateVersionDUnitTest.waitForSenderRunningState( "ln1" )); - - //Site 2 - Integer nyPort = (Integer)vm2.invoke(() -> UpdateVersionDUnitTest.createFirstRemoteLocator( 2, lnPort )); - Integer nyRecPort = (Integer) vm2.invoke(() -> UpdateVersionDUnitTest.createReceiver( nyPort )); - - vm2.invoke(() -> UpdateVersionDUnitTest.createReplicatedRegion(regionName, "")); - vm3.invoke(() -> UpdateVersionDUnitTest.createCache( nyPort )); - vm3.invoke(() -> UpdateVersionDUnitTest.createReplicatedRegion(regionName, "")); - - final VersionTag tag = (VersionTag) vm0.invoke(new SerializableCallable("Update a single entry and get its version") { - - @Override - public Object call() throws CacheException { - Cache cache = CacheFactory.getAnyInstance(); - Region region = cache.getRegion(regionName); - assertTrue(region instanceof DistributedRegion); - - region.put(key, "value-1"); - Entry entry = region.getEntry(key); - assertTrue(entry instanceof NonTXEntry); - RegionEntry regionEntry = ((NonTXEntry) entry).getRegionEntry(); - - VersionStamp stamp = regionEntry.getVersionStamp(); - - // Create a duplicate entry version tag from stamp with newer - // time-stamp. - VersionSource memberId = (VersionSource) cache.getDistributedSystem().getDistributedMember(); - VersionTag tag = VersionTag.create(memberId); - - int entryVersion = stamp.getEntryVersion()-1; - int dsid = stamp.getDistributedSystemId(); - long time = System.currentTimeMillis(); - - tag.setEntryVersion(entryVersion); - tag.setDistributedSystemId(dsid); - tag.setVersionTimeStamp(time); - tag.setIsRemoteForTesting(); - - EntryEventImpl event = createNewEvent((DistributedRegion) region, tag, - entry.getKey(), "value-2"); - - ((LocalRegion) region).basicUpdate(event, false, true, 0L, false); - - // Verify the new stamp - entry = region.getEntry(key); - assertTrue(entry instanceof NonTXEntry); - regionEntry = ((NonTXEntry) entry).getRegionEntry(); - - stamp = regionEntry.getVersionStamp(); - assertEquals( - "Time stamp did NOT get updated by UPDATE_VERSION operation on LocalRegion", - time, stamp.getVersionTimeStamp()); - assertEquals(entryVersion+1, stamp.getEntryVersion()); - assertEquals(dsid, stamp.getDistributedSystemId()); - - return stamp.asVersionTag(); - } - }); - - VersionTag remoteTag = (VersionTag) vm3.invoke(new SerializableCallable("Get timestamp from remote site") { - - @Override - public Object call() throws Exception { - - Cache cache = CacheFactory.getAnyInstance(); - final Region region = cache.getRegion(regionName); - - // wait for entry to be received - WaitCriterion wc = new WaitCriterion() { - public boolean done() { - return (region.getEntry(key) != null); - } - - public String description() { - return "Expected key-1 to be received on remote WAN site"; - } - }; - Wait.waitForCriterion(wc, 30000, 500, true); - - wc = new WaitCriterion() { - public boolean done() { - Entry entry = region.getEntry(key); - assertTrue(entry instanceof NonTXEntry); - RegionEntry regionEntry = ((NonTXEntry) entry).getRegionEntry(); - return regionEntry.getVersionStamp().getVersionTimeStamp() == tag.getVersionTimeStamp(); - } - public String description() { - return "waiting for timestamp to be updated"; - } - }; - Wait.waitForCriterion(wc, 30000, 500, true); - - Entry entry = region.getEntry(key); - assertTrue(entry instanceof NonTXEntry); - RegionEntry regionEntry = ((NonTXEntry) entry).getRegionEntry(); - - VersionStamp stamp = regionEntry.getVersionStamp(); - - return stamp.asVersionTag(); - } - }); - - assertEquals("Local and remote site have different timestamps", tag.getVersionTimeStamp(), remoteTag.getVersionTimeStamp()); - } - - @Test - public void testUpdateVersionAfterCreateWithParallelSender() { - Host host = Host.getHost(0); - VM vm0 = host.getVM(0); // server1 site1 - VM vm1 = host.getVM(1); // server2 site1 - - VM vm2 = host.getVM(2); // server1 site2 - VM vm3 = host.getVM(3); // server2 site2 - - // Site 1 - Integer lnPort = (Integer)vm0.invoke(() -> UpdateVersionDUnitTest.createFirstLocatorWithDSId( 1 )); - - final String key = "key-1"; - - vm0.invoke(() -> UpdateVersionDUnitTest.createCache( lnPort)); - vm0.invoke(() -> UpdateVersionDUnitTest.createSender( "ln1", 2, true, 10, 1, false, false, null, true )); - - vm0.invoke(() -> UpdateVersionDUnitTest.createPartitionedRegion(regionName, "ln1", 1, 1)); - vm0.invoke(() -> UpdateVersionDUnitTest.startSender( "ln1" )); - vm0.invoke(() -> UpdateVersionDUnitTest.waitForSenderRunningState( "ln1" )); - - //Site 2 - Integer nyPort = (Integer)vm2.invoke(() -> UpdateVersionDUnitTest.createFirstRemoteLocator( 2, lnPort )); - Integer nyRecPort = (Integer) vm2.invoke(() -> UpdateVersionDUnitTest.createReceiver( nyPort )); - - vm2.invoke(() -> UpdateVersionDUnitTest.createPartitionedRegion(regionName, "", 1, 1)); - - vm3.invoke(() -> UpdateVersionDUnitTest.createCache( nyPort)); - vm3.invoke(() -> UpdateVersionDUnitTest.createPartitionedRegion(regionName, "", 1, 1)); - - final VersionTag tag = (VersionTag) vm0.invoke(new SerializableCallable("Put a single entry and get its version") { - - @Override - public Object call() throws CacheException { - Cache cache = CacheFactory.getAnyInstance(); - Region region = cache.getRegion(regionName); - assertTrue(region instanceof PartitionedRegion); - - region.put(key, "value-1"); - Entry entry = region.getEntry(key); - assertTrue(entry instanceof EntrySnapshot); - RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry(); - - VersionStamp stamp = regionEntry.getVersionStamp(); - - // Create a duplicate entry version tag from stamp with newer - // time-stamp. - VersionSource memberId = (VersionSource) cache.getDistributedSystem().getDistributedMember(); - VersionTag tag = VersionTag.create(memberId); - - int entryVersion = stamp.getEntryVersion()-1; - int dsid = stamp.getDistributedSystemId(); - long time = System.currentTimeMillis(); - - tag.setEntryVersion(entryVersion); - tag.setDistributedSystemId(dsid); - tag.setVersionTimeStamp(time); - tag.setIsRemoteForTesting(); - - EntryEventImpl event = createNewEvent((PartitionedRegion) region, tag, - entry.getKey(), "value-2"); - - ((LocalRegion) region).basicUpdate(event, false, true, 0L, false); - - // Verify the new stamp - entry = region.getEntry(key); - assertTrue(entry instanceof EntrySnapshot); - regionEntry = ((EntrySnapshot) entry).getRegionEntry(); - - stamp = regionEntry.getVersionStamp(); - assertEquals( - "Time stamp did NOT get updated by UPDATE_VERSION operation on LocalRegion", - time, stamp.getVersionTimeStamp()); - assertEquals(++entryVersion, stamp.getEntryVersion()); - assertEquals(dsid, stamp.getDistributedSystemId()); - - return stamp.asVersionTag(); - } - }); - - VersionTag remoteTag = (VersionTag) vm3.invoke(new SerializableCallable("Get timestamp from remote site") { - - @Override - public Object call() throws Exception { - - Cache cache = CacheFactory.getAnyInstance(); - final PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName); - - // wait for entry to be received - WaitCriterion wc = new WaitCriterion() { - public boolean done() { - Entry<?,?> entry = null; - try { - entry = region.getDataStore().getEntryLocally(0, key, false, false); - } catch (EntryNotFoundException e) { - // expected - } catch (ForceReattemptException e) { - // expected - } catch (PRLocallyDestroyedException e) { - throw new RuntimeException("unexpected exception", e); - } - if (entry != null) { - LogWriterUtils.getLogWriter().info("found entry " + entry); - } - return (entry != null); - } - - public String description() { - return "Expected key-1 to be received on remote WAN site"; - } - }; - Wait.waitForCriterion(wc, 30000, 500, true); - - wc = new WaitCriterion() { - public boolean done() { - Entry entry = region.getEntry(key); - assertTrue(entry instanceof EntrySnapshot); - RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry(); - return regionEntry.getVersionStamp().getVersionTimeStamp() == tag.getVersionTimeStamp(); - } - public String description() { - return "waiting for timestamp to be updated"; - } - }; - Wait.waitForCriterion(wc, 30000, 500, true); - - Entry entry = region.getEntry(key); - assertTrue(entry instanceof EntrySnapshot); - RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry(); - - VersionStamp stamp = regionEntry.getVersionStamp(); - - return stamp.asVersionTag(); - } - }); - - assertEquals("Local and remote site have different timestamps", tag.getVersionTimeStamp(), remoteTag.getVersionTimeStamp()); - } - - @Test - public void testUpdateVersionAfterCreateWithConcurrentSerialSender() { - Host host = Host.getHost(0); - VM vm0 = host.getVM(0); // server1 site1 - VM vm1 = host.getVM(1); // server2 site1 - - VM vm2 = host.getVM(2); // server1 site2 - VM vm3 = host.getVM(3); // server2 site2 - - // Site 1 - Integer lnPort = (Integer)vm0.invoke(() -> UpdateVersionDUnitTest.createFirstLocatorWithDSId( 1 )); - - final String key = "key-1"; - - vm0.invoke(() -> UpdateVersionDUnitTest.createCache( lnPort )); - vm0.invoke(() -> UpdateVersionDUnitTest.createConcurrentSender( "ln1", 2, false, 10, 2, false, false, null, true, 2 )); - - vm0.invoke(() -> UpdateVersionDUnitTest.createPartitionedRegion(regionName, "ln1", 1, 1)); - vm0.invoke(() -> UpdateVersionDUnitTest.startSender( "ln1" )); - vm0.invoke(() -> UpdateVersionDUnitTest.waitForSenderRunningState( "ln1" )); - - //Site 2 - Integer nyPort = (Integer)vm2.invoke(() -> UpdateVersionDUnitTest.createFirstRemoteLocator( 2, lnPort )); - Integer nyRecPort = (Integer) vm2.invoke(() -> UpdateVersionDUnitTest.createReceiver( nyPort )); - - vm2.invoke(() -> UpdateVersionDUnitTest.createPartitionedRegion(regionName, "", 1, 1)); - - vm3.invoke(() -> UpdateVersionDUnitTest.createCache( nyPort )); - vm3.invoke(() -> UpdateVersionDUnitTest.createPartitionedRegion(regionName, "", 1, 1)); - - final VersionTag tag = (VersionTag) vm0.invoke(new SerializableCallable("Put a single entry and get its version") { - - @Override - public Object call() throws CacheException { - Cache cache = CacheFactory.getAnyInstance(); - Region region = cache.getRegion(regionName); - assertTrue(region instanceof PartitionedRegion); - - region.put(key, "value-1"); - Entry entry = region.getEntry(key); - assertTrue(entry instanceof EntrySnapshot); - RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry(); - - VersionStamp stamp = regionEntry.getVersionStamp(); - - // Create a duplicate entry version tag from stamp with newer - // time-stamp. - VersionSource memberId = (VersionSource) cache.getDistributedSystem().getDistributedMember(); - VersionTag tag = VersionTag.create(memberId); - - int entryVersion = stamp.getEntryVersion()-1; - int dsid = stamp.getDistributedSystemId(); - long time = System.currentTimeMillis(); - - tag.setEntryVersion(entryVersion); - tag.setDistributedSystemId(dsid); - tag.setVersionTimeStamp(time); - tag.setIsRemoteForTesting(); - - EntryEventImpl event = createNewEvent((PartitionedRegion) region, tag, - entry.getKey(), "value-2"); - - ((LocalRegion) region).basicUpdate(event, false, true, 0L, false); - - // Verify the new stamp - entry = region.getEntry(key); - assertTrue(entry instanceof EntrySnapshot); - regionEntry = ((EntrySnapshot) entry).getRegionEntry(); - - stamp = regionEntry.getVersionStamp(); - assertEquals( - "Time stamp did NOT get updated by UPDATE_VERSION operation on LocalRegion", - time, stamp.getVersionTimeStamp()); - assertEquals(++entryVersion, stamp.getEntryVersion()); - assertEquals(dsid, stamp.getDistributedSystemId()); - - return stamp.asVersionTag(); - } - }); - - VersionTag remoteTag = (VersionTag) vm3.invoke(new SerializableCallable("Get timestamp from remote site") { - - @Override - public Object call() throws Exception { - - Cache cache = CacheFactory.getAnyInstance(); - final PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName); - - // wait for entry to be received - WaitCriterion wc = new WaitCriterion() { - public boolean done() { - Entry<?,?> entry = null; - try { - entry = region.getDataStore().getEntryLocally(0, key, false, false); - } catch (EntryNotFoundException e) { - // expected - } catch (ForceReattemptException e) { - // expected - } catch (PRLocallyDestroyedException e) { - throw new RuntimeException("unexpected exception", e); - } - if (entry != null) { - LogWriterUtils.getLogWriter().info("found entry " + entry); - } - return (entry != null); - } - - public String description() { - return "Expected key-1 to be received on remote WAN site"; - } - }; - Wait.waitForCriterion(wc, 30000, 500, true); - - wc = new WaitCriterion() { - public boolean done() { - Entry entry = region.getEntry(key); - assertTrue(entry instanceof EntrySnapshot); - RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry(); - return regionEntry.getVersionStamp().getVersionTimeStamp() == tag.getVersionTimeStamp(); - } - public String description() { - return "waiting for timestamp to be updated"; - } - }; - Wait.waitForCriterion(wc, 30000, 500, true); - - Entry entry = region.getEntry(key); - assertTrue(entry instanceof EntrySnapshot); - RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry(); - - VersionStamp stamp = regionEntry.getVersionStamp(); - - return stamp.asVersionTag(); - } - }); - - assertEquals("Local and remote site have different timestamps", tag.getVersionTimeStamp(), remoteTag.getVersionTimeStamp()); - } - - private VersionTagHolder createNewEvent(LocalRegion region, VersionTag tag, Object key, Object value) { - VersionTagHolder updateEvent = new VersionTagHolder(tag); - updateEvent.setOperation(Operation.UPDATE); - updateEvent.setRegion(region); - if (region instanceof PartitionedRegion) { - updateEvent.setKeyInfo(((PartitionedRegion)region).getKeyInfo(key)); - } else { - updateEvent.setKeyInfo(new KeyInfo(key, value, null)); - } - updateEvent.setNewValue(value); - updateEvent.setGenerateCallbacks(true); - updateEvent.distributedMember = region.getSystem().getDistributedMember(); - updateEvent.setNewEventId(region.getSystem()); - return updateEvent; - } - - /* - * Helper Methods - */ - - private static void createCache(Integer locPort) { - UpdateVersionDUnitTest test = new UpdateVersionDUnitTest(); - Properties props = test.getDistributedSystemProperties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(LOCATORS, "localhost[" + locPort + "]"); - props.setProperty(LOG_LEVEL, LogWriterUtils.getDUnitLogLevel()); - props.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false"); - props.setProperty(USE_CLUSTER_CONFIGURATION, "false"); - InternalDistributedSystem ds = test.getSystem(props); - cache = CacheFactory.create(ds); - IgnoredException ex = new IgnoredException("could not get remote locator information for remote site"); - cache.getLogger().info(ex.getAddMessage()); - expectedExceptions.add(ex); - ex = new IgnoredException("Pool ln1 is not available"); - cache.getLogger().info(ex.getAddMessage()); - expectedExceptions.add(ex); - } - - private static void closeCache() { - if (cache != null && !cache.isClosed()) { - for (IgnoredException expectedException: expectedExceptions) { - cache.getLogger().info(expectedException.getRemoveMessage()); - } - expectedExceptions.clear(); - cache.getDistributedSystem().disconnect(); - cache.close(); - } - cache = null; - } - - public static void createSender(String dsName, int remoteDsId, - boolean isParallel, Integer maxMemory, Integer batchSize, - boolean isConflation, boolean isPersistent, GatewayEventFilter filter, - boolean isManualStart) { - File persistentDirectory = new File(dsName + "_disk_" - + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); - persistentDirectory.mkdir(); - DiskStoreFactory dsf = cache.createDiskStoreFactory(); - File[] dirs1 = new File[] { persistentDirectory }; - if (isParallel) { - GatewaySenderFactory gateway = cache.createGatewaySenderFactory(); - gateway.setParallel(true); - gateway.setMaximumQueueMemory(maxMemory); - gateway.setBatchSize(batchSize); - gateway.setManualStart(isManualStart); - ((InternalGatewaySenderFactory) gateway) - .setLocatorDiscoveryCallback(new MyLocatorCallback()); - if (filter != null) { - gateway.addGatewayEventFilter(filter); - } - if (isPersistent) { - gateway.setPersistenceEnabled(true); - gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName) - .getName()); - } else { - DiskStore store = dsf.setDiskDirs(dirs1).create(dsName); - gateway.setDiskStoreName(store.getName()); - } - gateway.setBatchConflationEnabled(isConflation); - gateway.create(dsName, remoteDsId); - - } else { - GatewaySenderFactory gateway = cache.createGatewaySenderFactory(); - gateway.setMaximumQueueMemory(maxMemory); - gateway.setBatchSize(batchSize); - gateway.setManualStart(isManualStart); - ((InternalGatewaySenderFactory) gateway) - .setLocatorDiscoveryCallback(new MyLocatorCallback()); - if (filter != null) { - gateway.addGatewayEventFilter(filter); - } - gateway.setBatchConflationEnabled(isConflation); - if (isPersistent) { - gateway.setPersistenceEnabled(true); - gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName) - .getName()); - } else { - DiskStore store = dsf.setDiskDirs(dirs1).create(dsName); - gateway.setDiskStoreName(store.getName()); - } - gateway.create(dsName, remoteDsId); - } - } - - public static void createPartitionedRegion(String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets){ - AttributesFactory fact = new AttributesFactory(); - if(senderIds!= null){ - StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); - while (tokenizer.hasMoreTokens()){ - String senderId = tokenizer.nextToken(); - fact.addGatewaySenderId(senderId); - } - } - PartitionAttributesFactory pFact = new PartitionAttributesFactory(); - pFact.setTotalNumBuckets(totalNumBuckets); - pFact.setRedundantCopies(redundantCopies); - pFact.setRecoveryDelay(0); - fact.setPartitionAttributes(pFact.create()); - Region r = cache.createRegionFactory(fact.create()).create(regionName); - assertNotNull(r); - } - - public static void createReplicatedRegion(String regionName, String senderIds){ - AttributesFactory fact = new AttributesFactory(); - if(senderIds!= null){ - StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); - while (tokenizer.hasMoreTokens()){ - String senderId = tokenizer.nextToken(); - fact.addGatewaySenderId(senderId); - } - } - fact.setDataPolicy(DataPolicy.REPLICATE); - fact.setScope(Scope.DISTRIBUTED_ACK); - Region r = cache.createRegionFactory(fact.create()).create(regionName); - assertNotNull(r); - } - - public static void waitForSenderRunningState(String senderId){ - Set<GatewaySender> senders = cache.getGatewaySenders(); - final GatewaySender sender = getGatewaySenderById(senders, senderId); - - WaitCriterion wc = new WaitCriterion() { - public boolean done() { - if (sender != null && sender.isRunning()) { - return true; - } - return false; - } - - public String description() { - return "Expected sender isRunning state to be true but is false"; - } - }; - Wait.waitForCriterion(wc, 300000, 500, true); - } - - public static Integer createFirstRemoteLocator(int dsId, int remoteLocPort) { - UpdateVersionDUnitTest test = new UpdateVersionDUnitTest(); - int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); - Properties props = test.getDistributedSystemProperties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId); - props.setProperty(LOCATORS, "localhost[" + port + "]"); - props.setProperty(START_LOCATOR, "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost"); - props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]"); - props.setProperty(USE_CLUSTER_CONFIGURATION, "false"); - props.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false"); - test.getSystem(props); - return port; - } - - public static void createConcurrentSender(String dsName, int remoteDsId, - boolean isParallel, Integer maxMemory, - Integer batchSize, boolean isConflation, boolean isPersistent, - GatewayEventFilter filter, boolean isManualStart, int concurrencyLevel) { - File persistentDirectory = new File(dsName +"_disk_"+System.currentTimeMillis()+"_" + VM.getCurrentVMNum()); - persistentDirectory.mkdir(); - DiskStoreFactory dsf = cache.createDiskStoreFactory(); - File [] dirs1 = new File[] {persistentDirectory}; - - if(isParallel) { - GatewaySenderFactory gateway = cache.createGatewaySenderFactory(); - gateway.setParallel(true); - gateway.setMaximumQueueMemory(maxMemory); - gateway.setBatchSize(batchSize); - gateway.setManualStart(isManualStart); - ((InternalGatewaySenderFactory)gateway).setLocatorDiscoveryCallback(new MyLocatorCallback()); - if (filter != null) { - gateway.addGatewayEventFilter(filter); - } - if(isPersistent) { - gateway.setPersistenceEnabled(true); - gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName()); - } - else { - DiskStore store = dsf.setDiskDirs(dirs1).create(dsName); - gateway.setDiskStoreName(store.getName()); - } - gateway.setBatchConflationEnabled(isConflation); - gateway.create(dsName, remoteDsId); - - }else { - GatewaySenderFactory gateway = cache.createGatewaySenderFactory(); - gateway.setMaximumQueueMemory(maxMemory); - gateway.setBatchSize(batchSize); - gateway.setManualStart(isManualStart); - ((InternalGatewaySenderFactory)gateway).setLocatorDiscoveryCallback(new MyLocatorCallback()); - if (filter != null) { - gateway.addGatewayEventFilter(filter); - } - gateway.setBatchConflationEnabled(isConflation); - if(isPersistent) { - gateway.setPersistenceEnabled(true); - gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName()); - } - else { - DiskStore store = dsf.setDiskDirs(dirs1).create(dsName); - gateway.setDiskStoreName(store.getName()); - } - gateway.setDispatcherThreads(concurrencyLevel); - gateway.create(dsName, remoteDsId); - } - } - - public static int createReceiver(int locPort) { - UpdateVersionDUnitTest test = new UpdateVersionDUnitTest(); - Properties props = test.getDistributedSystemProperties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(LOCATORS, "localhost[" + locPort - + "]"); - - InternalDistributedSystem ds = test.getSystem(props); - cache = CacheFactory.create(ds); - GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); - int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); - fact.setStartPort(port); - fact.setEndPort(port); - GatewayReceiver receiver = fact.create(); - try { - receiver.start(); - } catch (IOException e) { - e.printStackTrace(); - fail("Test " + test.getName() + " failed to start GatewayReceiver on port " + port); - } - return port; - } - - public static void startSender(String senderId){ - Set<GatewaySender> senders = cache.getGatewaySenders(); - GatewaySender sender = null; - for(GatewaySender s : senders){ - if(s.getId().equals(senderId)){ - sender = s; - break; - } - } - sender.start(); - } - - protected static class MyLocatorCallback extends LocatorDiscoveryCallbackAdapter { - - private final Set discoveredLocators = new HashSet(); - - private final Set removedLocators = new HashSet(); - - @Override - public synchronized void locatorsDiscovered(List locators) { - discoveredLocators.addAll(locators); - notifyAll(); - } - - @Override - public synchronized void locatorsRemoved(List locators) { - removedLocators.addAll(locators); - notifyAll(); - } - - public boolean waitForDiscovery(InetSocketAddress locator, long time) - throws InterruptedException { - return waitFor(discoveredLocators, locator, time); - } - - public boolean waitForRemove(InetSocketAddress locator, long time) - throws InterruptedException { - return waitFor(removedLocators, locator, time); - } - - private synchronized boolean waitFor(Set set, InetSocketAddress locator, - long time) throws InterruptedException { - long remaining = time; - long endTime = System.currentTimeMillis() + time; - while (!set.contains(locator) && remaining >= 0) { - wait(remaining); - remaining = endTime - System.currentTimeMillis(); - } - return set.contains(locator); - } - - public synchronized Set getDiscovered() { - return new HashSet(discoveredLocators); - } - - public synchronized Set getRemoved() { - return new HashSet(removedLocators); - } - } - - private static GatewaySender getGatewaySenderById(Set<GatewaySender> senders, String senderId) { - for(GatewaySender s : senders){ - if(s.getId().equals(senderId)){ - return s; - } - } - //if none of the senders matches with the supplied senderId, return null - return null; - } - - public static Integer createFirstLocatorWithDSId(int dsId) { - UpdateVersionDUnitTest test = new UpdateVersionDUnitTest(); - int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); - Properties props = test.getDistributedSystemProperties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId); - props.setProperty(LOCATORS, "localhost[" + port + "]"); - props.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false"); - props.setProperty(USE_CLUSTER_CONFIGURATION, "false"); - props.setProperty(START_LOCATOR, "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost"); - test.getSystem(props); - return port; - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/CacheClientNotifierDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/CacheClientNotifierDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/CacheClientNotifierDUnitTest.java deleted file mode 100755 index 96d441c..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/CacheClientNotifierDUnitTest.java +++ /dev/null @@ -1,276 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan; - -import static com.gemstone.gemfire.distributed.ConfigurationProperties.*; -import static org.junit.Assert.*; - -import java.io.IOException; -import java.util.List; -import java.util.Properties; - -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import com.gemstone.gemfire.cache.AttributesFactory; -import com.gemstone.gemfire.cache.CacheFactory; -import com.gemstone.gemfire.cache.DataPolicy; -import com.gemstone.gemfire.cache.DiskStore; -import com.gemstone.gemfire.cache.EvictionAction; -import com.gemstone.gemfire.cache.EvictionAttributes; -import com.gemstone.gemfire.cache.Region; -import com.gemstone.gemfire.cache.RegionAttributes; -import com.gemstone.gemfire.cache.client.Pool; -import com.gemstone.gemfire.cache.client.PoolManager; -import com.gemstone.gemfire.cache.server.CacheServer; -import com.gemstone.gemfire.cache.server.ClientSubscriptionConfig; -import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; -import com.gemstone.gemfire.internal.AvailablePort; -import com.gemstone.gemfire.internal.cache.CacheServerImpl; -import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; -import com.gemstone.gemfire.internal.cache.ha.HAContainerRegion; -import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier; -import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerTestUtil; -import com.gemstone.gemfire.internal.logging.LogService; -import com.gemstone.gemfire.test.dunit.LogWriterUtils; -import com.gemstone.gemfire.test.dunit.SerializableRunnable; -import com.gemstone.gemfire.test.dunit.VM; -import com.gemstone.gemfire.test.dunit.Wait; -import com.gemstone.gemfire.test.dunit.WaitCriterion; -import com.gemstone.gemfire.test.junit.categories.DistributedTest; -import com.gemstone.gemfire.test.junit.categories.FlakyTest; - -@Category(DistributedTest.class) -public class CacheClientNotifierDUnitTest extends WANTestBase { - - private static final int NUM_KEYS = 10; - - private int createCacheServerWithCSC(VM vm, final boolean withCSC, final int capacity, - final String policy, final String diskStoreName) { - final int serverPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); - - SerializableRunnable createCacheServer = new SerializableRunnable() { - @Override - public void run() throws Exception { - CacheServerImpl server = (CacheServerImpl)cache.addCacheServer(); - server.setPort(serverPort); - if (withCSC) { - if (diskStoreName != null) { - DiskStore ds = cache.findDiskStore(diskStoreName); - if(ds == null) { - ds = cache.createDiskStoreFactory().create(diskStoreName); - } - } - ClientSubscriptionConfig csc = server.getClientSubscriptionConfig(); - csc.setCapacity(capacity); - csc.setEvictionPolicy(policy); - csc.setDiskStoreName(diskStoreName); - server.setHostnameForClients("localhost"); - //server.setGroups(new String[]{"serv"}); - } - try { - server.start(); - } catch (IOException e) { - com.gemstone.gemfire.test.dunit.Assert.fail("Failed to start server ", e); - } - } - }; - vm.invoke(createCacheServer); - return serverPort; - } - - private void checkCacheServer(VM vm, final int serverPort, final boolean withCSC, final int capacity) { - SerializableRunnable checkCacheServer = new SerializableRunnable() { - - @Override - public void run() throws Exception { - List<CacheServer> cacheServers = ((GemFireCacheImpl)cache).getCacheServersAndGatewayReceiver(); - CacheServerImpl server = null; - for (CacheServer cs:cacheServers) { - if (cs.getPort() == serverPort) { - server = (CacheServerImpl)cs; - break; - } - } - assertNotNull(server); - CacheClientNotifier ccn = server.getAcceptor().getCacheClientNotifier(); - HAContainerRegion haContainer = (HAContainerRegion)ccn.getHaContainer(); - if (server.getAcceptor().isGatewayReceiver()) { - assertNull(haContainer); - return; - } - Region internalRegion = haContainer.getMapForTest(); - RegionAttributes ra = internalRegion.getAttributes(); - EvictionAttributes ea = ra.getEvictionAttributes(); - if (withCSC) { - assertNotNull(ea); - assertEquals(capacity, ea.getMaximum()); - assertEquals(EvictionAction.OVERFLOW_TO_DISK, ea.getAction()); - } else { - assertNull(ea); - } - } - }; - vm.invoke(checkCacheServer); - } - - public static void closeACacheServer(final int serverPort) { - List<CacheServer> cacheServers = cache.getCacheServers(); - CacheServerImpl server = null; - for (CacheServer cs:cacheServers) { - if (cs.getPort() == serverPort) { - server = (CacheServerImpl)cs; - break; - } - } - assertNotNull(server); - server.stop(); - } - - private void verifyRegionSize(VM vm, final int expect) { - SerializableRunnable verifyRegionSize = new SerializableRunnable() { - @Override - public void run() throws Exception { - final Region region = cache.getRegion(getTestMethodName() + "_PR"); - - Wait.waitForCriterion(new WaitCriterion() { - public boolean done() { - return region.size() == expect; - } - public String description() { - return null; - } - }, 60000, 100, false); - assertEquals(expect, region.size()); - } - }; - vm.invoke(verifyRegionSize); - } - - /** - * The test will start several cache servers, including gateway receivers. - * Shutdown them and verify the CacheClientNotifier for each server is correct - */ - @Test - public void testNormalClient2MultipleCacheServer() throws Exception { - doMultipleCacheServer(false); - } - - public void doMultipleCacheServer(boolean durable) throws Exception { - /* test scenario: */ - /* create 1 GatewaySender on vm0 */ - /* create 1 GatewayReceiver on vm1 */ - /* create 2 cache servers on vm1, one with overflow. */ - /* verify if the cache server2 still has the overflow attributes */ - /* create 1 cache client1 on vm2 to register interest on cache server1 */ - /* create 1 cache client2 on vm3 to register interest on cache server1 */ - /* do some puts to GatewaySender on vm0 */ - - // create sender at ln - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - - // create receiver and cache servers will be at ny - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm1.invoke(() -> WANTestBase.createCache( nyPort )); - int receiverPort = vm1.invoke(() -> WANTestBase.createReceiver()); - checkCacheServer(vm1, receiverPort, false, 0); - - // create PR for receiver - vm1.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - - // create cache server1 with overflow - int serverPort = createCacheServerWithCSC(vm1, true, 3, "entry", "DEFAULT"); - checkCacheServer(vm1, serverPort, true, 3); - - // create cache server 2 - final int serverPort2 = createCacheServerWithCSC(vm1, false, 0, null, null); - // Currently, only the first cache server's overflow attributes will take effect - // It will be enhanced in GEODE-1102 - checkCacheServer(vm1, serverPort2, true, 3); - LogService.getLogger().info("receiverPort="+receiverPort+",serverPort="+serverPort+",serverPort2="+serverPort2); - - vm2.invoke(() -> createClientWithLocator(nyPort, "localhost", getTestMethodName() + "_PR", "123", durable)); - vm3.invoke(() -> createClientWithLocator(nyPort, "localhost", getTestMethodName() + "_PR", "124", durable)); - - vm0.invoke(() -> WANTestBase.createCache( lnPort )); - vm0.invoke(() -> WANTestBase.createSender( "ln", 2, false, 100, 400, false, false, null, true )); - vm0.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm0.invoke(() -> WANTestBase.startSender( "ln" )); - vm0.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", NUM_KEYS )); - - /* verify */ - verifyRegionSize(vm0, NUM_KEYS); - verifyRegionSize(vm1, NUM_KEYS); - verifyRegionSize(vm3, NUM_KEYS); - verifyRegionSize(vm2, NUM_KEYS); - - // close a cache server, then re-test - vm1.invoke(() -> closeACacheServer(serverPort2)); - - vm0.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", NUM_KEYS*2 )); - - /* verify */ - verifyRegionSize(vm0, NUM_KEYS*2); - verifyRegionSize(vm1, NUM_KEYS*2); - verifyRegionSize(vm3, NUM_KEYS*2); - verifyRegionSize(vm2, NUM_KEYS*2); - - disconnectAllFromDS(); - } - - public static void createClientWithLocator(int port0,String host, - String regionName, String clientId, boolean isDurable) { - WANTestBase test = new WANTestBase(); - Properties props = test.getDistributedSystemProperties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(LOCATORS, ""); - if (isDurable) { - props.setProperty(DURABLE_CLIENT_ID, clientId); - props.setProperty(DURABLE_CLIENT_TIMEOUT, "" + 200); - } - - InternalDistributedSystem ds = test.getSystem(props); - cache = CacheFactory.create(ds); - - assertNotNull(cache); - CacheServerTestUtil.disableShufflingOfEndpoints(); - Pool p; - try { - p = PoolManager.createFactory().addLocator(host, port0) - .setPingInterval(250).setSubscriptionEnabled(true) - .setSubscriptionRedundancy(-1).setReadTimeout(2000) - .setSocketBufferSize(1000).setMinConnections(6).setMaxConnections(10) - .setRetryAttempts(3).create(regionName); - } finally { - CacheServerTestUtil.enableShufflingOfEndpoints(); - } - - AttributesFactory factory = new AttributesFactory(); - factory.setPoolName(p.getName()); - factory.setDataPolicy(DataPolicy.NORMAL); - RegionAttributes attrs = factory.create(); - region = cache.createRegion(regionName, attrs); - region.registerInterest("ALL_KEYS"); - assertNotNull(region); - if (isDurable) { - cache.readyForEvents(); - } - LogWriterUtils.getLogWriter().info( - "Distributed Region " + regionName + " created Successfully :" - + region.toString() + " in a "+(isDurable?"durable":"")+" client"); - } -}
