http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java deleted file mode 100644 index 6727f1b..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java +++ /dev/null @@ -1,3765 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan; - -import static com.gemstone.gemfire.distributed.ConfigurationProperties.*; -import static org.junit.Assert.*; - -import java.io.File; -import java.io.IOException; -import java.io.Serializable; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.StringTokenizer; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; - -import com.jayway.awaitility.Awaitility; -import org.junit.experimental.categories.Category; - -import com.gemstone.gemfire.cache.AttributesFactory; -import com.gemstone.gemfire.cache.AttributesMutator; -import com.gemstone.gemfire.cache.Cache; -import com.gemstone.gemfire.cache.CacheClosedException; -import com.gemstone.gemfire.cache.CacheFactory; -import com.gemstone.gemfire.cache.CacheListener; -import com.gemstone.gemfire.cache.CacheTransactionManager; -import com.gemstone.gemfire.cache.DataPolicy; -import com.gemstone.gemfire.cache.DiskStore; -import com.gemstone.gemfire.cache.DiskStoreFactory; -import com.gemstone.gemfire.cache.EntryEvent; -import com.gemstone.gemfire.cache.PartitionAttributesFactory; -import com.gemstone.gemfire.cache.Region; -import com.gemstone.gemfire.cache.RegionAttributes; -import com.gemstone.gemfire.cache.RegionDestroyedException; -import com.gemstone.gemfire.cache.RegionFactory; -import com.gemstone.gemfire.cache.Scope; -import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener; -import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue; -import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory; -import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl; -import com.gemstone.gemfire.cache.client.Pool; -import com.gemstone.gemfire.cache.client.PoolManager; -import com.gemstone.gemfire.cache.client.internal.LocatorDiscoveryCallbackAdapter; -import com.gemstone.gemfire.cache.client.internal.locator.wan.LocatorMembershipListener; -import com.gemstone.gemfire.cache.persistence.PartitionOfflineException; -import com.gemstone.gemfire.cache.server.CacheServer; -import com.gemstone.gemfire.cache.util.CacheListenerAdapter; -import com.gemstone.gemfire.cache.wan.GatewayEventFilter; -import com.gemstone.gemfire.cache.wan.GatewayQueueEvent; -import com.gemstone.gemfire.cache.wan.GatewayReceiver; -import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory; -import com.gemstone.gemfire.cache.wan.GatewaySender; -import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy; -import com.gemstone.gemfire.cache.wan.GatewaySenderFactory; -import com.gemstone.gemfire.cache.wan.GatewayTransportFilter; -import com.gemstone.gemfire.cache30.CacheTestCase; -import com.gemstone.gemfire.distributed.Locator; -import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; -import com.gemstone.gemfire.distributed.internal.InternalLocator; -import com.gemstone.gemfire.distributed.internal.ServerLocation; -import com.gemstone.gemfire.internal.AvailablePort; -import com.gemstone.gemfire.internal.AvailablePortHelper; -import com.gemstone.gemfire.internal.FileUtil; -import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId; -import com.gemstone.gemfire.internal.cache.BucketRegion; -import com.gemstone.gemfire.internal.cache.CacheConfig; -import com.gemstone.gemfire.internal.cache.CacheServerImpl; -import com.gemstone.gemfire.internal.cache.CustomerIDPartitionResolver; -import com.gemstone.gemfire.internal.cache.ForceReattemptException; -import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; -import com.gemstone.gemfire.internal.cache.LocalRegion; -import com.gemstone.gemfire.internal.cache.PartitionedRegion; -import com.gemstone.gemfire.internal.cache.RegionQueue; -import com.gemstone.gemfire.internal.cache.execute.data.CustId; -import com.gemstone.gemfire.internal.cache.execute.data.Customer; -import com.gemstone.gemfire.internal.cache.execute.data.Order; -import com.gemstone.gemfire.internal.cache.execute.data.OrderId; -import com.gemstone.gemfire.internal.cache.execute.data.Shipment; -import com.gemstone.gemfire.internal.cache.execute.data.ShipmentId; -import com.gemstone.gemfire.internal.cache.partitioned.PRLocallyDestroyedException; -import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerStats; -import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerTestUtil; -import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderEventProcessor; -import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue; -import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderEventProcessor; -import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue; -import com.gemstone.gemfire.internal.cache.wan.serial.ConcurrentSerialGatewaySenderEventProcessor; -import com.gemstone.gemfire.internal.cache.wan.serial.SerialGatewaySenderQueue; -import com.gemstone.gemfire.pdx.SimpleClass; -import com.gemstone.gemfire.pdx.SimpleClass1; -import com.gemstone.gemfire.test.dunit.Assert; -import com.gemstone.gemfire.test.dunit.AsyncInvocation; -import com.gemstone.gemfire.test.dunit.Host; -import com.gemstone.gemfire.test.dunit.IgnoredException; -import com.gemstone.gemfire.test.dunit.Invoke; -import com.gemstone.gemfire.test.dunit.LogWriterUtils; -import com.gemstone.gemfire.test.dunit.VM; -import com.gemstone.gemfire.test.dunit.Wait; -import com.gemstone.gemfire.test.dunit.WaitCriterion; -import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; -import com.gemstone.gemfire.test.junit.categories.DistributedTest; -import com.gemstone.gemfire.util.test.TestUtil; - -@Category(DistributedTest.class) -public class WANTestBase extends JUnit4DistributedTestCase { - - protected static Cache cache; - protected static Region region; - - protected static PartitionedRegion customerRegion; - protected static PartitionedRegion orderRegion; - protected static PartitionedRegion shipmentRegion; - - protected static final String customerRegionName = "CUSTOMER"; - protected static final String orderRegionName = "ORDER"; - protected static final String shipmentRegionName = "SHIPMENT"; - - protected static VM vm0; - protected static VM vm1; - protected static VM vm2; - protected static VM vm3; - protected static VM vm4; - protected static VM vm5; - protected static VM vm6; - protected static VM vm7; - - protected static QueueListener listener1; - protected static QueueListener listener2; - - protected static AsyncEventListener eventListener1 ; - protected static AsyncEventListener eventListener2 ; - - private static final long MAX_WAIT = 10000; - - protected static GatewayEventFilter eventFilter; - - protected static List<Integer> dispatcherThreads = - new ArrayList<Integer>(Arrays.asList(1, 3, 5)); - //this will be set for each test method run with one of the values from above list - protected static int numDispatcherThreadsForTheRun = 1; - - public WANTestBase() { - super(); - } - - /** - * @deprecated Use the no arg constructor, or better yet, don't construct this class - */ - @Deprecated - public WANTestBase(final String ignored) { - } - - @Override - public final void preSetUp() throws Exception { - final Host host = Host.getHost(0); - vm0 = host.getVM(0); - vm1 = host.getVM(1); - vm2 = host.getVM(2); - vm3 = host.getVM(3); - vm4 = host.getVM(4); - vm5 = host.getVM(5); - vm6 = host.getVM(6); - vm7 = host.getVM(7); - //Need to set the test name after the VMs are created - } - - @Override - public final void postSetUp() throws Exception { - //this is done to vary the number of dispatchers for sender - //during every test method run - shuffleNumDispatcherThreads(); - Invoke.invokeInEveryVM(() -> setNumDispatcherThreadsForTheRun(dispatcherThreads.get(0))); - IgnoredException.addIgnoredException("Connection refused"); - IgnoredException.addIgnoredException("Software caused connection abort"); - IgnoredException.addIgnoredException("Connection reset"); - postSetUpWANTestBase(); - } - - protected void postSetUpWANTestBase() throws Exception { - } - - public static void shuffleNumDispatcherThreads() { - Collections.shuffle(dispatcherThreads); - } - - public static void setNumDispatcherThreadsForTheRun(int numThreads) { - numDispatcherThreadsForTheRun = numThreads; - } - - public static void stopOldLocator() { - if (Locator.hasLocator()) { - Locator.getLocator().stop(); - } - } - - public static void createLocator(int dsId, int port, Set<String> localLocatorsList, Set<String> remoteLocatorsList){ - WANTestBase test = new WANTestBase(); - Properties props = test.getDistributedSystemProperties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId); - StringBuffer localLocatorBuffer = new StringBuffer(localLocatorsList.toString()); - localLocatorBuffer.deleteCharAt(0); - localLocatorBuffer.deleteCharAt(localLocatorBuffer.lastIndexOf("]")); - String localLocator = localLocatorBuffer.toString(); - localLocator = localLocator.replace(" ", ""); - - props.setProperty(LOCATORS, localLocator); - props.setProperty(START_LOCATOR, "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost"); - StringBuffer remoteLocatorBuffer = new StringBuffer(remoteLocatorsList.toString()); - remoteLocatorBuffer.deleteCharAt(0); - remoteLocatorBuffer.deleteCharAt(remoteLocatorBuffer.lastIndexOf("]")); - String remoteLocator = remoteLocatorBuffer.toString(); - remoteLocator = remoteLocator.replace(" ", ""); - props.setProperty(REMOTE_LOCATORS, remoteLocator); - test.getSystem(props); - } - - public static Integer createFirstLocatorWithDSId(int dsId) { - stopOldLocator(); - WANTestBase test = new WANTestBase(); - int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); - Properties props = test.getDistributedSystemProperties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId); - props.setProperty(LOCATORS, "localhost[" + port + "]"); - props.setProperty(START_LOCATOR, "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost"); - test.getSystem(props); - return port; - } - - public static Integer createFirstPeerLocator(int dsId) { - stopOldLocator(); - WANTestBase test = new WANTestBase(); - int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); - Properties props = test.getDistributedSystemProperties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId); - props.setProperty(LOCATORS, "localhost[" + port + "]"); - props.setProperty(START_LOCATOR, "localhost[" + port + "],server=false,peer=true,hostname-for-clients=localhost"); - test.getSystem(props); - return port; - } - - public static Integer createSecondLocator(int dsId, int locatorPort) { - stopOldLocator(); - WANTestBase test = new WANTestBase(); - int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); - Properties props = test.getDistributedSystemProperties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId); - props.setProperty(LOCATORS, "localhost[" + locatorPort + "]"); - props.setProperty(START_LOCATOR, "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost"); - test.getSystem(props); - return port; - } - - public static Integer createSecondPeerLocator(int dsId, int locatorPort) { - stopOldLocator(); - WANTestBase test = new WANTestBase(); - int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); - Properties props = test.getDistributedSystemProperties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId); - props.setProperty(LOCATORS, "localhost[" + locatorPort + "]"); - props.setProperty(START_LOCATOR, "localhost[" + port + "],server=false,peer=true,hostname-for-clients=localhost"); - test.getSystem(props); - return port; - } - - public static Integer createFirstRemoteLocator(int dsId, int remoteLocPort) { - stopOldLocator(); - WANTestBase test = new WANTestBase(); - int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); - Properties props = test.getDistributedSystemProperties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId); - props.setProperty(LOCATORS, "localhost[" + port + "]"); - props.setProperty(START_LOCATOR, "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost"); - props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]"); - test.getSystem(props); - return port; - } - - public static void bringBackLocatorOnOldPort(int dsId, int remoteLocPort, int oldPort) { - WANTestBase test = new WANTestBase(); - Properties props = test.getDistributedSystemProperties(); - props.put(LOG_LEVEL, "fine"); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId); - props.setProperty(LOCATORS, "localhost[" + oldPort + "]"); - props.setProperty(START_LOCATOR, "localhost[" + oldPort + "],server=true,peer=true,hostname-for-clients=localhost"); - props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]"); - test.getSystem(props); - } - - - public static Integer createFirstRemotePeerLocator(int dsId, int remoteLocPort) { - stopOldLocator(); - WANTestBase test = new WANTestBase(); - int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); - Properties props = test.getDistributedSystemProperties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId); - props.setProperty(LOCATORS, "localhost[" + port + "]"); - props.setProperty(START_LOCATOR, "localhost[" + port + "],server=false,peer=true,hostname-for-clients=localhost"); - props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]"); - test.getSystem(props); - return port; - } - - public static Integer createSecondRemoteLocator(int dsId, int localPort, - int remoteLocPort) { - stopOldLocator(); - WANTestBase test = new WANTestBase(); - int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); - Properties props = test.getDistributedSystemProperties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId); - props.setProperty(LOCATORS, "localhost[" + localPort + "]"); - props.setProperty(START_LOCATOR, "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost"); - props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]"); - test.getSystem(props); - return port; - } - - public static Integer createSecondRemoteLocatorWithAPI(int dsId, int localPort, - int remoteLocPort, String hostnameForClients) - throws IOException - { - stopOldLocator(); - WANTestBase test = new WANTestBase(); - int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); - Properties props = test.getDistributedSystemProperties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId); - props.setProperty(LOCATORS, "localhost[" + localPort + "]"); - props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]"); - Locator locator = Locator.startLocatorAndDS(0, null, InetAddress.getByName("localhost"), props, true, true, hostnameForClients); - return locator.getPort(); - } - - public static Integer createSecondRemotePeerLocator(int dsId, int localPort, - int remoteLocPort) { - stopOldLocator(); - WANTestBase test = new WANTestBase(); - int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); - Properties props = test.getDistributedSystemProperties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId); - props.setProperty(LOCATORS, "localhost[" + localPort + "]"); - props.setProperty(START_LOCATOR, "localhost[" + port + "],server=false,peer=true,hostname-for-clients=localhost"); - props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]"); - test.getSystem(props); - return port; - } - - public static int createReceiverInSecuredCache() { - GatewayReceiverFactory fact = WANTestBase.cache.createGatewayReceiverFactory(); - int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); - fact.setStartPort(port); - fact.setEndPort(port); - fact.setManualStart(true); - GatewayReceiver receiver = fact.create(); - try { - receiver.start(); - } - catch (IOException e) { - e.printStackTrace(); - com.gemstone.gemfire.test.dunit.Assert.fail("Failed to start GatewayReceiver on port " + port, e); - } - return port; - } - - public static void createReplicatedRegion(String regionName, String senderIds, Boolean offHeap){ - IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class - .getName()); - IgnoredException exp1 = IgnoredException.addIgnoredException(InterruptedException.class - .getName()); - IgnoredException exp2 = IgnoredException.addIgnoredException(GatewaySenderException.class - .getName()); - try { - AttributesFactory fact = new AttributesFactory(); - if (senderIds != null) { - StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); - while (tokenizer.hasMoreTokens()) { - String senderId = tokenizer.nextToken(); - fact.addGatewaySenderId(senderId); - } - } - fact.setDataPolicy(DataPolicy.REPLICATE); - fact.setScope(Scope.DISTRIBUTED_ACK); - fact.setOffHeap(offHeap); - Region r = cache.createRegionFactory(fact.create()).create(regionName); - assertNotNull(r); - } finally { - exp.remove(); - exp1.remove(); - exp2.remove(); - } - } - - public static void createNormalRegion(String regionName, String senderIds){ - AttributesFactory fact = new AttributesFactory(); - if(senderIds!= null){ - StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); - while (tokenizer.hasMoreTokens()){ - String senderId = tokenizer.nextToken(); - fact.addGatewaySenderId(senderId); - } - } - fact.setDataPolicy(DataPolicy.NORMAL); - fact.setScope(Scope.DISTRIBUTED_ACK); - Region r = cache.createRegionFactory(fact.create()).create(regionName); - assertNotNull(r); - } - - public static void createPersistentReplicatedRegion(String regionName, String senderIds, Boolean offHeap){ - AttributesFactory fact = new AttributesFactory(); - if(senderIds!= null){ - StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); - while (tokenizer.hasMoreTokens()){ - String senderId = tokenizer.nextToken(); - fact.addGatewaySenderId(senderId); - } - } - fact.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE); - fact.setOffHeap(offHeap); - Region r = cache.createRegionFactory(fact.create()).create(regionName); - assertNotNull(r); - } - - public static void createReplicatedRegionWithAsyncEventQueue( - String regionName, String asyncQueueIds, Boolean offHeap) { - IgnoredException exp1 = IgnoredException.addIgnoredException(ForceReattemptException.class - .getName()); - try { - AttributesFactory fact = new AttributesFactory(); - if (asyncQueueIds != null) { - StringTokenizer tokenizer = new StringTokenizer(asyncQueueIds, ","); - while (tokenizer.hasMoreTokens()) { - String asyncQueueId = tokenizer.nextToken(); - fact.addAsyncEventQueueId(asyncQueueId); - } - } - fact.setDataPolicy(DataPolicy.REPLICATE); - fact.setOffHeap(offHeap); - RegionFactory regionFactory = cache.createRegionFactory(fact.create()); - Region r = regionFactory.create(regionName); - assertNotNull(r); - } finally { - exp1.remove(); - } - } - - public static void createReplicatedRegionWithSenderAndAsyncEventQueue( - String regionName, String senderIds, String asyncChannelId, Boolean offHeap) { - IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class - .getName()); - try { - AttributesFactory fact = new AttributesFactory(); - if (senderIds != null) { - StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); - while (tokenizer.hasMoreTokens()) { - String senderId = tokenizer.nextToken(); - fact.addGatewaySenderId(senderId); - } - } - fact.setDataPolicy(DataPolicy.REPLICATE); - fact.setScope(Scope.DISTRIBUTED_ACK); - fact.setOffHeap(offHeap); - RegionFactory regionFactory = cache.createRegionFactory(fact.create()); - regionFactory.addAsyncEventQueueId(asyncChannelId); - Region r = regionFactory.create(regionName); - assertNotNull(r); - } finally { - exp.remove(); - } - } - - public static void createReplicatedRegion(String regionName, String senderIds, Scope scope, DataPolicy policy, Boolean offHeap){ - AttributesFactory fact = new AttributesFactory(); - if(senderIds!= null){ - StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); - while (tokenizer.hasMoreTokens()){ - String senderId = tokenizer.nextToken(); - fact.addGatewaySenderId(senderId); - } - } - fact.setDataPolicy(policy); - fact.setScope(scope); - fact.setOffHeap(offHeap); - Region r = cache.createRegionFactory(fact.create()).create(regionName); - assertNotNull(r); - } - - public static void createAsyncEventQueue( - String asyncChannelId, boolean isParallel, - Integer maxMemory, Integer batchSize, boolean isConflation, - boolean isPersistent, String diskStoreName, boolean isDiskSynchronous) { - - if (diskStoreName != null) { - File directory = new File(asyncChannelId + "_disk_" - + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); - directory.mkdir(); - File[] dirs1 = new File[] { directory }; - DiskStoreFactory dsf = cache.createDiskStoreFactory(); - dsf.setDiskDirs(dirs1); - dsf.create(diskStoreName); - } - - AsyncEventListener asyncEventListener = new MyAsyncEventListener(); - - AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory(); - factory.setBatchSize(batchSize); - factory.setPersistent(isPersistent); - factory.setDiskStoreName(diskStoreName); - factory.setDiskSynchronous(isDiskSynchronous); - factory.setBatchConflationEnabled(isConflation); - factory.setMaximumQueueMemory(maxMemory); - factory.setParallel(isParallel); - //set dispatcher threads - factory.setDispatcherThreads(numDispatcherThreadsForTheRun); - factory.create(asyncChannelId, asyncEventListener); - } - - public static void createPartitionedRegion(String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap){ - IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class - .getName()); - IgnoredException exp1 = IgnoredException.addIgnoredException(PartitionOfflineException.class - .getName()); - try { - AttributesFactory fact = new AttributesFactory(); - if (senderIds != null) { - StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); - while (tokenizer.hasMoreTokens()) { - String senderId = tokenizer.nextToken(); - fact.addGatewaySenderId(senderId); - } - } - PartitionAttributesFactory pfact = new PartitionAttributesFactory(); - pfact.setTotalNumBuckets(totalNumBuckets); - pfact.setRedundantCopies(redundantCopies); - pfact.setRecoveryDelay(0); - fact.setPartitionAttributes(pfact.create()); - fact.setOffHeap(offHeap); - Region r = cache.createRegionFactory(fact.create()).create(regionName); - assertNotNull(r); - } finally { - exp.remove(); - exp1.remove(); - } - } - - // TODO:OFFHEAP: add offheap flavor - public static void createPartitionedRegionWithPersistence(String regionName, - String senderIds, Integer redundantCopies, Integer totalNumBuckets) { - IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class - .getName()); - IgnoredException exp1 = IgnoredException.addIgnoredException(PartitionOfflineException.class - .getName()); - try { - AttributesFactory fact = new AttributesFactory(); - if (senderIds != null) { - StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); - while (tokenizer.hasMoreTokens()) { - String senderId = tokenizer.nextToken(); - fact.addGatewaySenderId(senderId); - } - } - PartitionAttributesFactory pfact = new PartitionAttributesFactory(); - pfact.setTotalNumBuckets(totalNumBuckets); - pfact.setRedundantCopies(redundantCopies); - pfact.setRecoveryDelay(0); - fact.setPartitionAttributes(pfact.create()); - fact.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); - Region r = cache.createRegionFactory(fact.create()).create(regionName); - assertNotNull(r); - } finally { - exp.remove(); - exp1.remove(); - } - } - public static void createColocatedPartitionedRegion(String regionName, - String senderIds, Integer redundantCopies, Integer totalNumBuckets, String colocatedWith) { - IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class - .getName()); - IgnoredException exp1 = IgnoredException.addIgnoredException(PartitionOfflineException.class - .getName()); - try { - AttributesFactory fact = new AttributesFactory(); - if (senderIds != null) { - StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); - while (tokenizer.hasMoreTokens()) { - String senderId = tokenizer.nextToken(); - fact.addGatewaySenderId(senderId); - } - } - PartitionAttributesFactory pfact = new PartitionAttributesFactory(); - pfact.setTotalNumBuckets(totalNumBuckets); - pfact.setRedundantCopies(redundantCopies); - pfact.setRecoveryDelay(0); - pfact.setColocatedWith(colocatedWith); - fact.setPartitionAttributes(pfact.create()); - Region r = cache.createRegionFactory(fact.create()).create(regionName); - assertNotNull(r); - } finally { - exp.remove(); - exp1.remove(); - } - } - - public static void addSenderThroughAttributesMutator(String regionName, - String senderIds){ - final Region r = cache.getRegion(Region.SEPARATOR + regionName); - assertNotNull(r); - AttributesMutator mutator = r.getAttributesMutator(); - mutator.addGatewaySenderId(senderIds); - } - - public static void addAsyncEventQueueThroughAttributesMutator( - String regionName, String queueId) { - final Region r = cache.getRegion(Region.SEPARATOR + regionName); - assertNotNull(r); - AttributesMutator mutator = r.getAttributesMutator(); - mutator.addAsyncEventQueueId(queueId); - } - - public static void createPartitionedRegionAsAccessor( - String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets){ - AttributesFactory fact = new AttributesFactory(); - if(senderIds!= null){ - StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); - while (tokenizer.hasMoreTokens()){ - String senderId = tokenizer.nextToken(); - fact.addGatewaySenderId(senderId); - } - } - PartitionAttributesFactory pfact = new PartitionAttributesFactory(); - pfact.setTotalNumBuckets(totalNumBuckets); - pfact.setRedundantCopies(redundantCopies); - pfact.setLocalMaxMemory(0); - fact.setPartitionAttributes(pfact.create()); - Region r = cache.createRegionFactory(fact.create()).create(regionName); - assertNotNull(r); - } - - public static void createPartitionedRegionWithSerialParallelSenderIds(String regionName, String serialSenderIds, String parallelSenderIds, String colocatedWith, Boolean offHeap){ - AttributesFactory fact = new AttributesFactory(); - if (serialSenderIds != null) { - StringTokenizer tokenizer = new StringTokenizer(serialSenderIds, ","); - while (tokenizer.hasMoreTokens()) { - String senderId = tokenizer.nextToken(); - fact.addGatewaySenderId(senderId); - } - } - if (parallelSenderIds != null) { - StringTokenizer tokenizer = new StringTokenizer(parallelSenderIds, ","); - while (tokenizer.hasMoreTokens()) { - String senderId = tokenizer.nextToken(); - fact.addGatewaySenderId(senderId); - } - } - PartitionAttributesFactory pfact = new PartitionAttributesFactory(); - pfact.setColocatedWith(colocatedWith); - fact.setPartitionAttributes(pfact.create()); - fact.setOffHeap(offHeap); - Region r = cache.createRegionFactory(fact.create()).create(regionName); - assertNotNull(r); - } - - public static void createPersistentPartitionedRegion( - String regionName, - String senderIds, - Integer redundantCopies, - Integer totalNumBuckets, - Boolean offHeap){ - - IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class - .getName()); - IgnoredException exp1 = IgnoredException.addIgnoredException(PartitionOfflineException.class - .getName()); - try { - - AttributesFactory fact = new AttributesFactory(); - if (senderIds != null) { - StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); - while (tokenizer.hasMoreTokens()) { - String senderId = tokenizer.nextToken(); - fact.addGatewaySenderId(senderId); - } - } - PartitionAttributesFactory pfact = new PartitionAttributesFactory(); - pfact.setTotalNumBuckets(totalNumBuckets); - pfact.setRedundantCopies(redundantCopies); - fact.setPartitionAttributes(pfact.create()); - fact.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); - fact.setOffHeap(offHeap); - Region r = cache.createRegionFactory(fact.create()).create(regionName); - assertNotNull(r); - } finally { - exp.remove(); - exp1.remove(); - } - } - - public static void createCustomerOrderShipmentPartitionedRegion( - String senderIds, Integer redundantCopies, - Integer totalNumBuckets, Boolean offHeap) { - IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class - .getName()); - try { - AttributesFactory fact = new AttributesFactory(); - if (senderIds != null) { - StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); - while (tokenizer.hasMoreTokens()) { - String senderId = tokenizer.nextToken(); - fact.addGatewaySenderId(senderId); - } - } - - PartitionAttributesFactory paf = new PartitionAttributesFactory(); - paf.setRedundantCopies(redundantCopies) - .setTotalNumBuckets(totalNumBuckets) - .setPartitionResolver( - new CustomerIDPartitionResolver("CustomerIDPartitionResolver")); - fact.setPartitionAttributes(paf.create()); - fact.setOffHeap(offHeap); - customerRegion = (PartitionedRegion)cache.createRegionFactory( - fact.create()).create(customerRegionName); - assertNotNull(customerRegion); - LogWriterUtils.getLogWriter().info( - "Partitioned Region CUSTOMER created Successfully :" - + customerRegion.toString()); - - paf = new PartitionAttributesFactory(); - paf.setRedundantCopies(redundantCopies) - .setTotalNumBuckets(totalNumBuckets) - .setColocatedWith(customerRegionName) - .setPartitionResolver( - new CustomerIDPartitionResolver("CustomerIDPartitionResolver")); - fact = new AttributesFactory(); - if (senderIds != null) { - StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); - while (tokenizer.hasMoreTokens()) { - String senderId = tokenizer.nextToken(); - fact.addGatewaySenderId(senderId); - } - } - fact.setPartitionAttributes(paf.create()); - fact.setOffHeap(offHeap); - orderRegion = (PartitionedRegion)cache.createRegionFactory(fact.create()) - .create(orderRegionName); - assertNotNull(orderRegion); - LogWriterUtils.getLogWriter().info( - "Partitioned Region ORDER created Successfully :" - + orderRegion.toString()); - - paf = new PartitionAttributesFactory(); - paf.setRedundantCopies(redundantCopies) - .setTotalNumBuckets(totalNumBuckets) - .setColocatedWith(orderRegionName) - .setPartitionResolver( - new CustomerIDPartitionResolver("CustomerIDPartitionResolver")); - fact = new AttributesFactory(); - if (senderIds != null) { - StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); - while (tokenizer.hasMoreTokens()) { - String senderId = tokenizer.nextToken(); - fact.addGatewaySenderId(senderId); - } - } - fact.setPartitionAttributes(paf.create()); - fact.setOffHeap(offHeap); - shipmentRegion = (PartitionedRegion)cache.createRegionFactory( - fact.create()).create(shipmentRegionName); - assertNotNull(shipmentRegion); - LogWriterUtils.getLogWriter().info( - "Partitioned Region SHIPMENT created Successfully :" - + shipmentRegion.toString()); - } finally { - exp.remove(); - } - } - - public static void createColocatedPartitionedRegions(String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap){ - AttributesFactory fact = new AttributesFactory(); - if(senderIds!= null){ - StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); - while (tokenizer.hasMoreTokens()){ - String senderId = tokenizer.nextToken(); - fact.addGatewaySenderId(senderId); - } - } - PartitionAttributesFactory pfact = new PartitionAttributesFactory(); - pfact.setTotalNumBuckets(totalNumBuckets); - pfact.setRedundantCopies(redundantCopies); - fact.setPartitionAttributes(pfact.create()); - fact.setOffHeap(offHeap); - Region r = cache.createRegionFactory(fact.create()).create(regionName); - assertNotNull(r); - - pfact.setColocatedWith(r.getName()); - fact.setPartitionAttributes(pfact.create()); - fact.setOffHeap(offHeap); - Region r1 = cache.createRegionFactory(fact.create()).create(regionName+"_child1"); - assertNotNull(r1); - - Region r2 = cache.createRegionFactory(fact.create()).create(regionName+"_child2"); - assertNotNull(r2); - } - - public static void createColocatedPartitionedRegions2 (String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap){ - AttributesFactory fact = new AttributesFactory(); - if(senderIds!= null){ - StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); - while (tokenizer.hasMoreTokens()){ - String senderId = tokenizer.nextToken(); - fact.addGatewaySenderId(senderId); - } - } - PartitionAttributesFactory pfact = new PartitionAttributesFactory(); - pfact.setTotalNumBuckets(totalNumBuckets); - pfact.setRedundantCopies(redundantCopies); - fact.setPartitionAttributes(pfact.create()); - fact.setOffHeap(offHeap); - Region r = cache.createRegionFactory(fact.create()).create(regionName); - assertNotNull(r); - - - fact = new AttributesFactory(); - pfact.setColocatedWith(r.getName()); - fact.setPartitionAttributes(pfact.create()); - fact.setOffHeap(offHeap); - Region r1 = cache.createRegionFactory(fact.create()).create(regionName+"_child1"); - assertNotNull(r1); - - Region r2 = cache.createRegionFactory(fact.create()).create(regionName+"_child2"); - assertNotNull(r2); - } - - public static void createCacheInVMs(Integer locatorPort, VM... vms) { - for (VM vm : vms) { - vm.invoke(() -> createCache(locatorPort)); - } - } - - public static void addListenerToSleepAfterCreateEvent(int milliSeconds, final String regionName) { - cache.getRegion(regionName).getAttributesMutator() - .addCacheListener(new CacheListenerAdapter<Object, Object>() { - @Override - public void afterCreate(final EntryEvent<Object, Object> event) { - try { - Thread.sleep(milliSeconds); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - }); - } - - private static CacheListener myListener; - public static void longPauseAfterNumEvents(int numEvents, int milliSeconds) { - myListener = new CacheListenerAdapter<Object, Object>() { - @Override - public void afterCreate(final EntryEvent<Object, Object> event) { - try { - if (event.getRegion().size() >= numEvents){ - Thread.sleep(milliSeconds); - } - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - }; - cache.getRegion(getTestMethodName() + "_RR_1").getAttributesMutator() - .addCacheListener(myListener); - } - - public static void removeCacheListener() { - cache.getRegion(getTestMethodName() + "_RR_1").getAttributesMutator() - .removeCacheListener(myListener); - - } - - - public static void createCache(Integer locPort){ - createCache(false, locPort); - } - public static void createManagementCache(Integer locPort){ - createCache(true, locPort); - } - - public static void createCacheConserveSockets(Boolean conserveSockets,Integer locPort){ - WANTestBase test = new WANTestBase(); - Properties props = test.getDistributedSystemProperties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(LOCATORS, "localhost[" + locPort + "]"); - props.setProperty(CONSERVE_SOCKETS, conserveSockets.toString()); - InternalDistributedSystem ds = test.getSystem(props); - cache = CacheFactory.create(ds); - } - - protected static void createCache(boolean management, Integer locPort) { - WANTestBase test = new WANTestBase(); - Properties props = test.getDistributedSystemProperties(); - if (management) { - props.setProperty(JMX_MANAGER, "true"); - props.setProperty(JMX_MANAGER_START, "false"); - props.setProperty(JMX_MANAGER_PORT, "0"); - props.setProperty(JMX_MANAGER_HTTP_PORT, "0"); - } - props.setProperty(MCAST_PORT, "0"); - props.setProperty(LOCATORS, "localhost[" + locPort + "]"); - InternalDistributedSystem ds = test.getSystem(props); - cache = CacheFactory.create(ds); - } - - protected static void createCacheWithSSL(Integer locPort) { - WANTestBase test = new WANTestBase(); - - boolean gatewaySslenabled = true; - String gatewaySslprotocols = "any"; - String gatewaySslciphers = "any"; - boolean gatewaySslRequireAuth = true; - - Properties gemFireProps = test.getDistributedSystemProperties(); - gemFireProps.put(LOG_LEVEL, LogWriterUtils.getDUnitLogLevel()); - gemFireProps.put(GATEWAY_SSL_ENABLED, String.valueOf(gatewaySslenabled)); - gemFireProps.put(GATEWAY_SSL_PROTOCOLS, gatewaySslprotocols); - gemFireProps.put(GATEWAY_SSL_CIPHERS, gatewaySslciphers); - gemFireProps.put(GATEWAY_SSL_REQUIRE_AUTHENTICATION, String.valueOf(gatewaySslRequireAuth)); - - gemFireProps.put(GATEWAY_SSL_KEYSTORE_TYPE, "jks"); - gemFireProps.put(GATEWAY_SSL_KEYSTORE, - TestUtil.getResourcePath(WANTestBase.class, "/com/gemstone/gemfire/cache/client/internal/client.keystore")); - gemFireProps.put(GATEWAY_SSL_KEYSTORE_PASSWORD, "password"); - gemFireProps.put(GATEWAY_SSL_TRUSTSTORE, - TestUtil.getResourcePath(WANTestBase.class, "/com/gemstone/gemfire/cache/client/internal/client.truststore")); - gemFireProps.put(GATEWAY_SSL_TRUSTSTORE_PASSWORD, "password"); - - gemFireProps.setProperty(MCAST_PORT, "0"); - gemFireProps.setProperty(LOCATORS, "localhost[" + locPort + "]"); - - LogWriterUtils.getLogWriter().info("Starting cache ds with following properties \n" + gemFireProps); - - InternalDistributedSystem ds = test.getSystem(gemFireProps); - cache = CacheFactory.create(ds); - } - - public static void createCache_PDX(Integer locPort){ - WANTestBase test = new WANTestBase(); - Properties props = test.getDistributedSystemProperties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(LOCATORS, "localhost[" + locPort + "]"); - InternalDistributedSystem ds = test.getSystem(props); - CacheConfig cacheConfig = new CacheConfig(); - cacheConfig.setPdxPersistent(true); - cacheConfig.setPdxDiskStore("PDX_TEST"); - cache = GemFireCacheImpl.create(ds, false, cacheConfig); - - File pdxDir = new File(CacheTestCase.getDiskDir(), "pdx"); - DiskStoreFactory dsf = cache.createDiskStoreFactory(); - File [] dirs1 = new File[] {pdxDir}; - dsf.setDiskDirs(dirs1).setMaxOplogSize(1).create("PDX_TEST"); - } - - public static void createCache(Integer locPort1, Integer locPort2){ - WANTestBase test = new WANTestBase(); - Properties props = test.getDistributedSystemProperties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(LOCATORS, "localhost[" + locPort1 - + "],localhost[" + locPort2 + "]"); - InternalDistributedSystem ds = test.getSystem(props); - cache = CacheFactory.create(ds); - } - - public static void createCacheWithoutLocator(Integer mCastPort){ - WANTestBase test = new WANTestBase(); - Properties props = test.getDistributedSystemProperties(); - props.setProperty(MCAST_PORT, "" + mCastPort); - InternalDistributedSystem ds = test.getSystem(props); - cache = CacheFactory.create(ds); - } - - /** - * Method that creates a bridge server - * - * @return Integer Port on which the server is started. - */ - public static Integer createCacheServer() { - CacheServer server1 = cache.addCacheServer(); - assertNotNull(server1); - server1.setPort(0); - try { - server1.start(); - } - catch (IOException e) { - com.gemstone.gemfire.test.dunit.Assert.fail("Failed to start the Server", e); - } - assertTrue(server1.isRunning()); - - return new Integer(server1.getPort()); - } - - /** - * Returns a Map that contains the count for number of bridge server and number - * of Receivers. - * - * @return Map - */ - public static Map getCacheServers() { - List cacheServers = cache.getCacheServers(); - - Map cacheServersMap = new HashMap(); - Iterator itr = cacheServers.iterator(); - int bridgeServerCounter = 0; - int receiverServerCounter = 0; - while (itr.hasNext()) { - CacheServerImpl cacheServer = (CacheServerImpl) itr.next(); - if (cacheServer.getAcceptor().isGatewayReceiver()) { - receiverServerCounter++; - } else { - bridgeServerCounter++; - } - } - cacheServersMap.put("BridgeServer", bridgeServerCounter); - cacheServersMap.put("ReceiverServer", receiverServerCounter); - return cacheServersMap; - } - - public static void startSenderInVMs(String senderId, VM... vms) { - for (VM vm : vms) { - vm.invoke(() -> startSender(senderId)); - } - } - - public static void startSenderInVMsAsync(String senderId, VM... vms) { - List<AsyncInvocation> tasks = new LinkedList<>(); - for (VM vm : vms) { - tasks.add(vm.invokeAsync(() -> startSender(senderId))); - } - for (AsyncInvocation invocation : tasks) { - try { - invocation.join(30000); // TODO: these might be AsyncInvocation orphans - } - catch (InterruptedException e) { - fail("Starting senders was interrupted"); - } - } - } - - - public static void startSender(String senderId) { - final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); - - IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class - .getName()); - IgnoredException exp1 = IgnoredException.addIgnoredException(InterruptedException.class - .getName()); - try { - Set<GatewaySender> senders = cache.getGatewaySenders(); - GatewaySender sender = null; - for (GatewaySender s : senders) { - if (s.getId().equals(senderId)) { - sender = s; - break; - } - } - sender.start(); - } finally { - exp.remove(); - exp1.remove(); - exln.remove(); - } - - } - - public static void enableConflation(String senderId) { - Set<GatewaySender> senders = cache.getGatewaySenders(); - AbstractGatewaySender sender = null; - for (GatewaySender s : senders) { - if (s.getId().equals(senderId)) { - sender = (AbstractGatewaySender)s; - break; - } - } - sender.test_setBatchConflationEnabled(true); - } - - public static Map getSenderToReceiverConnectionInfo(String senderId){ - Set<GatewaySender> senders = cache.getGatewaySenders(); - GatewaySender sender = null; - for(GatewaySender s : senders){ - if(s.getId().equals(senderId)){ - sender = s; - break; - } - } - Map connectionInfo = null; - if (!sender.isParallel() && ((AbstractGatewaySender) sender).isPrimary()) { - connectionInfo = new HashMap(); - GatewaySenderEventDispatcher dispatcher = - ((AbstractGatewaySender)sender).getEventProcessor().getDispatcher(); - if (dispatcher instanceof GatewaySenderEventRemoteDispatcher) { - ServerLocation serverLocation = - ((GatewaySenderEventRemoteDispatcher) dispatcher).getConnection(false).getServer(); - connectionInfo.put("serverHost", serverLocation.getHostName()); - connectionInfo.put("serverPort", serverLocation.getPort()); - - } - } - return connectionInfo; - } - public static List<Integer> getSenderStats(String senderId, final int expectedQueueSize){ - Set<GatewaySender> senders = cache.getGatewaySenders(); - AbstractGatewaySender sender = null; - for (GatewaySender s : senders) { - if (s.getId().equals(senderId)) { - sender = (AbstractGatewaySender)s; - break; - } - } - final GatewaySenderStats statistics = sender.getStatistics(); - if (expectedQueueSize != -1) { - final RegionQueue regionQueue; - regionQueue = sender.getQueues().toArray( - new RegionQueue[1])[0]; - Awaitility.await().atMost(120,TimeUnit.SECONDS).until(() -> assertEquals("Expected queue entries: " + - expectedQueueSize + " but actual entries: " + regionQueue.size(), expectedQueueSize,regionQueue.size())); - } - ArrayList<Integer> stats = new ArrayList<Integer>(); - stats.add(statistics.getEventQueueSize()); - stats.add(statistics.getEventsReceived()); - stats.add(statistics.getEventsQueued()); - stats.add(statistics.getEventsDistributed()); - stats.add(statistics.getBatchesDistributed()); - stats.add(statistics.getBatchesRedistributed()); - stats.add(statistics.getEventsFiltered()); - stats.add(statistics.getEventsNotQueuedConflated()); - stats.add(statistics.getEventsConflatedFromBatches()); - return stats; - } - - public static void checkQueueStats(String senderId, final int queueSize, - final int eventsReceived, final int eventsQueued, - final int eventsDistributed) { - Set<GatewaySender> senders = cache.getGatewaySenders(); - GatewaySender sender = null; - for (GatewaySender s : senders) { - if (s.getId().equals(senderId)) { - sender = s; - break; - } - } - - final GatewaySenderStats statistics = ((AbstractGatewaySender)sender).getStatistics(); - assertEquals(queueSize, statistics.getEventQueueSize()); - assertEquals(eventsReceived, statistics.getEventsReceived()); - assertEquals(eventsQueued, statistics.getEventsQueued()); - assert(statistics.getEventsDistributed() >= eventsDistributed); - } - - public static void checkGatewayReceiverStats(int processBatches, - int eventsReceived, int creates) { - Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers(); - GatewayReceiver receiver = gatewayReceivers.iterator().next(); - CacheServerStats stats = ((CacheServerImpl)receiver.getServer()) - .getAcceptor().getStats(); - - assertTrue(stats instanceof GatewayReceiverStats); - GatewayReceiverStats gatewayReceiverStats = (GatewayReceiverStats)stats; - assertTrue(gatewayReceiverStats.getProcessBatchRequests() >= processBatches); - assertEquals(eventsReceived, gatewayReceiverStats.getEventsReceived()); - assertEquals(creates, gatewayReceiverStats.getCreateRequest()); - } - - public static void checkMinimumGatewayReceiverStats(int processBatches, - int eventsReceived) { - Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers(); - GatewayReceiver receiver = gatewayReceivers.iterator().next(); - CacheServerStats stats = ((CacheServerImpl)receiver.getServer()) - .getAcceptor().getStats(); - - assertTrue(stats instanceof GatewayReceiverStats); - GatewayReceiverStats gatewayReceiverStats = (GatewayReceiverStats)stats; - assertTrue(gatewayReceiverStats.getProcessBatchRequests() >= processBatches); - assertTrue(gatewayReceiverStats.getEventsReceived()>= eventsReceived); - } - - public static void checkExceptionStats(int exceptionsOccurred) { - Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers(); - GatewayReceiver receiver = gatewayReceivers.iterator().next(); - CacheServerStats stats = ((CacheServerImpl)receiver.getServer()) - .getAcceptor().getStats(); - - assertTrue(stats instanceof GatewayReceiverStats); - GatewayReceiverStats gatewayReceiverStats = (GatewayReceiverStats)stats; - if (exceptionsOccurred == 0) { - assertEquals(exceptionsOccurred, gatewayReceiverStats - .getExceptionsOccured()); - } - else { - assertTrue(gatewayReceiverStats.getExceptionsOccured() >= exceptionsOccurred); - } - } - - public static void checkGatewayReceiverStatsHA(int processBatches, - int eventsReceived, int creates) { - Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers(); - GatewayReceiver receiver = gatewayReceivers.iterator().next(); - CacheServerStats stats = ((CacheServerImpl)receiver.getServer()) - .getAcceptor().getStats(); - - assertTrue(stats instanceof GatewayReceiverStats); - GatewayReceiverStats gatewayReceiverStats = (GatewayReceiverStats)stats; - assertTrue(gatewayReceiverStats.getProcessBatchRequests() >= processBatches); - assertTrue(gatewayReceiverStats.getEventsReceived() >= eventsReceived); - assertTrue(gatewayReceiverStats.getCreateRequest() >= creates); - } - - public static void checkEventFilteredStats(String senderId, final int eventsFiltered) { - Set<GatewaySender> senders = cache.getGatewaySenders(); - GatewaySender sender = null; - for (GatewaySender s : senders) { - if (s.getId().equals(senderId)) { - sender = s; - break; - } - } - final GatewaySenderStats statistics = ((AbstractGatewaySender)sender).getStatistics(); - assertEquals(eventsFiltered, statistics.getEventsFiltered()); - } - - public static void checkConflatedStats(String senderId, final int eventsConflated) { - Set<GatewaySender> senders = cache.getGatewaySenders(); - GatewaySender sender = null; - for (GatewaySender s : senders) { - if (s.getId().equals(senderId)) { - sender = s; - break; - } - } - final GatewaySenderStats statistics = ((AbstractGatewaySender)sender).getStatistics(); - assertEquals(eventsConflated, statistics.getEventsNotQueuedConflated()); - } - - public static void checkStats_Failover(String senderId, - final int eventsReceived) { - Set<GatewaySender> senders = cache.getGatewaySenders(); - GatewaySender sender = null; - for (GatewaySender s : senders) { - if (s.getId().equals(senderId)) { - sender = s; - break; - } - } - final GatewaySenderStats statistics = ((AbstractGatewaySender)sender) - .getStatistics(); - - assertEquals(eventsReceived, statistics.getEventsReceived()); - assertEquals(eventsReceived, (statistics.getEventsQueued() - + statistics.getUnprocessedTokensAddedByPrimary() + statistics - .getUnprocessedEventsRemovedByPrimary())); - } - - public static void checkBatchStats(String senderId, final int batches) { - Set<GatewaySender> senders = cache.getGatewaySenders(); - GatewaySender sender = null; - for (GatewaySender s : senders) { - if (s.getId().equals(senderId)) { - sender = s; - break; - } - } - final GatewaySenderStats statistics = ((AbstractGatewaySender)sender) - .getStatistics(); - assert (statistics.getBatchesDistributed() >= batches); - assertEquals(0, statistics.getBatchesRedistributed()); - } - - public static void checkBatchStats(String senderId, - final boolean batchesDistributed, final boolean batchesRedistributed) { - Set<GatewaySender> senders = cache.getGatewaySenders(); - GatewaySender sender = null; - for (GatewaySender s : senders) { - if (s.getId().equals(senderId)) { - sender = s; - break; - } - } - final GatewaySenderStats statistics = ((AbstractGatewaySender)sender) - .getStatistics(); - assertEquals(batchesDistributed, (statistics.getBatchesDistributed() > 0)); - assertEquals(batchesRedistributed, - (statistics.getBatchesRedistributed() > 0)); - } - - public static void checkUnProcessedStats(String senderId, int events) { - Set<GatewaySender> senders = cache.getGatewaySenders(); - GatewaySender sender = null; - for (GatewaySender s : senders) { - if (s.getId().equals(senderId)) { - sender = s; - break; - } - } - final GatewaySenderStats statistics = ((AbstractGatewaySender)sender) - .getStatistics(); - assertEquals(events, - (statistics.getUnprocessedEventsAddedBySecondary() + statistics - .getUnprocessedTokensRemovedBySecondary())); - assertEquals(events, - (statistics.getUnprocessedEventsRemovedByPrimary() + statistics - .getUnprocessedTokensAddedByPrimary())); - } - - public static void waitForSenderRunningState(String senderId){ - final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); - try { - Set<GatewaySender> senders = cache.getGatewaySenders(); - final GatewaySender sender = getGatewaySenderById(senders, senderId); - Awaitility.await().atMost(300,TimeUnit.SECONDS).until(() -> assertEquals("Expected sender isRunning state to " - + "be true but is false", true, (sender != null && sender.isRunning()))); - } finally { - exln.remove(); - } - } - - public static void waitForSenderToBecomePrimary(String senderId){ - Set<GatewaySender> senders = ((GemFireCacheImpl)cache).getAllGatewaySenders(); - final GatewaySender sender = getGatewaySenderById(senders, senderId); - Awaitility.await().atMost(10,TimeUnit.SECONDS).until(() -> assertEquals("Expected sender primary state to " - + "be true but is false", true, (sender != null && ((AbstractGatewaySender)sender).isPrimary()))); - } - - private static GatewaySender getGatewaySenderById(Set<GatewaySender> senders, String senderId) { - for(GatewaySender s : senders){ - if(s.getId().equals(senderId)){ - return s; - } - } - //if none of the senders matches with the supplied senderid, return null - return null; - } - - public static HashMap checkQueue(){ - HashMap listenerAttrs = new HashMap(); - listenerAttrs.put("Create", listener1.createList); - listenerAttrs.put("Update", listener1.updateList); - listenerAttrs.put("Destroy", listener1.destroyList); - return listenerAttrs; - } - - public static void checkQueueOnSecondary (final Map primaryUpdatesMap){ - final HashMap secondaryUpdatesMap = new HashMap(); - secondaryUpdatesMap.put("Create", listener1.createList); - secondaryUpdatesMap.put("Update", listener1.updateList); - secondaryUpdatesMap.put("Destroy", listener1.destroyList); - - Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> { - secondaryUpdatesMap.put("Create", listener1.createList); - secondaryUpdatesMap.put("Update", listener1.updateList); - secondaryUpdatesMap.put("Destroy", listener1.destroyList); - assertEquals("Expected secondary map to be " + primaryUpdatesMap + " but it is " + secondaryUpdatesMap, - true,secondaryUpdatesMap.equals(primaryUpdatesMap)); - }); - } - - public static HashMap checkQueue2(){ - HashMap listenerAttrs = new HashMap(); - listenerAttrs.put("Create", listener2.createList); - listenerAttrs.put("Update", listener2.updateList); - listenerAttrs.put("Destroy", listener2.destroyList); - return listenerAttrs; - } - - public static HashMap checkPR(String regionName){ - PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName); - QueueListener listener = (QueueListener)region.getCacheListener(); - - HashMap listenerAttrs = new HashMap(); - listenerAttrs.put("Create", listener.createList); - listenerAttrs.put("Update", listener.updateList); - listenerAttrs.put("Destroy", listener.destroyList); - return listenerAttrs; - } - - public static HashMap checkBR(String regionName, int numBuckets){ - PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName); - HashMap listenerAttrs = new HashMap(); - for (int i = 0; i < numBuckets; i++) { - BucketRegion br = region.getBucketRegion(i); - QueueListener listener = (QueueListener)br.getCacheListener(); - listenerAttrs.put("Create"+i, listener.createList); - listenerAttrs.put("Update"+i, listener.updateList); - listenerAttrs.put("Destroy"+i, listener.destroyList); - } - return listenerAttrs; - } - - public static HashMap checkQueue_BR(String senderId, int numBuckets){ - Set<GatewaySender> senders = cache.getGatewaySenders(); - GatewaySender sender = null; - for(GatewaySender s : senders){ - if(s.getId().equals(senderId)){ - sender = s; - break; - } - } - RegionQueue parallelQueue = ((AbstractGatewaySender)sender) - .getQueues().toArray(new RegionQueue[1])[0]; - - PartitionedRegion region = (PartitionedRegion)parallelQueue.getRegion(); - HashMap listenerAttrs = new HashMap(); - for (int i = 0; i < numBuckets; i++) { - BucketRegion br = region.getBucketRegion(i); - if (br != null) { - QueueListener listener = (QueueListener)br.getCacheListener(); - if (listener != null) { - listenerAttrs.put("Create"+i, listener.createList); - listenerAttrs.put("Update"+i, listener.updateList); - listenerAttrs.put("Destroy"+i, listener.destroyList); - } - } - } - return listenerAttrs; - } - - public static void addListenerOnBucketRegion(String regionName, int numBuckets) { - WANTestBase test = new WANTestBase(); - test.addCacheListenerOnBucketRegion(regionName, numBuckets); - } - - private void addCacheListenerOnBucketRegion(String regionName, int numBuckets){ - PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName); - for (int i = 0; i < numBuckets; i++) { - BucketRegion br = region.getBucketRegion(i); - AttributesMutator mutator = br.getAttributesMutator(); - listener1 = new QueueListener(); - mutator.addCacheListener(listener1); - } - } - - public static void addListenerOnQueueBucketRegion(String senderId, int numBuckets) { - WANTestBase test = new WANTestBase(); - test.addCacheListenerOnQueueBucketRegion(senderId, numBuckets); - } - - private void addCacheListenerOnQueueBucketRegion(String senderId, int numBuckets){ - Set<GatewaySender> senders = cache.getGatewaySenders(); - GatewaySender sender = null; - for(GatewaySender s : senders){ - if(s.getId().equals(senderId)){ - sender = s; - break; - } - } - RegionQueue parallelQueue = ((AbstractGatewaySender)sender) - .getQueues().toArray(new RegionQueue[1])[0]; - - PartitionedRegion region = (PartitionedRegion)parallelQueue.getRegion(); - for (int i = 0; i < numBuckets; i++) { - BucketRegion br = region.getBucketRegion(i); - if (br != null) { - AttributesMutator mutator = br.getAttributesMutator(); - CacheListener listener = new QueueListener(); - mutator.addCacheListener(listener); - } - } - - } - - public static void addQueueListener(String senderId, boolean isParallel){ - WANTestBase test = new WANTestBase(); - test.addCacheQueueListener(senderId, isParallel); - } - - public static void addSecondQueueListener(String senderId, boolean isParallel){ - WANTestBase test = new WANTestBase(); - test.addSecondCacheQueueListener(senderId, isParallel); - } - - public static void addListenerOnRegion(String regionName){ - WANTestBase test = new WANTestBase(); - test.addCacheListenerOnRegion(regionName); - } - private void addCacheListenerOnRegion(String regionName){ - Region region = cache.getRegion(regionName); - AttributesMutator mutator = region.getAttributesMutator(); - listener1 = new QueueListener(); - mutator.addCacheListener(listener1); - } - - private void addCacheQueueListener(String senderId, boolean isParallel) { - Set<GatewaySender> senders = cache.getGatewaySenders(); - GatewaySender sender = null; - for(GatewaySender s : senders){ - if(s.getId().equals(senderId)){ - sender = s; - break; - } - } - listener1 = new QueueListener(); - if (!isParallel) { - Set<RegionQueue> queues = ((AbstractGatewaySender)sender).getQueues(); - for(RegionQueue q: queues) { - q.addCacheListener(listener1); - } - } - else { - RegionQueue parallelQueue = ((AbstractGatewaySender)sender) - .getQueues().toArray(new RegionQueue[1])[0]; - parallelQueue.addCacheListener(listener1); - } - } - - private void addSecondCacheQueueListener(String senderId, boolean isParallel) { - Set<GatewaySender> senders = cache.getGatewaySenders(); - GatewaySender sender = null; - for (GatewaySender s : senders) { - if (s.getId().equals(senderId)) { - sender = s; - break; - } - } - listener2 = new QueueListener(); - if (!isParallel) { - Set<RegionQueue> queues = ((AbstractGatewaySender)sender).getQueues(); - for(RegionQueue q: queues) { - q.addCacheListener(listener2); - } - } - else { - RegionQueue parallelQueue = ((AbstractGatewaySender)sender) - .getQueues().toArray(new RegionQueue[1])[0]; - parallelQueue.addCacheListener(listener2); - } - } - - public static void pauseSender(String senderId) { - final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); - IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class - .getName()); - try { - Set<GatewaySender> senders = cache.getGatewaySenders(); - GatewaySender sender = null; - for (GatewaySender s : senders) { - if (s.getId().equals(senderId)) { - sender = s; - break; - } - } - sender.pause(); - ((AbstractGatewaySender) sender).getEventProcessor().waitForDispatcherToPause(); - - } finally { - exp.remove(); - exln.remove(); - } - } - - public static void resumeSender(String senderId) { - final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); - IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class - .getName()); - try { - Set<GatewaySender> senders = cache.getGatewaySenders(); - GatewaySender sender = null; - for (GatewaySender s : senders) { - if (s.getId().equals(senderId)) { - sender = s; - break; - } - } - sender.resume(); - } finally { - exp.remove(); - exln.remove(); - } - } - - public static void stopSender(String senderId) { - final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); - IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class - .getName()); - try { - Set<GatewaySender> senders = cache.getGatewaySenders(); - GatewaySender sender = null; - for (GatewaySender s : senders) { - if (s.getId().equals(senderId)) { - sender = s; - break; - } - } - AbstractGatewaySenderEventProcessor eventProcessor = null; - if (sender instanceof AbstractGatewaySender) { - eventProcessor = ((AbstractGatewaySender) sender).getEventProcessor(); - } - sender.stop(); - - Set<RegionQueue> queues = null; - if (eventProcessor instanceof ConcurrentSerialGatewaySenderEventProcessor) { - queues = ((ConcurrentSerialGatewaySenderEventProcessor)eventProcessor).getQueues(); - for (RegionQueue queue: queues) { - if (queue instanceof SerialGatewaySenderQueue) { - assertFalse(((SerialGatewaySenderQueue) queue).isRemovalThreadAlive()); - } - } - } - } finally { - exp.remove(); - exln.remove(); - } - } - - public static void stopReceivers() { - Set<GatewayReceiver> receivers = cache.getGatewayReceivers(); - for (GatewayReceiver receiver : receivers) { - receiver.stop(); - } - } - - public static void startReceivers() { - Set<GatewayReceiver> receivers = cache.getGatewayReceivers(); - for (GatewayReceiver receiver : receivers) { - try { - receiver.start(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } - - public static GatewaySenderFactory configureGateway(DiskStoreFactory dsf, File[] dirs1, String dsName, - boolean isParallel, Integer maxMemory, - Integer batchSize, boolean isConflation, boolean isPersistent, - GatewayEventFilter filter, boolean isManualStart, int numDispatchers, OrderPolicy policy) { - - InternalGatewaySenderFactory gateway = (InternalGatewaySenderFactory)cache.createGatewaySenderFactory(); - gateway.setParallel(isParallel); - gateway.setMaximumQueueMemory(maxMemory); - gateway.setBatchSize(batchSize); - gateway.setBatchConflationEnabled(isConflation); - gateway.setManualStart(isManualStart); - gateway.setDispatcherThreads(numDispatchers); - gateway.setOrderPolicy(policy); - gateway.setLocatorDiscoveryCallback(new MyLocatorCallback()); - if (filter != null) { - eventFilter = filter; - gateway.addGatewayEventFilter(filter); - } - if (isPersistent) { - gateway.setPersistenceEnabled(true); - gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName) - .getName()); - } else { - DiskStore store = dsf.setDiskDirs(dirs1).create(dsName); - gateway.setDiskStoreName(store.getName()); - } - return gateway; - } - - public static void createSender(String dsName, int remoteDsId, - boolean isParallel, Integer maxMemory, - Integer batchSize, boolean isConflation, boolean isPersistent, - GatewayEventFilter filter, boolean isManualStart) { - final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); - try { - File persistentDirectory = new File(dsName + "_disk_" - + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); - persistentDirectory.mkdir(); - DiskStoreFactory dsf = cache.createDiskStoreFactory(); - File[] dirs1 = new File[] { persistentDirectory }; - GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName, isParallel, maxMemory, batchSize, isConflation, isPersistent, filter, isManualStart, numDispatcherThreadsForTheRun, GatewaySender.DEFAULT_ORDER_POLICY); - gateway.create(dsName, remoteDsId); - - } finally { - exln.remove(); - } - } - - public static void createSenderWithMultipleDispatchers(String dsName, int remoteDsId, - boolean isParallel, Integer maxMemory, - Integer batchSize, boolean isConflation, boolean isPersistent, - GatewayEventFilter filter, boolean isManualStart, int numDispatchers, OrderPolicy orderPolicy) { - final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); - try { - File persistentDirectory = new File(dsName + "_disk_" + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); - persistentDirectory.mkdir(); - DiskStoreFactory dsf = cache.createDiskStoreFactory(); - File[] dirs1 = new File[] { persistentDirectory }; - GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName,isParallel, maxMemory, batchSize, isConflation, isPersistent, filter, - isManualStart, numDispatchers, orderPolicy); - gateway.create(dsName, remoteDsId); - - } finally { - exln.remove(); - } - } - - public static void createSenderWithoutDiskStore(String dsName, int remoteDsId, Integer maxMemory, - Integer batchSize, boolean isConflation, boolean isManualStart) { - - GatewaySenderFactory gateway = cache.createGatewaySenderFactory(); - gateway.setParallel(true); - gateway.setMaximumQueueMemory(maxMemory); - gateway.setBatchSize(batchSize); - gateway.setManualStart(isManualStart); - //set dispatcher threads - gateway.setDispatcherThreads(numDispatcherThreadsForTheRun); - gateway.setBatchConflationEnabled(isConflation); - gateway.create(dsName, remoteDsId); - } - - public static void createConcurrentSender(String dsName, int remoteDsId, - boolean isParallel, Integer maxMemory, Integer batchSize, - boolean isConflation, boolean isPersistent, GatewayEventFilter filter, - boolean isManualStart, int concurrencyLevel, OrderPolicy policy) { - - File persistentDirectory = new File(dsName + "_disk_" - + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); - persistentDirectory.mkdir(); - DiskStoreFactory dsf = cache.createDiskStoreFactory(); - File[] dirs1 = new File[] { persistentDirectory }; - GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName, isParallel, maxMemory, batchSize, isConflation, isPersistent, filter, isManualStart, concurrencyLevel, policy); - gateway.create(dsName, remoteDsId); - } - - public static void createSenderForValidations(String dsName, int remoteDsId, - boolean isParallel, Integer alertThreshold, - boolean isConflation, boolean isPersistent, - List<GatewayEventFilter> eventFilters, - List<GatewayTransportFilter> transportFilters, boolean isManualStart, - boolean isDiskSync) { - IgnoredException exp1 = IgnoredException.addIgnoredException(RegionDestroyedException.class - .getName()); - try { - File persistentDirectory = new File(dsName + "_disk_" - + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); - persistentDirectory.mkdir(); - DiskStoreFactory dsf = cache.createDiskStoreFactory(); - File[] dirs1 = new File[] { persistentDirectory }; - - if (isParallel) { - GatewaySenderFactory gateway = cache.createGatewaySenderFactory(); - gateway.setParallel(true); - gateway.setAlertThreshold(alertThreshold); - ((InternalGatewaySenderFactory)gateway) - .setLocatorDiscoveryCallback(new MyLocatorCallback()); - if (eventFilters != null) { - for (GatewayEventFilter filter : eventFilters) { - gateway.addGatewayEventFilter(filter); - } - } - if (transportFilters != null) { - for (GatewayTransportFilter filter : transportFilters) { - gateway.addGatewayTransportFilter(filter); - } - } - if (isPersistent) { - gateway.setPersistenceEnabled(true); - gateway.setDiskStoreName(dsf.setDiskDirs(dirs1) - .create(dsName + "_Parallel").getName()); - } - else { - DiskStore store = dsf.setDiskDirs(dirs1).create(dsName + "_Parallel"); - gateway.setDiskStoreName(store.getName()); - } - gateway.setDiskSynchronous(isDiskSync); - gateway.setBatchConflationEnabled(isConflation); - gateway.setManualStart(isManualStart); - //set dispatcher threads - gateway.setDispatcherThreads(numDispatcherThreadsForTheRun); - gateway.create(dsName, remoteDsId); - - } - else { - GatewaySenderFactory gateway = cache.createGatewaySenderFactory(); - gateway.setAlertThreshold(alertThreshold); - gateway.setManualStart(isManualStart); - //set dispatcher threads - gateway.setDispatcherThreads(numDispatcherThreadsForTheRun); - ((InternalGatewaySenderFactory)gateway) - .setLocatorDiscoveryCallback(new MyLocatorCallback()); - if (eventFilters != null) { - for (GatewayEventFilter filter : eventFilters) { - gateway.addGatewayEventFilter(filter); - } - } - if (transportFilters != null) { - for (GatewayTransportFilter filter : transportFilters) { - gateway.addGatewayTransportFilter(filter); - } - } - gateway.setBatchConflationEnabled(isConflation); - if (isPersistent) { - gateway.setPersistenceEnabled(true); - gateway.setDiskStoreName(dsf.setDiskDirs(dirs1) - .create(dsName + "_Serial").getName()); - } - else { - DiskStore store = dsf.setDiskDirs(dirs1).create(dsName + "_Serial"); - gateway.setDiskStoreName(store.getName()); - } - gateway.setDiskSynchronous(isDiskSync); - gateway.create(dsName, remoteDsId); - } - } finally { - exp1.remove(); - } - } - - public static String createSenderWithDiskStore(String dsName, int remoteDsId, - boolean isParallel, Integer maxMemory, - Integer batchSize, boolean isConflation, boolean isPersistent, - GatewayEventFilter filter, String dsStore, boolean isManualStart) { - File persistentDirectory = null; - if (dsStore == null) { - persistentDirectory = new File(dsName + "_disk_" - + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); - } - else { - persistentDirectory = new File(dsStore); - } - LogWriterUtils.getLogWriter().info("The ds is : " + persistentDirectory.getName()); - - persistentDirectory.mkdir(); - DiskStoreFactory dsf = cache.createDiskStoreFactory(); - File [] dirs1 = new File[] {persistentDirectory}; - - if(isParallel) { - GatewaySenderFactory gateway = cache.createGatewaySenderFactory(); - gateway.setParallel(true); - gateway.setMaximumQueueMemory(maxMemory); - gateway.setBatchSize(batchSize); - gateway.setManualStart(isManualStart); - //set dispatcher threads - gateway.setDispatcherThreads(numDispatcherThreadsForTheRun); - ((InternalGatewaySenderFactory)gateway).setLocatorDiscoveryCallback(new MyLocatorCallback()); - if (filter != null) { - gateway.addGatewayEventFilter(filter); - } - if(isPersistent) { - gateway.setPersistenceEnabled(true); - String dsname = dsf.setDiskDirs(dirs1).create(dsName).getName(); - gateway.setDiskStoreName(dsname); - LogWriterUtils.getLogWriter().info("The DiskStoreName is : " + dsname); - } - else { - DiskStore store = dsf.setDiskDirs(dirs1).create(dsName); - gateway.setDiskStoreName(store.getName()); - LogWriterUtils.getLogWriter().info("The ds is : " + store.getName()); - } - gateway.setBatchConflationEnabled(isConflation); - gateway.create(dsName, remoteDsId); - - }else { - GatewaySenderFactory gateway = cache.createGatewaySenderFactory(); - gateway.setMaximumQueueMemory(maxMemory); - gateway.setBatchSize(batchSize); - gateway.setManualStart(isManualStart); - //set dispatcher threads - gateway.setDispatcherThreads(numDispatcherThreadsForTheRun); - ((InternalGatewaySenderFactory)gateway).setLocatorDiscoveryCallback(new MyLocatorCallback()); - if (filter != null) { - gateway.addGatewayEventFilter(filter); - } - gateway.setBatchConflationEnabled(isConflation); - if(isPersistent) { - gateway.setPersistenceEnabled(true); - gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName()); - } - else { - DiskStore store = dsf.setDiskDirs(dirs1).create(dsName); - - gateway.setDiskStoreName(store.getName()); - } - gateway.create(dsName, remoteDsId); - } - return persistentDirectory.getName(); - } - - - public static void createSenderWithListener(String dsName, int remoteDsName, - boolean isParallel, Integer maxMemory, - Integer batchSize, boolean isConflation, boolean isPersistent, - GatewayEventFilter filter, boolean attachTwoListeners, boolean isManualStart) { - File persistentDirectory = new File(dsName + "_disk_" - + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); - persistentDirectory.mkdir(); - DiskStoreFactory dsf = cache.createDiskStoreFactory(); - File[] dirs1 = new File[] { persistentDirectory }; - - if (isParallel) { - GatewaySenderFactory gateway = cache - .createGatewaySenderFactory(); - gateway.setParallel(true); - gateway.setMaximumQueueMemory(maxMemory); - gateway.setBatchSize(batchSize); - gateway.setManualStart(isManualStart); - //set dispatcher threads - gateway.setDispatcherThreads(numDispatcherThreadsForTheRun); - ((InternalGatewaySenderFactory)gateway) - .setLocatorDiscoveryCallback(new MyLocatorCallback()); - if (filter != null) { - gateway.addGatewayEventFilter(filter); - } - if (isPersistent) { - gateway.setPersistenceEnabled(true); - gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName) - .getName()); - } else { - DiskStore store = dsf.setDiskDirs(dirs1).create(dsName); - gateway.setDiskStoreName(store.getName()); - } - gateway.setBatchConflationEnabled(isConflation); - gateway.create(dsName, remoteDsName); - - } else { - GatewaySenderFactory gateway = cache - .createGatewaySenderFactory(); - gateway.setMaximumQueueMemory(maxMemory); - gateway.setBatchSize(batchSize); - gateway.setManualStart(isManualStart); - //set dispatcher threads - gateway.setDispatcherThreads(numDispatcherThreadsForTheRun); - ((InternalGatewaySenderFactory)gateway) - .setLocatorDiscoveryCallback(new MyLocatorCallback()); - if (filter != null) { - gateway.addGatewayEventFilter(filter); - } - gateway.setBatchConflationEnabled(isConflation); - if (isPersistent) { - gateway.setPersistenceEnabled(true); - gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName) - .getName()); - } else { - DiskStore store = dsf.setDiskDirs(dirs1).create(dsName); - gateway.setDiskStoreName(store.getName()); - } - - eventListener1 = new MyGatewaySenderEventListener(); - ((InternalGatewaySenderFactory)gateway).addAsyncEventListener(eventListener1); - if (attachTwoListeners) { - eventListener2 = new MyGatewaySenderEventListener2(); - ((InternalGatewaySenderFactory)gateway).addAsyncEventListener(eventListener2); - } - ((InternalGatewaySenderFactory)gateway).create(dsName); - } - } - - public static void createReceiverInVMs(VM... vms) { - for (VM vm : vms) { - vm.invoke(() -> createReceiver()); - } - } - - public static int createReceiver() { - GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); - int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); - fact.setStartPort(port); - fact.setEndPort(port); - fact.setManualStart(true); - GatewayReceiver receiver = fact.create(); - try { - receiver.start(); - } - catch (IOException e) { - e.printStackTrace(); - Assert.fail("Test " + getTestMethodName() - + " failed to start GatewayReceiver on port " + port, e); - } - return port; - } - - public static void createReceiverWithBindAddress(int locPort) { - WANTestBase test = new WANTestBase(); - Properties props = test.getDistributedSystemProperties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(LOG_LEVEL, LogWriterUtils.getDUnitLogLevel()); - props.setProperty(LOCATORS, "localhost[" + locPort - + "]"); - - InternalDistributedSystem ds = test.getSystem(props); - cache = CacheFactory.create(ds); - GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); - int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); - fact.setStartPort(port); - fact.setEndPort(port); - fact.setManualStart(true); - fact.setBindAddress("200.112.204.10"); - GatewayReceiver receiver = fact.create(); - try { - receiver.start(); - fail("Expected GatewayReceiver Exception"); - } - catch (GatewayReceiverException gRE){ - LogWriterUtils.getLogWriter().fine("Got the GatewayReceiverException", gRE); - assertTrue(gRE.getMessage().contains("Failed to create server socket on")); - } - catch (IOException e) { - e.printStackTrace(); - fail("Test " + test.getName() - + " failed to start GatewayReceiver on port " + port); - } - } - public static int createReceiverWithSSL(int locPort) { - WANTestBase test = new WANTestBase(); - - Properties gemFireProps = test.getDistributedSystemProperties(); - - gemFireProps.put(LOG_LEVEL, LogWriterUtils.getDUnitLogLevel()); - gemFireProps.put(GATEWAY_SSL_ENABLED, "true"); - gemFireProps.put(GATEWAY_SSL_PROTOCOLS, "any"); - gemFireProps.put(GATEWAY_SSL_CIPHERS, "any"); - gemFireProps.put(GATEWAY_SSL_REQUIRE_AUTHENTICATION, "true"); - - gemFireProps.put(GATEWAY_SSL_KEYSTORE_TYPE, "jks"); - gemFireProps.put(GATEWAY_SSL_KEYSTORE, - TestUtil.getResourcePath(WANTestBase.class, "/com/gemstone/gemfire/cache/client/internal/cacheserver.keystore")); - gemFireProps.put(GATEWAY_SSL_KEYSTORE_PASSWORD, "password"); - gemFireProps.put(GATEWAY_SSL_TRUSTSTORE, - TestUtil.getResourcePath(WANTestBase.class, "/com/gemstone/gemfire/cache/client/internal/cacheserver.truststore")); - gemFireProps.put(GATEWAY_SSL_TRUSTSTORE_PASSWORD, "password"); - - gemFireProps.setProperty(MCAST_PORT, "0"); - gemFireProps.setProperty(LOCATORS, "localhost[" + locPort + "]"); - - LogWriterUtils.getLogWriter().info("Starting cache ds with following properties \n" + gemFireProps); - - InternalDistributedSystem ds = test.getSystem(gemFireProps); - cache = CacheFactory.create(ds); - GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); - int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); - fact.setStartPort(port); - fact.setEndPort(port); - fact.setManualStart(true); - GatewayReceiver receiver = fact.create(); - try { - receiver.start(); - } - catch (IOException e) { - e.printStackTrace(); - fail("Test " + test.getName() - + " failed to start GatewayReceiver on port " + port); - } - return port; - } - - public static void createReceiverAndServer(int locPort) { - WANTestBase test = new WANTestBase(); - Properties props = test.getDistributedSystemProperties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(LOCATORS, "localhost[" + locPort - + "]"); - - InternalDistributedSystem ds = test.getSystem(props); - cache = CacheFactory.create(ds); - GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); - int receiverPort = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); - fact.setStartPort(receiverPort); - fact.setEndPort(receiverPort); - fact.setManualStart(true); - GatewayReceiver receiver = fact.create(); - try { - receiver.start(); - } - catch (IOException e) { - e.printStackTrace(); - fail("Test " + test.getName() - + " failed to start GatewayReceiver on port " + receiverPort); - } - CacheServer server = cache.addCacheServer(); - int serverPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); - server.setPort(serverPort); - server.setHostnameForClients("localhost"); - try { - server.start(); - } catch (IOException e) { - com.gemstone.gemfire.test.dunit.Assert.fail("Failed to start server ", e); - } - } - - public static int createServer(int locPort) { - WANTestBase test = new WANTestBase(); - Properties props = test.getDistributedSystemProperties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(LOCATORS, "localhost[" + locPort - + "]"); - InternalDistributedSystem ds = test.getSystem(props); - cache = CacheFactory.create(ds); - - CacheServer server = cache.addCacheServer(); - int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); - server.setPort(port); - server.setHostnameForClients("localhost"); - try { - server.start(); - } catch (IOException e) { - com.gemstone.gemfire.test.dunit.Assert.fail("Failed to start server ", e); - } - return port; - } - - public static void createClientWithLocator(int port0,String host, - String regionName) { - WANTestBase test = new WANTestBase(); - Properties props = test.getDistributedSystemProperties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(LOCATORS, ""); - - InternalDistributedSystem ds = test.getSystem(props); - cache = CacheFactory.create(ds); - - assertNotNull(cache); - CacheServerTestUtil.disableShufflingOfEndpoints(); - Pool p; - try { - p = PoolManager.createFactory().addLocator(host, port0) - .setPingInterval(250).setSubscriptionEnabled(true) - .setSubscriptionRedundancy(-1).setReadTimeout(2000) - .setSocketBufferSize(1000).setMinConnections(6).setMaxConnections(10) - .setRetryAttempts(3).create(regionName); - } finally { - CacheServerTestUtil.enableShufflingOfEndpoints(); - } - - AttributesFactory factory = new AttributesFactory(); - factory.setPoolName(p.getName()); - factory.setDataPolicy(DataPolicy.NORMAL); - RegionAttributes attrs = factory.create(); - region = cache.createRegion(regionName, attrs); - region.registerInterest("ALL_KEYS"); - assertNotNull(region); - LogWriterUtils.getLogW
<TRUNCATED>
