http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java new file mode 100644 index 0000000..6727f1b --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java @@ -0,0 +1,3765 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan; + +import static com.gemstone.gemfire.distributed.ConfigurationProperties.*; +import static org.junit.Assert.*; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.StringTokenizer; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import com.jayway.awaitility.Awaitility; +import org.junit.experimental.categories.Category; + +import com.gemstone.gemfire.cache.AttributesFactory; +import com.gemstone.gemfire.cache.AttributesMutator; +import com.gemstone.gemfire.cache.Cache; +import com.gemstone.gemfire.cache.CacheClosedException; +import com.gemstone.gemfire.cache.CacheFactory; +import com.gemstone.gemfire.cache.CacheListener; +import com.gemstone.gemfire.cache.CacheTransactionManager; +import com.gemstone.gemfire.cache.DataPolicy; +import com.gemstone.gemfire.cache.DiskStore; +import com.gemstone.gemfire.cache.DiskStoreFactory; +import com.gemstone.gemfire.cache.EntryEvent; +import com.gemstone.gemfire.cache.PartitionAttributesFactory; +import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.cache.RegionAttributes; +import com.gemstone.gemfire.cache.RegionDestroyedException; +import com.gemstone.gemfire.cache.RegionFactory; +import com.gemstone.gemfire.cache.Scope; +import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener; +import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue; +import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory; +import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl; +import com.gemstone.gemfire.cache.client.Pool; +import com.gemstone.gemfire.cache.client.PoolManager; +import com.gemstone.gemfire.cache.client.internal.LocatorDiscoveryCallbackAdapter; +import com.gemstone.gemfire.cache.client.internal.locator.wan.LocatorMembershipListener; +import com.gemstone.gemfire.cache.persistence.PartitionOfflineException; +import com.gemstone.gemfire.cache.server.CacheServer; +import com.gemstone.gemfire.cache.util.CacheListenerAdapter; +import com.gemstone.gemfire.cache.wan.GatewayEventFilter; +import com.gemstone.gemfire.cache.wan.GatewayQueueEvent; +import com.gemstone.gemfire.cache.wan.GatewayReceiver; +import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory; +import com.gemstone.gemfire.cache.wan.GatewaySender; +import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy; +import com.gemstone.gemfire.cache.wan.GatewaySenderFactory; +import com.gemstone.gemfire.cache.wan.GatewayTransportFilter; +import com.gemstone.gemfire.cache30.CacheTestCase; +import com.gemstone.gemfire.distributed.Locator; +import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; +import com.gemstone.gemfire.distributed.internal.InternalLocator; +import com.gemstone.gemfire.distributed.internal.ServerLocation; +import com.gemstone.gemfire.internal.AvailablePort; +import com.gemstone.gemfire.internal.AvailablePortHelper; +import com.gemstone.gemfire.internal.FileUtil; +import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId; +import com.gemstone.gemfire.internal.cache.BucketRegion; +import com.gemstone.gemfire.internal.cache.CacheConfig; +import com.gemstone.gemfire.internal.cache.CacheServerImpl; +import com.gemstone.gemfire.internal.cache.CustomerIDPartitionResolver; +import com.gemstone.gemfire.internal.cache.ForceReattemptException; +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; +import com.gemstone.gemfire.internal.cache.LocalRegion; +import com.gemstone.gemfire.internal.cache.PartitionedRegion; +import com.gemstone.gemfire.internal.cache.RegionQueue; +import com.gemstone.gemfire.internal.cache.execute.data.CustId; +import com.gemstone.gemfire.internal.cache.execute.data.Customer; +import com.gemstone.gemfire.internal.cache.execute.data.Order; +import com.gemstone.gemfire.internal.cache.execute.data.OrderId; +import com.gemstone.gemfire.internal.cache.execute.data.Shipment; +import com.gemstone.gemfire.internal.cache.execute.data.ShipmentId; +import com.gemstone.gemfire.internal.cache.partitioned.PRLocallyDestroyedException; +import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerStats; +import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerTestUtil; +import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderEventProcessor; +import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue; +import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderEventProcessor; +import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue; +import com.gemstone.gemfire.internal.cache.wan.serial.ConcurrentSerialGatewaySenderEventProcessor; +import com.gemstone.gemfire.internal.cache.wan.serial.SerialGatewaySenderQueue; +import com.gemstone.gemfire.pdx.SimpleClass; +import com.gemstone.gemfire.pdx.SimpleClass1; +import com.gemstone.gemfire.test.dunit.Assert; +import com.gemstone.gemfire.test.dunit.AsyncInvocation; +import com.gemstone.gemfire.test.dunit.Host; +import com.gemstone.gemfire.test.dunit.IgnoredException; +import com.gemstone.gemfire.test.dunit.Invoke; +import com.gemstone.gemfire.test.dunit.LogWriterUtils; +import com.gemstone.gemfire.test.dunit.VM; +import com.gemstone.gemfire.test.dunit.Wait; +import com.gemstone.gemfire.test.dunit.WaitCriterion; +import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; +import com.gemstone.gemfire.util.test.TestUtil; + +@Category(DistributedTest.class) +public class WANTestBase extends JUnit4DistributedTestCase { + + protected static Cache cache; + protected static Region region; + + protected static PartitionedRegion customerRegion; + protected static PartitionedRegion orderRegion; + protected static PartitionedRegion shipmentRegion; + + protected static final String customerRegionName = "CUSTOMER"; + protected static final String orderRegionName = "ORDER"; + protected static final String shipmentRegionName = "SHIPMENT"; + + protected static VM vm0; + protected static VM vm1; + protected static VM vm2; + protected static VM vm3; + protected static VM vm4; + protected static VM vm5; + protected static VM vm6; + protected static VM vm7; + + protected static QueueListener listener1; + protected static QueueListener listener2; + + protected static AsyncEventListener eventListener1 ; + protected static AsyncEventListener eventListener2 ; + + private static final long MAX_WAIT = 10000; + + protected static GatewayEventFilter eventFilter; + + protected static List<Integer> dispatcherThreads = + new ArrayList<Integer>(Arrays.asList(1, 3, 5)); + //this will be set for each test method run with one of the values from above list + protected static int numDispatcherThreadsForTheRun = 1; + + public WANTestBase() { + super(); + } + + /** + * @deprecated Use the no arg constructor, or better yet, don't construct this class + */ + @Deprecated + public WANTestBase(final String ignored) { + } + + @Override + public final void preSetUp() throws Exception { + final Host host = Host.getHost(0); + vm0 = host.getVM(0); + vm1 = host.getVM(1); + vm2 = host.getVM(2); + vm3 = host.getVM(3); + vm4 = host.getVM(4); + vm5 = host.getVM(5); + vm6 = host.getVM(6); + vm7 = host.getVM(7); + //Need to set the test name after the VMs are created + } + + @Override + public final void postSetUp() throws Exception { + //this is done to vary the number of dispatchers for sender + //during every test method run + shuffleNumDispatcherThreads(); + Invoke.invokeInEveryVM(() -> setNumDispatcherThreadsForTheRun(dispatcherThreads.get(0))); + IgnoredException.addIgnoredException("Connection refused"); + IgnoredException.addIgnoredException("Software caused connection abort"); + IgnoredException.addIgnoredException("Connection reset"); + postSetUpWANTestBase(); + } + + protected void postSetUpWANTestBase() throws Exception { + } + + public static void shuffleNumDispatcherThreads() { + Collections.shuffle(dispatcherThreads); + } + + public static void setNumDispatcherThreadsForTheRun(int numThreads) { + numDispatcherThreadsForTheRun = numThreads; + } + + public static void stopOldLocator() { + if (Locator.hasLocator()) { + Locator.getLocator().stop(); + } + } + + public static void createLocator(int dsId, int port, Set<String> localLocatorsList, Set<String> remoteLocatorsList){ + WANTestBase test = new WANTestBase(); + Properties props = test.getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId); + StringBuffer localLocatorBuffer = new StringBuffer(localLocatorsList.toString()); + localLocatorBuffer.deleteCharAt(0); + localLocatorBuffer.deleteCharAt(localLocatorBuffer.lastIndexOf("]")); + String localLocator = localLocatorBuffer.toString(); + localLocator = localLocator.replace(" ", ""); + + props.setProperty(LOCATORS, localLocator); + props.setProperty(START_LOCATOR, "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost"); + StringBuffer remoteLocatorBuffer = new StringBuffer(remoteLocatorsList.toString()); + remoteLocatorBuffer.deleteCharAt(0); + remoteLocatorBuffer.deleteCharAt(remoteLocatorBuffer.lastIndexOf("]")); + String remoteLocator = remoteLocatorBuffer.toString(); + remoteLocator = remoteLocator.replace(" ", ""); + props.setProperty(REMOTE_LOCATORS, remoteLocator); + test.getSystem(props); + } + + public static Integer createFirstLocatorWithDSId(int dsId) { + stopOldLocator(); + WANTestBase test = new WANTestBase(); + int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); + Properties props = test.getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId); + props.setProperty(LOCATORS, "localhost[" + port + "]"); + props.setProperty(START_LOCATOR, "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost"); + test.getSystem(props); + return port; + } + + public static Integer createFirstPeerLocator(int dsId) { + stopOldLocator(); + WANTestBase test = new WANTestBase(); + int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); + Properties props = test.getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId); + props.setProperty(LOCATORS, "localhost[" + port + "]"); + props.setProperty(START_LOCATOR, "localhost[" + port + "],server=false,peer=true,hostname-for-clients=localhost"); + test.getSystem(props); + return port; + } + + public static Integer createSecondLocator(int dsId, int locatorPort) { + stopOldLocator(); + WANTestBase test = new WANTestBase(); + int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); + Properties props = test.getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId); + props.setProperty(LOCATORS, "localhost[" + locatorPort + "]"); + props.setProperty(START_LOCATOR, "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost"); + test.getSystem(props); + return port; + } + + public static Integer createSecondPeerLocator(int dsId, int locatorPort) { + stopOldLocator(); + WANTestBase test = new WANTestBase(); + int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); + Properties props = test.getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId); + props.setProperty(LOCATORS, "localhost[" + locatorPort + "]"); + props.setProperty(START_LOCATOR, "localhost[" + port + "],server=false,peer=true,hostname-for-clients=localhost"); + test.getSystem(props); + return port; + } + + public static Integer createFirstRemoteLocator(int dsId, int remoteLocPort) { + stopOldLocator(); + WANTestBase test = new WANTestBase(); + int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); + Properties props = test.getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId); + props.setProperty(LOCATORS, "localhost[" + port + "]"); + props.setProperty(START_LOCATOR, "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost"); + props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]"); + test.getSystem(props); + return port; + } + + public static void bringBackLocatorOnOldPort(int dsId, int remoteLocPort, int oldPort) { + WANTestBase test = new WANTestBase(); + Properties props = test.getDistributedSystemProperties(); + props.put(LOG_LEVEL, "fine"); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId); + props.setProperty(LOCATORS, "localhost[" + oldPort + "]"); + props.setProperty(START_LOCATOR, "localhost[" + oldPort + "],server=true,peer=true,hostname-for-clients=localhost"); + props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]"); + test.getSystem(props); + } + + + public static Integer createFirstRemotePeerLocator(int dsId, int remoteLocPort) { + stopOldLocator(); + WANTestBase test = new WANTestBase(); + int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); + Properties props = test.getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId); + props.setProperty(LOCATORS, "localhost[" + port + "]"); + props.setProperty(START_LOCATOR, "localhost[" + port + "],server=false,peer=true,hostname-for-clients=localhost"); + props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]"); + test.getSystem(props); + return port; + } + + public static Integer createSecondRemoteLocator(int dsId, int localPort, + int remoteLocPort) { + stopOldLocator(); + WANTestBase test = new WANTestBase(); + int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); + Properties props = test.getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId); + props.setProperty(LOCATORS, "localhost[" + localPort + "]"); + props.setProperty(START_LOCATOR, "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost"); + props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]"); + test.getSystem(props); + return port; + } + + public static Integer createSecondRemoteLocatorWithAPI(int dsId, int localPort, + int remoteLocPort, String hostnameForClients) + throws IOException + { + stopOldLocator(); + WANTestBase test = new WANTestBase(); + int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); + Properties props = test.getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId); + props.setProperty(LOCATORS, "localhost[" + localPort + "]"); + props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]"); + Locator locator = Locator.startLocatorAndDS(0, null, InetAddress.getByName("localhost"), props, true, true, hostnameForClients); + return locator.getPort(); + } + + public static Integer createSecondRemotePeerLocator(int dsId, int localPort, + int remoteLocPort) { + stopOldLocator(); + WANTestBase test = new WANTestBase(); + int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); + Properties props = test.getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId); + props.setProperty(LOCATORS, "localhost[" + localPort + "]"); + props.setProperty(START_LOCATOR, "localhost[" + port + "],server=false,peer=true,hostname-for-clients=localhost"); + props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]"); + test.getSystem(props); + return port; + } + + public static int createReceiverInSecuredCache() { + GatewayReceiverFactory fact = WANTestBase.cache.createGatewayReceiverFactory(); + int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); + fact.setStartPort(port); + fact.setEndPort(port); + fact.setManualStart(true); + GatewayReceiver receiver = fact.create(); + try { + receiver.start(); + } + catch (IOException e) { + e.printStackTrace(); + com.gemstone.gemfire.test.dunit.Assert.fail("Failed to start GatewayReceiver on port " + port, e); + } + return port; + } + + public static void createReplicatedRegion(String regionName, String senderIds, Boolean offHeap){ + IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class + .getName()); + IgnoredException exp1 = IgnoredException.addIgnoredException(InterruptedException.class + .getName()); + IgnoredException exp2 = IgnoredException.addIgnoredException(GatewaySenderException.class + .getName()); + try { + AttributesFactory fact = new AttributesFactory(); + if (senderIds != null) { + StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); + while (tokenizer.hasMoreTokens()) { + String senderId = tokenizer.nextToken(); + fact.addGatewaySenderId(senderId); + } + } + fact.setDataPolicy(DataPolicy.REPLICATE); + fact.setScope(Scope.DISTRIBUTED_ACK); + fact.setOffHeap(offHeap); + Region r = cache.createRegionFactory(fact.create()).create(regionName); + assertNotNull(r); + } finally { + exp.remove(); + exp1.remove(); + exp2.remove(); + } + } + + public static void createNormalRegion(String regionName, String senderIds){ + AttributesFactory fact = new AttributesFactory(); + if(senderIds!= null){ + StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); + while (tokenizer.hasMoreTokens()){ + String senderId = tokenizer.nextToken(); + fact.addGatewaySenderId(senderId); + } + } + fact.setDataPolicy(DataPolicy.NORMAL); + fact.setScope(Scope.DISTRIBUTED_ACK); + Region r = cache.createRegionFactory(fact.create()).create(regionName); + assertNotNull(r); + } + + public static void createPersistentReplicatedRegion(String regionName, String senderIds, Boolean offHeap){ + AttributesFactory fact = new AttributesFactory(); + if(senderIds!= null){ + StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); + while (tokenizer.hasMoreTokens()){ + String senderId = tokenizer.nextToken(); + fact.addGatewaySenderId(senderId); + } + } + fact.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE); + fact.setOffHeap(offHeap); + Region r = cache.createRegionFactory(fact.create()).create(regionName); + assertNotNull(r); + } + + public static void createReplicatedRegionWithAsyncEventQueue( + String regionName, String asyncQueueIds, Boolean offHeap) { + IgnoredException exp1 = IgnoredException.addIgnoredException(ForceReattemptException.class + .getName()); + try { + AttributesFactory fact = new AttributesFactory(); + if (asyncQueueIds != null) { + StringTokenizer tokenizer = new StringTokenizer(asyncQueueIds, ","); + while (tokenizer.hasMoreTokens()) { + String asyncQueueId = tokenizer.nextToken(); + fact.addAsyncEventQueueId(asyncQueueId); + } + } + fact.setDataPolicy(DataPolicy.REPLICATE); + fact.setOffHeap(offHeap); + RegionFactory regionFactory = cache.createRegionFactory(fact.create()); + Region r = regionFactory.create(regionName); + assertNotNull(r); + } finally { + exp1.remove(); + } + } + + public static void createReplicatedRegionWithSenderAndAsyncEventQueue( + String regionName, String senderIds, String asyncChannelId, Boolean offHeap) { + IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class + .getName()); + try { + AttributesFactory fact = new AttributesFactory(); + if (senderIds != null) { + StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); + while (tokenizer.hasMoreTokens()) { + String senderId = tokenizer.nextToken(); + fact.addGatewaySenderId(senderId); + } + } + fact.setDataPolicy(DataPolicy.REPLICATE); + fact.setScope(Scope.DISTRIBUTED_ACK); + fact.setOffHeap(offHeap); + RegionFactory regionFactory = cache.createRegionFactory(fact.create()); + regionFactory.addAsyncEventQueueId(asyncChannelId); + Region r = regionFactory.create(regionName); + assertNotNull(r); + } finally { + exp.remove(); + } + } + + public static void createReplicatedRegion(String regionName, String senderIds, Scope scope, DataPolicy policy, Boolean offHeap){ + AttributesFactory fact = new AttributesFactory(); + if(senderIds!= null){ + StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); + while (tokenizer.hasMoreTokens()){ + String senderId = tokenizer.nextToken(); + fact.addGatewaySenderId(senderId); + } + } + fact.setDataPolicy(policy); + fact.setScope(scope); + fact.setOffHeap(offHeap); + Region r = cache.createRegionFactory(fact.create()).create(regionName); + assertNotNull(r); + } + + public static void createAsyncEventQueue( + String asyncChannelId, boolean isParallel, + Integer maxMemory, Integer batchSize, boolean isConflation, + boolean isPersistent, String diskStoreName, boolean isDiskSynchronous) { + + if (diskStoreName != null) { + File directory = new File(asyncChannelId + "_disk_" + + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); + directory.mkdir(); + File[] dirs1 = new File[] { directory }; + DiskStoreFactory dsf = cache.createDiskStoreFactory(); + dsf.setDiskDirs(dirs1); + dsf.create(diskStoreName); + } + + AsyncEventListener asyncEventListener = new MyAsyncEventListener(); + + AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory(); + factory.setBatchSize(batchSize); + factory.setPersistent(isPersistent); + factory.setDiskStoreName(diskStoreName); + factory.setDiskSynchronous(isDiskSynchronous); + factory.setBatchConflationEnabled(isConflation); + factory.setMaximumQueueMemory(maxMemory); + factory.setParallel(isParallel); + //set dispatcher threads + factory.setDispatcherThreads(numDispatcherThreadsForTheRun); + factory.create(asyncChannelId, asyncEventListener); + } + + public static void createPartitionedRegion(String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap){ + IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class + .getName()); + IgnoredException exp1 = IgnoredException.addIgnoredException(PartitionOfflineException.class + .getName()); + try { + AttributesFactory fact = new AttributesFactory(); + if (senderIds != null) { + StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); + while (tokenizer.hasMoreTokens()) { + String senderId = tokenizer.nextToken(); + fact.addGatewaySenderId(senderId); + } + } + PartitionAttributesFactory pfact = new PartitionAttributesFactory(); + pfact.setTotalNumBuckets(totalNumBuckets); + pfact.setRedundantCopies(redundantCopies); + pfact.setRecoveryDelay(0); + fact.setPartitionAttributes(pfact.create()); + fact.setOffHeap(offHeap); + Region r = cache.createRegionFactory(fact.create()).create(regionName); + assertNotNull(r); + } finally { + exp.remove(); + exp1.remove(); + } + } + + // TODO:OFFHEAP: add offheap flavor + public static void createPartitionedRegionWithPersistence(String regionName, + String senderIds, Integer redundantCopies, Integer totalNumBuckets) { + IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class + .getName()); + IgnoredException exp1 = IgnoredException.addIgnoredException(PartitionOfflineException.class + .getName()); + try { + AttributesFactory fact = new AttributesFactory(); + if (senderIds != null) { + StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); + while (tokenizer.hasMoreTokens()) { + String senderId = tokenizer.nextToken(); + fact.addGatewaySenderId(senderId); + } + } + PartitionAttributesFactory pfact = new PartitionAttributesFactory(); + pfact.setTotalNumBuckets(totalNumBuckets); + pfact.setRedundantCopies(redundantCopies); + pfact.setRecoveryDelay(0); + fact.setPartitionAttributes(pfact.create()); + fact.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); + Region r = cache.createRegionFactory(fact.create()).create(regionName); + assertNotNull(r); + } finally { + exp.remove(); + exp1.remove(); + } + } + public static void createColocatedPartitionedRegion(String regionName, + String senderIds, Integer redundantCopies, Integer totalNumBuckets, String colocatedWith) { + IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class + .getName()); + IgnoredException exp1 = IgnoredException.addIgnoredException(PartitionOfflineException.class + .getName()); + try { + AttributesFactory fact = new AttributesFactory(); + if (senderIds != null) { + StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); + while (tokenizer.hasMoreTokens()) { + String senderId = tokenizer.nextToken(); + fact.addGatewaySenderId(senderId); + } + } + PartitionAttributesFactory pfact = new PartitionAttributesFactory(); + pfact.setTotalNumBuckets(totalNumBuckets); + pfact.setRedundantCopies(redundantCopies); + pfact.setRecoveryDelay(0); + pfact.setColocatedWith(colocatedWith); + fact.setPartitionAttributes(pfact.create()); + Region r = cache.createRegionFactory(fact.create()).create(regionName); + assertNotNull(r); + } finally { + exp.remove(); + exp1.remove(); + } + } + + public static void addSenderThroughAttributesMutator(String regionName, + String senderIds){ + final Region r = cache.getRegion(Region.SEPARATOR + regionName); + assertNotNull(r); + AttributesMutator mutator = r.getAttributesMutator(); + mutator.addGatewaySenderId(senderIds); + } + + public static void addAsyncEventQueueThroughAttributesMutator( + String regionName, String queueId) { + final Region r = cache.getRegion(Region.SEPARATOR + regionName); + assertNotNull(r); + AttributesMutator mutator = r.getAttributesMutator(); + mutator.addAsyncEventQueueId(queueId); + } + + public static void createPartitionedRegionAsAccessor( + String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets){ + AttributesFactory fact = new AttributesFactory(); + if(senderIds!= null){ + StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); + while (tokenizer.hasMoreTokens()){ + String senderId = tokenizer.nextToken(); + fact.addGatewaySenderId(senderId); + } + } + PartitionAttributesFactory pfact = new PartitionAttributesFactory(); + pfact.setTotalNumBuckets(totalNumBuckets); + pfact.setRedundantCopies(redundantCopies); + pfact.setLocalMaxMemory(0); + fact.setPartitionAttributes(pfact.create()); + Region r = cache.createRegionFactory(fact.create()).create(regionName); + assertNotNull(r); + } + + public static void createPartitionedRegionWithSerialParallelSenderIds(String regionName, String serialSenderIds, String parallelSenderIds, String colocatedWith, Boolean offHeap){ + AttributesFactory fact = new AttributesFactory(); + if (serialSenderIds != null) { + StringTokenizer tokenizer = new StringTokenizer(serialSenderIds, ","); + while (tokenizer.hasMoreTokens()) { + String senderId = tokenizer.nextToken(); + fact.addGatewaySenderId(senderId); + } + } + if (parallelSenderIds != null) { + StringTokenizer tokenizer = new StringTokenizer(parallelSenderIds, ","); + while (tokenizer.hasMoreTokens()) { + String senderId = tokenizer.nextToken(); + fact.addGatewaySenderId(senderId); + } + } + PartitionAttributesFactory pfact = new PartitionAttributesFactory(); + pfact.setColocatedWith(colocatedWith); + fact.setPartitionAttributes(pfact.create()); + fact.setOffHeap(offHeap); + Region r = cache.createRegionFactory(fact.create()).create(regionName); + assertNotNull(r); + } + + public static void createPersistentPartitionedRegion( + String regionName, + String senderIds, + Integer redundantCopies, + Integer totalNumBuckets, + Boolean offHeap){ + + IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class + .getName()); + IgnoredException exp1 = IgnoredException.addIgnoredException(PartitionOfflineException.class + .getName()); + try { + + AttributesFactory fact = new AttributesFactory(); + if (senderIds != null) { + StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); + while (tokenizer.hasMoreTokens()) { + String senderId = tokenizer.nextToken(); + fact.addGatewaySenderId(senderId); + } + } + PartitionAttributesFactory pfact = new PartitionAttributesFactory(); + pfact.setTotalNumBuckets(totalNumBuckets); + pfact.setRedundantCopies(redundantCopies); + fact.setPartitionAttributes(pfact.create()); + fact.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); + fact.setOffHeap(offHeap); + Region r = cache.createRegionFactory(fact.create()).create(regionName); + assertNotNull(r); + } finally { + exp.remove(); + exp1.remove(); + } + } + + public static void createCustomerOrderShipmentPartitionedRegion( + String senderIds, Integer redundantCopies, + Integer totalNumBuckets, Boolean offHeap) { + IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class + .getName()); + try { + AttributesFactory fact = new AttributesFactory(); + if (senderIds != null) { + StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); + while (tokenizer.hasMoreTokens()) { + String senderId = tokenizer.nextToken(); + fact.addGatewaySenderId(senderId); + } + } + + PartitionAttributesFactory paf = new PartitionAttributesFactory(); + paf.setRedundantCopies(redundantCopies) + .setTotalNumBuckets(totalNumBuckets) + .setPartitionResolver( + new CustomerIDPartitionResolver("CustomerIDPartitionResolver")); + fact.setPartitionAttributes(paf.create()); + fact.setOffHeap(offHeap); + customerRegion = (PartitionedRegion)cache.createRegionFactory( + fact.create()).create(customerRegionName); + assertNotNull(customerRegion); + LogWriterUtils.getLogWriter().info( + "Partitioned Region CUSTOMER created Successfully :" + + customerRegion.toString()); + + paf = new PartitionAttributesFactory(); + paf.setRedundantCopies(redundantCopies) + .setTotalNumBuckets(totalNumBuckets) + .setColocatedWith(customerRegionName) + .setPartitionResolver( + new CustomerIDPartitionResolver("CustomerIDPartitionResolver")); + fact = new AttributesFactory(); + if (senderIds != null) { + StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); + while (tokenizer.hasMoreTokens()) { + String senderId = tokenizer.nextToken(); + fact.addGatewaySenderId(senderId); + } + } + fact.setPartitionAttributes(paf.create()); + fact.setOffHeap(offHeap); + orderRegion = (PartitionedRegion)cache.createRegionFactory(fact.create()) + .create(orderRegionName); + assertNotNull(orderRegion); + LogWriterUtils.getLogWriter().info( + "Partitioned Region ORDER created Successfully :" + + orderRegion.toString()); + + paf = new PartitionAttributesFactory(); + paf.setRedundantCopies(redundantCopies) + .setTotalNumBuckets(totalNumBuckets) + .setColocatedWith(orderRegionName) + .setPartitionResolver( + new CustomerIDPartitionResolver("CustomerIDPartitionResolver")); + fact = new AttributesFactory(); + if (senderIds != null) { + StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); + while (tokenizer.hasMoreTokens()) { + String senderId = tokenizer.nextToken(); + fact.addGatewaySenderId(senderId); + } + } + fact.setPartitionAttributes(paf.create()); + fact.setOffHeap(offHeap); + shipmentRegion = (PartitionedRegion)cache.createRegionFactory( + fact.create()).create(shipmentRegionName); + assertNotNull(shipmentRegion); + LogWriterUtils.getLogWriter().info( + "Partitioned Region SHIPMENT created Successfully :" + + shipmentRegion.toString()); + } finally { + exp.remove(); + } + } + + public static void createColocatedPartitionedRegions(String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap){ + AttributesFactory fact = new AttributesFactory(); + if(senderIds!= null){ + StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); + while (tokenizer.hasMoreTokens()){ + String senderId = tokenizer.nextToken(); + fact.addGatewaySenderId(senderId); + } + } + PartitionAttributesFactory pfact = new PartitionAttributesFactory(); + pfact.setTotalNumBuckets(totalNumBuckets); + pfact.setRedundantCopies(redundantCopies); + fact.setPartitionAttributes(pfact.create()); + fact.setOffHeap(offHeap); + Region r = cache.createRegionFactory(fact.create()).create(regionName); + assertNotNull(r); + + pfact.setColocatedWith(r.getName()); + fact.setPartitionAttributes(pfact.create()); + fact.setOffHeap(offHeap); + Region r1 = cache.createRegionFactory(fact.create()).create(regionName+"_child1"); + assertNotNull(r1); + + Region r2 = cache.createRegionFactory(fact.create()).create(regionName+"_child2"); + assertNotNull(r2); + } + + public static void createColocatedPartitionedRegions2 (String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap){ + AttributesFactory fact = new AttributesFactory(); + if(senderIds!= null){ + StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); + while (tokenizer.hasMoreTokens()){ + String senderId = tokenizer.nextToken(); + fact.addGatewaySenderId(senderId); + } + } + PartitionAttributesFactory pfact = new PartitionAttributesFactory(); + pfact.setTotalNumBuckets(totalNumBuckets); + pfact.setRedundantCopies(redundantCopies); + fact.setPartitionAttributes(pfact.create()); + fact.setOffHeap(offHeap); + Region r = cache.createRegionFactory(fact.create()).create(regionName); + assertNotNull(r); + + + fact = new AttributesFactory(); + pfact.setColocatedWith(r.getName()); + fact.setPartitionAttributes(pfact.create()); + fact.setOffHeap(offHeap); + Region r1 = cache.createRegionFactory(fact.create()).create(regionName+"_child1"); + assertNotNull(r1); + + Region r2 = cache.createRegionFactory(fact.create()).create(regionName+"_child2"); + assertNotNull(r2); + } + + public static void createCacheInVMs(Integer locatorPort, VM... vms) { + for (VM vm : vms) { + vm.invoke(() -> createCache(locatorPort)); + } + } + + public static void addListenerToSleepAfterCreateEvent(int milliSeconds, final String regionName) { + cache.getRegion(regionName).getAttributesMutator() + .addCacheListener(new CacheListenerAdapter<Object, Object>() { + @Override + public void afterCreate(final EntryEvent<Object, Object> event) { + try { + Thread.sleep(milliSeconds); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + }); + } + + private static CacheListener myListener; + public static void longPauseAfterNumEvents(int numEvents, int milliSeconds) { + myListener = new CacheListenerAdapter<Object, Object>() { + @Override + public void afterCreate(final EntryEvent<Object, Object> event) { + try { + if (event.getRegion().size() >= numEvents){ + Thread.sleep(milliSeconds); + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + }; + cache.getRegion(getTestMethodName() + "_RR_1").getAttributesMutator() + .addCacheListener(myListener); + } + + public static void removeCacheListener() { + cache.getRegion(getTestMethodName() + "_RR_1").getAttributesMutator() + .removeCacheListener(myListener); + + } + + + public static void createCache(Integer locPort){ + createCache(false, locPort); + } + public static void createManagementCache(Integer locPort){ + createCache(true, locPort); + } + + public static void createCacheConserveSockets(Boolean conserveSockets,Integer locPort){ + WANTestBase test = new WANTestBase(); + Properties props = test.getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(LOCATORS, "localhost[" + locPort + "]"); + props.setProperty(CONSERVE_SOCKETS, conserveSockets.toString()); + InternalDistributedSystem ds = test.getSystem(props); + cache = CacheFactory.create(ds); + } + + protected static void createCache(boolean management, Integer locPort) { + WANTestBase test = new WANTestBase(); + Properties props = test.getDistributedSystemProperties(); + if (management) { + props.setProperty(JMX_MANAGER, "true"); + props.setProperty(JMX_MANAGER_START, "false"); + props.setProperty(JMX_MANAGER_PORT, "0"); + props.setProperty(JMX_MANAGER_HTTP_PORT, "0"); + } + props.setProperty(MCAST_PORT, "0"); + props.setProperty(LOCATORS, "localhost[" + locPort + "]"); + InternalDistributedSystem ds = test.getSystem(props); + cache = CacheFactory.create(ds); + } + + protected static void createCacheWithSSL(Integer locPort) { + WANTestBase test = new WANTestBase(); + + boolean gatewaySslenabled = true; + String gatewaySslprotocols = "any"; + String gatewaySslciphers = "any"; + boolean gatewaySslRequireAuth = true; + + Properties gemFireProps = test.getDistributedSystemProperties(); + gemFireProps.put(LOG_LEVEL, LogWriterUtils.getDUnitLogLevel()); + gemFireProps.put(GATEWAY_SSL_ENABLED, String.valueOf(gatewaySslenabled)); + gemFireProps.put(GATEWAY_SSL_PROTOCOLS, gatewaySslprotocols); + gemFireProps.put(GATEWAY_SSL_CIPHERS, gatewaySslciphers); + gemFireProps.put(GATEWAY_SSL_REQUIRE_AUTHENTICATION, String.valueOf(gatewaySslRequireAuth)); + + gemFireProps.put(GATEWAY_SSL_KEYSTORE_TYPE, "jks"); + gemFireProps.put(GATEWAY_SSL_KEYSTORE, + TestUtil.getResourcePath(WANTestBase.class, "/com/gemstone/gemfire/cache/client/internal/client.keystore")); + gemFireProps.put(GATEWAY_SSL_KEYSTORE_PASSWORD, "password"); + gemFireProps.put(GATEWAY_SSL_TRUSTSTORE, + TestUtil.getResourcePath(WANTestBase.class, "/com/gemstone/gemfire/cache/client/internal/client.truststore")); + gemFireProps.put(GATEWAY_SSL_TRUSTSTORE_PASSWORD, "password"); + + gemFireProps.setProperty(MCAST_PORT, "0"); + gemFireProps.setProperty(LOCATORS, "localhost[" + locPort + "]"); + + LogWriterUtils.getLogWriter().info("Starting cache ds with following properties \n" + gemFireProps); + + InternalDistributedSystem ds = test.getSystem(gemFireProps); + cache = CacheFactory.create(ds); + } + + public static void createCache_PDX(Integer locPort){ + WANTestBase test = new WANTestBase(); + Properties props = test.getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(LOCATORS, "localhost[" + locPort + "]"); + InternalDistributedSystem ds = test.getSystem(props); + CacheConfig cacheConfig = new CacheConfig(); + cacheConfig.setPdxPersistent(true); + cacheConfig.setPdxDiskStore("PDX_TEST"); + cache = GemFireCacheImpl.create(ds, false, cacheConfig); + + File pdxDir = new File(CacheTestCase.getDiskDir(), "pdx"); + DiskStoreFactory dsf = cache.createDiskStoreFactory(); + File [] dirs1 = new File[] {pdxDir}; + dsf.setDiskDirs(dirs1).setMaxOplogSize(1).create("PDX_TEST"); + } + + public static void createCache(Integer locPort1, Integer locPort2){ + WANTestBase test = new WANTestBase(); + Properties props = test.getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(LOCATORS, "localhost[" + locPort1 + + "],localhost[" + locPort2 + "]"); + InternalDistributedSystem ds = test.getSystem(props); + cache = CacheFactory.create(ds); + } + + public static void createCacheWithoutLocator(Integer mCastPort){ + WANTestBase test = new WANTestBase(); + Properties props = test.getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "" + mCastPort); + InternalDistributedSystem ds = test.getSystem(props); + cache = CacheFactory.create(ds); + } + + /** + * Method that creates a bridge server + * + * @return Integer Port on which the server is started. + */ + public static Integer createCacheServer() { + CacheServer server1 = cache.addCacheServer(); + assertNotNull(server1); + server1.setPort(0); + try { + server1.start(); + } + catch (IOException e) { + com.gemstone.gemfire.test.dunit.Assert.fail("Failed to start the Server", e); + } + assertTrue(server1.isRunning()); + + return new Integer(server1.getPort()); + } + + /** + * Returns a Map that contains the count for number of bridge server and number + * of Receivers. + * + * @return Map + */ + public static Map getCacheServers() { + List cacheServers = cache.getCacheServers(); + + Map cacheServersMap = new HashMap(); + Iterator itr = cacheServers.iterator(); + int bridgeServerCounter = 0; + int receiverServerCounter = 0; + while (itr.hasNext()) { + CacheServerImpl cacheServer = (CacheServerImpl) itr.next(); + if (cacheServer.getAcceptor().isGatewayReceiver()) { + receiverServerCounter++; + } else { + bridgeServerCounter++; + } + } + cacheServersMap.put("BridgeServer", bridgeServerCounter); + cacheServersMap.put("ReceiverServer", receiverServerCounter); + return cacheServersMap; + } + + public static void startSenderInVMs(String senderId, VM... vms) { + for (VM vm : vms) { + vm.invoke(() -> startSender(senderId)); + } + } + + public static void startSenderInVMsAsync(String senderId, VM... vms) { + List<AsyncInvocation> tasks = new LinkedList<>(); + for (VM vm : vms) { + tasks.add(vm.invokeAsync(() -> startSender(senderId))); + } + for (AsyncInvocation invocation : tasks) { + try { + invocation.join(30000); // TODO: these might be AsyncInvocation orphans + } + catch (InterruptedException e) { + fail("Starting senders was interrupted"); + } + } + } + + + public static void startSender(String senderId) { + final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); + + IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class + .getName()); + IgnoredException exp1 = IgnoredException.addIgnoredException(InterruptedException.class + .getName()); + try { + Set<GatewaySender> senders = cache.getGatewaySenders(); + GatewaySender sender = null; + for (GatewaySender s : senders) { + if (s.getId().equals(senderId)) { + sender = s; + break; + } + } + sender.start(); + } finally { + exp.remove(); + exp1.remove(); + exln.remove(); + } + + } + + public static void enableConflation(String senderId) { + Set<GatewaySender> senders = cache.getGatewaySenders(); + AbstractGatewaySender sender = null; + for (GatewaySender s : senders) { + if (s.getId().equals(senderId)) { + sender = (AbstractGatewaySender)s; + break; + } + } + sender.test_setBatchConflationEnabled(true); + } + + public static Map getSenderToReceiverConnectionInfo(String senderId){ + Set<GatewaySender> senders = cache.getGatewaySenders(); + GatewaySender sender = null; + for(GatewaySender s : senders){ + if(s.getId().equals(senderId)){ + sender = s; + break; + } + } + Map connectionInfo = null; + if (!sender.isParallel() && ((AbstractGatewaySender) sender).isPrimary()) { + connectionInfo = new HashMap(); + GatewaySenderEventDispatcher dispatcher = + ((AbstractGatewaySender)sender).getEventProcessor().getDispatcher(); + if (dispatcher instanceof GatewaySenderEventRemoteDispatcher) { + ServerLocation serverLocation = + ((GatewaySenderEventRemoteDispatcher) dispatcher).getConnection(false).getServer(); + connectionInfo.put("serverHost", serverLocation.getHostName()); + connectionInfo.put("serverPort", serverLocation.getPort()); + + } + } + return connectionInfo; + } + public static List<Integer> getSenderStats(String senderId, final int expectedQueueSize){ + Set<GatewaySender> senders = cache.getGatewaySenders(); + AbstractGatewaySender sender = null; + for (GatewaySender s : senders) { + if (s.getId().equals(senderId)) { + sender = (AbstractGatewaySender)s; + break; + } + } + final GatewaySenderStats statistics = sender.getStatistics(); + if (expectedQueueSize != -1) { + final RegionQueue regionQueue; + regionQueue = sender.getQueues().toArray( + new RegionQueue[1])[0]; + Awaitility.await().atMost(120,TimeUnit.SECONDS).until(() -> assertEquals("Expected queue entries: " + + expectedQueueSize + " but actual entries: " + regionQueue.size(), expectedQueueSize,regionQueue.size())); + } + ArrayList<Integer> stats = new ArrayList<Integer>(); + stats.add(statistics.getEventQueueSize()); + stats.add(statistics.getEventsReceived()); + stats.add(statistics.getEventsQueued()); + stats.add(statistics.getEventsDistributed()); + stats.add(statistics.getBatchesDistributed()); + stats.add(statistics.getBatchesRedistributed()); + stats.add(statistics.getEventsFiltered()); + stats.add(statistics.getEventsNotQueuedConflated()); + stats.add(statistics.getEventsConflatedFromBatches()); + return stats; + } + + public static void checkQueueStats(String senderId, final int queueSize, + final int eventsReceived, final int eventsQueued, + final int eventsDistributed) { + Set<GatewaySender> senders = cache.getGatewaySenders(); + GatewaySender sender = null; + for (GatewaySender s : senders) { + if (s.getId().equals(senderId)) { + sender = s; + break; + } + } + + final GatewaySenderStats statistics = ((AbstractGatewaySender)sender).getStatistics(); + assertEquals(queueSize, statistics.getEventQueueSize()); + assertEquals(eventsReceived, statistics.getEventsReceived()); + assertEquals(eventsQueued, statistics.getEventsQueued()); + assert(statistics.getEventsDistributed() >= eventsDistributed); + } + + public static void checkGatewayReceiverStats(int processBatches, + int eventsReceived, int creates) { + Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers(); + GatewayReceiver receiver = gatewayReceivers.iterator().next(); + CacheServerStats stats = ((CacheServerImpl)receiver.getServer()) + .getAcceptor().getStats(); + + assertTrue(stats instanceof GatewayReceiverStats); + GatewayReceiverStats gatewayReceiverStats = (GatewayReceiverStats)stats; + assertTrue(gatewayReceiverStats.getProcessBatchRequests() >= processBatches); + assertEquals(eventsReceived, gatewayReceiverStats.getEventsReceived()); + assertEquals(creates, gatewayReceiverStats.getCreateRequest()); + } + + public static void checkMinimumGatewayReceiverStats(int processBatches, + int eventsReceived) { + Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers(); + GatewayReceiver receiver = gatewayReceivers.iterator().next(); + CacheServerStats stats = ((CacheServerImpl)receiver.getServer()) + .getAcceptor().getStats(); + + assertTrue(stats instanceof GatewayReceiverStats); + GatewayReceiverStats gatewayReceiverStats = (GatewayReceiverStats)stats; + assertTrue(gatewayReceiverStats.getProcessBatchRequests() >= processBatches); + assertTrue(gatewayReceiverStats.getEventsReceived()>= eventsReceived); + } + + public static void checkExceptionStats(int exceptionsOccurred) { + Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers(); + GatewayReceiver receiver = gatewayReceivers.iterator().next(); + CacheServerStats stats = ((CacheServerImpl)receiver.getServer()) + .getAcceptor().getStats(); + + assertTrue(stats instanceof GatewayReceiverStats); + GatewayReceiverStats gatewayReceiverStats = (GatewayReceiverStats)stats; + if (exceptionsOccurred == 0) { + assertEquals(exceptionsOccurred, gatewayReceiverStats + .getExceptionsOccured()); + } + else { + assertTrue(gatewayReceiverStats.getExceptionsOccured() >= exceptionsOccurred); + } + } + + public static void checkGatewayReceiverStatsHA(int processBatches, + int eventsReceived, int creates) { + Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers(); + GatewayReceiver receiver = gatewayReceivers.iterator().next(); + CacheServerStats stats = ((CacheServerImpl)receiver.getServer()) + .getAcceptor().getStats(); + + assertTrue(stats instanceof GatewayReceiverStats); + GatewayReceiverStats gatewayReceiverStats = (GatewayReceiverStats)stats; + assertTrue(gatewayReceiverStats.getProcessBatchRequests() >= processBatches); + assertTrue(gatewayReceiverStats.getEventsReceived() >= eventsReceived); + assertTrue(gatewayReceiverStats.getCreateRequest() >= creates); + } + + public static void checkEventFilteredStats(String senderId, final int eventsFiltered) { + Set<GatewaySender> senders = cache.getGatewaySenders(); + GatewaySender sender = null; + for (GatewaySender s : senders) { + if (s.getId().equals(senderId)) { + sender = s; + break; + } + } + final GatewaySenderStats statistics = ((AbstractGatewaySender)sender).getStatistics(); + assertEquals(eventsFiltered, statistics.getEventsFiltered()); + } + + public static void checkConflatedStats(String senderId, final int eventsConflated) { + Set<GatewaySender> senders = cache.getGatewaySenders(); + GatewaySender sender = null; + for (GatewaySender s : senders) { + if (s.getId().equals(senderId)) { + sender = s; + break; + } + } + final GatewaySenderStats statistics = ((AbstractGatewaySender)sender).getStatistics(); + assertEquals(eventsConflated, statistics.getEventsNotQueuedConflated()); + } + + public static void checkStats_Failover(String senderId, + final int eventsReceived) { + Set<GatewaySender> senders = cache.getGatewaySenders(); + GatewaySender sender = null; + for (GatewaySender s : senders) { + if (s.getId().equals(senderId)) { + sender = s; + break; + } + } + final GatewaySenderStats statistics = ((AbstractGatewaySender)sender) + .getStatistics(); + + assertEquals(eventsReceived, statistics.getEventsReceived()); + assertEquals(eventsReceived, (statistics.getEventsQueued() + + statistics.getUnprocessedTokensAddedByPrimary() + statistics + .getUnprocessedEventsRemovedByPrimary())); + } + + public static void checkBatchStats(String senderId, final int batches) { + Set<GatewaySender> senders = cache.getGatewaySenders(); + GatewaySender sender = null; + for (GatewaySender s : senders) { + if (s.getId().equals(senderId)) { + sender = s; + break; + } + } + final GatewaySenderStats statistics = ((AbstractGatewaySender)sender) + .getStatistics(); + assert (statistics.getBatchesDistributed() >= batches); + assertEquals(0, statistics.getBatchesRedistributed()); + } + + public static void checkBatchStats(String senderId, + final boolean batchesDistributed, final boolean batchesRedistributed) { + Set<GatewaySender> senders = cache.getGatewaySenders(); + GatewaySender sender = null; + for (GatewaySender s : senders) { + if (s.getId().equals(senderId)) { + sender = s; + break; + } + } + final GatewaySenderStats statistics = ((AbstractGatewaySender)sender) + .getStatistics(); + assertEquals(batchesDistributed, (statistics.getBatchesDistributed() > 0)); + assertEquals(batchesRedistributed, + (statistics.getBatchesRedistributed() > 0)); + } + + public static void checkUnProcessedStats(String senderId, int events) { + Set<GatewaySender> senders = cache.getGatewaySenders(); + GatewaySender sender = null; + for (GatewaySender s : senders) { + if (s.getId().equals(senderId)) { + sender = s; + break; + } + } + final GatewaySenderStats statistics = ((AbstractGatewaySender)sender) + .getStatistics(); + assertEquals(events, + (statistics.getUnprocessedEventsAddedBySecondary() + statistics + .getUnprocessedTokensRemovedBySecondary())); + assertEquals(events, + (statistics.getUnprocessedEventsRemovedByPrimary() + statistics + .getUnprocessedTokensAddedByPrimary())); + } + + public static void waitForSenderRunningState(String senderId){ + final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); + try { + Set<GatewaySender> senders = cache.getGatewaySenders(); + final GatewaySender sender = getGatewaySenderById(senders, senderId); + Awaitility.await().atMost(300,TimeUnit.SECONDS).until(() -> assertEquals("Expected sender isRunning state to " + + "be true but is false", true, (sender != null && sender.isRunning()))); + } finally { + exln.remove(); + } + } + + public static void waitForSenderToBecomePrimary(String senderId){ + Set<GatewaySender> senders = ((GemFireCacheImpl)cache).getAllGatewaySenders(); + final GatewaySender sender = getGatewaySenderById(senders, senderId); + Awaitility.await().atMost(10,TimeUnit.SECONDS).until(() -> assertEquals("Expected sender primary state to " + + "be true but is false", true, (sender != null && ((AbstractGatewaySender)sender).isPrimary()))); + } + + private static GatewaySender getGatewaySenderById(Set<GatewaySender> senders, String senderId) { + for(GatewaySender s : senders){ + if(s.getId().equals(senderId)){ + return s; + } + } + //if none of the senders matches with the supplied senderid, return null + return null; + } + + public static HashMap checkQueue(){ + HashMap listenerAttrs = new HashMap(); + listenerAttrs.put("Create", listener1.createList); + listenerAttrs.put("Update", listener1.updateList); + listenerAttrs.put("Destroy", listener1.destroyList); + return listenerAttrs; + } + + public static void checkQueueOnSecondary (final Map primaryUpdatesMap){ + final HashMap secondaryUpdatesMap = new HashMap(); + secondaryUpdatesMap.put("Create", listener1.createList); + secondaryUpdatesMap.put("Update", listener1.updateList); + secondaryUpdatesMap.put("Destroy", listener1.destroyList); + + Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> { + secondaryUpdatesMap.put("Create", listener1.createList); + secondaryUpdatesMap.put("Update", listener1.updateList); + secondaryUpdatesMap.put("Destroy", listener1.destroyList); + assertEquals("Expected secondary map to be " + primaryUpdatesMap + " but it is " + secondaryUpdatesMap, + true,secondaryUpdatesMap.equals(primaryUpdatesMap)); + }); + } + + public static HashMap checkQueue2(){ + HashMap listenerAttrs = new HashMap(); + listenerAttrs.put("Create", listener2.createList); + listenerAttrs.put("Update", listener2.updateList); + listenerAttrs.put("Destroy", listener2.destroyList); + return listenerAttrs; + } + + public static HashMap checkPR(String regionName){ + PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName); + QueueListener listener = (QueueListener)region.getCacheListener(); + + HashMap listenerAttrs = new HashMap(); + listenerAttrs.put("Create", listener.createList); + listenerAttrs.put("Update", listener.updateList); + listenerAttrs.put("Destroy", listener.destroyList); + return listenerAttrs; + } + + public static HashMap checkBR(String regionName, int numBuckets){ + PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName); + HashMap listenerAttrs = new HashMap(); + for (int i = 0; i < numBuckets; i++) { + BucketRegion br = region.getBucketRegion(i); + QueueListener listener = (QueueListener)br.getCacheListener(); + listenerAttrs.put("Create"+i, listener.createList); + listenerAttrs.put("Update"+i, listener.updateList); + listenerAttrs.put("Destroy"+i, listener.destroyList); + } + return listenerAttrs; + } + + public static HashMap checkQueue_BR(String senderId, int numBuckets){ + Set<GatewaySender> senders = cache.getGatewaySenders(); + GatewaySender sender = null; + for(GatewaySender s : senders){ + if(s.getId().equals(senderId)){ + sender = s; + break; + } + } + RegionQueue parallelQueue = ((AbstractGatewaySender)sender) + .getQueues().toArray(new RegionQueue[1])[0]; + + PartitionedRegion region = (PartitionedRegion)parallelQueue.getRegion(); + HashMap listenerAttrs = new HashMap(); + for (int i = 0; i < numBuckets; i++) { + BucketRegion br = region.getBucketRegion(i); + if (br != null) { + QueueListener listener = (QueueListener)br.getCacheListener(); + if (listener != null) { + listenerAttrs.put("Create"+i, listener.createList); + listenerAttrs.put("Update"+i, listener.updateList); + listenerAttrs.put("Destroy"+i, listener.destroyList); + } + } + } + return listenerAttrs; + } + + public static void addListenerOnBucketRegion(String regionName, int numBuckets) { + WANTestBase test = new WANTestBase(); + test.addCacheListenerOnBucketRegion(regionName, numBuckets); + } + + private void addCacheListenerOnBucketRegion(String regionName, int numBuckets){ + PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName); + for (int i = 0; i < numBuckets; i++) { + BucketRegion br = region.getBucketRegion(i); + AttributesMutator mutator = br.getAttributesMutator(); + listener1 = new QueueListener(); + mutator.addCacheListener(listener1); + } + } + + public static void addListenerOnQueueBucketRegion(String senderId, int numBuckets) { + WANTestBase test = new WANTestBase(); + test.addCacheListenerOnQueueBucketRegion(senderId, numBuckets); + } + + private void addCacheListenerOnQueueBucketRegion(String senderId, int numBuckets){ + Set<GatewaySender> senders = cache.getGatewaySenders(); + GatewaySender sender = null; + for(GatewaySender s : senders){ + if(s.getId().equals(senderId)){ + sender = s; + break; + } + } + RegionQueue parallelQueue = ((AbstractGatewaySender)sender) + .getQueues().toArray(new RegionQueue[1])[0]; + + PartitionedRegion region = (PartitionedRegion)parallelQueue.getRegion(); + for (int i = 0; i < numBuckets; i++) { + BucketRegion br = region.getBucketRegion(i); + if (br != null) { + AttributesMutator mutator = br.getAttributesMutator(); + CacheListener listener = new QueueListener(); + mutator.addCacheListener(listener); + } + } + + } + + public static void addQueueListener(String senderId, boolean isParallel){ + WANTestBase test = new WANTestBase(); + test.addCacheQueueListener(senderId, isParallel); + } + + public static void addSecondQueueListener(String senderId, boolean isParallel){ + WANTestBase test = new WANTestBase(); + test.addSecondCacheQueueListener(senderId, isParallel); + } + + public static void addListenerOnRegion(String regionName){ + WANTestBase test = new WANTestBase(); + test.addCacheListenerOnRegion(regionName); + } + private void addCacheListenerOnRegion(String regionName){ + Region region = cache.getRegion(regionName); + AttributesMutator mutator = region.getAttributesMutator(); + listener1 = new QueueListener(); + mutator.addCacheListener(listener1); + } + + private void addCacheQueueListener(String senderId, boolean isParallel) { + Set<GatewaySender> senders = cache.getGatewaySenders(); + GatewaySender sender = null; + for(GatewaySender s : senders){ + if(s.getId().equals(senderId)){ + sender = s; + break; + } + } + listener1 = new QueueListener(); + if (!isParallel) { + Set<RegionQueue> queues = ((AbstractGatewaySender)sender).getQueues(); + for(RegionQueue q: queues) { + q.addCacheListener(listener1); + } + } + else { + RegionQueue parallelQueue = ((AbstractGatewaySender)sender) + .getQueues().toArray(new RegionQueue[1])[0]; + parallelQueue.addCacheListener(listener1); + } + } + + private void addSecondCacheQueueListener(String senderId, boolean isParallel) { + Set<GatewaySender> senders = cache.getGatewaySenders(); + GatewaySender sender = null; + for (GatewaySender s : senders) { + if (s.getId().equals(senderId)) { + sender = s; + break; + } + } + listener2 = new QueueListener(); + if (!isParallel) { + Set<RegionQueue> queues = ((AbstractGatewaySender)sender).getQueues(); + for(RegionQueue q: queues) { + q.addCacheListener(listener2); + } + } + else { + RegionQueue parallelQueue = ((AbstractGatewaySender)sender) + .getQueues().toArray(new RegionQueue[1])[0]; + parallelQueue.addCacheListener(listener2); + } + } + + public static void pauseSender(String senderId) { + final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); + IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class + .getName()); + try { + Set<GatewaySender> senders = cache.getGatewaySenders(); + GatewaySender sender = null; + for (GatewaySender s : senders) { + if (s.getId().equals(senderId)) { + sender = s; + break; + } + } + sender.pause(); + ((AbstractGatewaySender) sender).getEventProcessor().waitForDispatcherToPause(); + + } finally { + exp.remove(); + exln.remove(); + } + } + + public static void resumeSender(String senderId) { + final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); + IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class + .getName()); + try { + Set<GatewaySender> senders = cache.getGatewaySenders(); + GatewaySender sender = null; + for (GatewaySender s : senders) { + if (s.getId().equals(senderId)) { + sender = s; + break; + } + } + sender.resume(); + } finally { + exp.remove(); + exln.remove(); + } + } + + public static void stopSender(String senderId) { + final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); + IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class + .getName()); + try { + Set<GatewaySender> senders = cache.getGatewaySenders(); + GatewaySender sender = null; + for (GatewaySender s : senders) { + if (s.getId().equals(senderId)) { + sender = s; + break; + } + } + AbstractGatewaySenderEventProcessor eventProcessor = null; + if (sender instanceof AbstractGatewaySender) { + eventProcessor = ((AbstractGatewaySender) sender).getEventProcessor(); + } + sender.stop(); + + Set<RegionQueue> queues = null; + if (eventProcessor instanceof ConcurrentSerialGatewaySenderEventProcessor) { + queues = ((ConcurrentSerialGatewaySenderEventProcessor)eventProcessor).getQueues(); + for (RegionQueue queue: queues) { + if (queue instanceof SerialGatewaySenderQueue) { + assertFalse(((SerialGatewaySenderQueue) queue).isRemovalThreadAlive()); + } + } + } + } finally { + exp.remove(); + exln.remove(); + } + } + + public static void stopReceivers() { + Set<GatewayReceiver> receivers = cache.getGatewayReceivers(); + for (GatewayReceiver receiver : receivers) { + receiver.stop(); + } + } + + public static void startReceivers() { + Set<GatewayReceiver> receivers = cache.getGatewayReceivers(); + for (GatewayReceiver receiver : receivers) { + try { + receiver.start(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + public static GatewaySenderFactory configureGateway(DiskStoreFactory dsf, File[] dirs1, String dsName, + boolean isParallel, Integer maxMemory, + Integer batchSize, boolean isConflation, boolean isPersistent, + GatewayEventFilter filter, boolean isManualStart, int numDispatchers, OrderPolicy policy) { + + InternalGatewaySenderFactory gateway = (InternalGatewaySenderFactory)cache.createGatewaySenderFactory(); + gateway.setParallel(isParallel); + gateway.setMaximumQueueMemory(maxMemory); + gateway.setBatchSize(batchSize); + gateway.setBatchConflationEnabled(isConflation); + gateway.setManualStart(isManualStart); + gateway.setDispatcherThreads(numDispatchers); + gateway.setOrderPolicy(policy); + gateway.setLocatorDiscoveryCallback(new MyLocatorCallback()); + if (filter != null) { + eventFilter = filter; + gateway.addGatewayEventFilter(filter); + } + if (isPersistent) { + gateway.setPersistenceEnabled(true); + gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName) + .getName()); + } else { + DiskStore store = dsf.setDiskDirs(dirs1).create(dsName); + gateway.setDiskStoreName(store.getName()); + } + return gateway; + } + + public static void createSender(String dsName, int remoteDsId, + boolean isParallel, Integer maxMemory, + Integer batchSize, boolean isConflation, boolean isPersistent, + GatewayEventFilter filter, boolean isManualStart) { + final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); + try { + File persistentDirectory = new File(dsName + "_disk_" + + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); + persistentDirectory.mkdir(); + DiskStoreFactory dsf = cache.createDiskStoreFactory(); + File[] dirs1 = new File[] { persistentDirectory }; + GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName, isParallel, maxMemory, batchSize, isConflation, isPersistent, filter, isManualStart, numDispatcherThreadsForTheRun, GatewaySender.DEFAULT_ORDER_POLICY); + gateway.create(dsName, remoteDsId); + + } finally { + exln.remove(); + } + } + + public static void createSenderWithMultipleDispatchers(String dsName, int remoteDsId, + boolean isParallel, Integer maxMemory, + Integer batchSize, boolean isConflation, boolean isPersistent, + GatewayEventFilter filter, boolean isManualStart, int numDispatchers, OrderPolicy orderPolicy) { + final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); + try { + File persistentDirectory = new File(dsName + "_disk_" + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); + persistentDirectory.mkdir(); + DiskStoreFactory dsf = cache.createDiskStoreFactory(); + File[] dirs1 = new File[] { persistentDirectory }; + GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName,isParallel, maxMemory, batchSize, isConflation, isPersistent, filter, + isManualStart, numDispatchers, orderPolicy); + gateway.create(dsName, remoteDsId); + + } finally { + exln.remove(); + } + } + + public static void createSenderWithoutDiskStore(String dsName, int remoteDsId, Integer maxMemory, + Integer batchSize, boolean isConflation, boolean isManualStart) { + + GatewaySenderFactory gateway = cache.createGatewaySenderFactory(); + gateway.setParallel(true); + gateway.setMaximumQueueMemory(maxMemory); + gateway.setBatchSize(batchSize); + gateway.setManualStart(isManualStart); + //set dispatcher threads + gateway.setDispatcherThreads(numDispatcherThreadsForTheRun); + gateway.setBatchConflationEnabled(isConflation); + gateway.create(dsName, remoteDsId); + } + + public static void createConcurrentSender(String dsName, int remoteDsId, + boolean isParallel, Integer maxMemory, Integer batchSize, + boolean isConflation, boolean isPersistent, GatewayEventFilter filter, + boolean isManualStart, int concurrencyLevel, OrderPolicy policy) { + + File persistentDirectory = new File(dsName + "_disk_" + + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); + persistentDirectory.mkdir(); + DiskStoreFactory dsf = cache.createDiskStoreFactory(); + File[] dirs1 = new File[] { persistentDirectory }; + GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName, isParallel, maxMemory, batchSize, isConflation, isPersistent, filter, isManualStart, concurrencyLevel, policy); + gateway.create(dsName, remoteDsId); + } + + public static void createSenderForValidations(String dsName, int remoteDsId, + boolean isParallel, Integer alertThreshold, + boolean isConflation, boolean isPersistent, + List<GatewayEventFilter> eventFilters, + List<GatewayTransportFilter> transportFilters, boolean isManualStart, + boolean isDiskSync) { + IgnoredException exp1 = IgnoredException.addIgnoredException(RegionDestroyedException.class + .getName()); + try { + File persistentDirectory = new File(dsName + "_disk_" + + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); + persistentDirectory.mkdir(); + DiskStoreFactory dsf = cache.createDiskStoreFactory(); + File[] dirs1 = new File[] { persistentDirectory }; + + if (isParallel) { + GatewaySenderFactory gateway = cache.createGatewaySenderFactory(); + gateway.setParallel(true); + gateway.setAlertThreshold(alertThreshold); + ((InternalGatewaySenderFactory)gateway) + .setLocatorDiscoveryCallback(new MyLocatorCallback()); + if (eventFilters != null) { + for (GatewayEventFilter filter : eventFilters) { + gateway.addGatewayEventFilter(filter); + } + } + if (transportFilters != null) { + for (GatewayTransportFilter filter : transportFilters) { + gateway.addGatewayTransportFilter(filter); + } + } + if (isPersistent) { + gateway.setPersistenceEnabled(true); + gateway.setDiskStoreName(dsf.setDiskDirs(dirs1) + .create(dsName + "_Parallel").getName()); + } + else { + DiskStore store = dsf.setDiskDirs(dirs1).create(dsName + "_Parallel"); + gateway.setDiskStoreName(store.getName()); + } + gateway.setDiskSynchronous(isDiskSync); + gateway.setBatchConflationEnabled(isConflation); + gateway.setManualStart(isManualStart); + //set dispatcher threads + gateway.setDispatcherThreads(numDispatcherThreadsForTheRun); + gateway.create(dsName, remoteDsId); + + } + else { + GatewaySenderFactory gateway = cache.createGatewaySenderFactory(); + gateway.setAlertThreshold(alertThreshold); + gateway.setManualStart(isManualStart); + //set dispatcher threads + gateway.setDispatcherThreads(numDispatcherThreadsForTheRun); + ((InternalGatewaySenderFactory)gateway) + .setLocatorDiscoveryCallback(new MyLocatorCallback()); + if (eventFilters != null) { + for (GatewayEventFilter filter : eventFilters) { + gateway.addGatewayEventFilter(filter); + } + } + if (transportFilters != null) { + for (GatewayTransportFilter filter : transportFilters) { + gateway.addGatewayTransportFilter(filter); + } + } + gateway.setBatchConflationEnabled(isConflation); + if (isPersistent) { + gateway.setPersistenceEnabled(true); + gateway.setDiskStoreName(dsf.setDiskDirs(dirs1) + .create(dsName + "_Serial").getName()); + } + else { + DiskStore store = dsf.setDiskDirs(dirs1).create(dsName + "_Serial"); + gateway.setDiskStoreName(store.getName()); + } + gateway.setDiskSynchronous(isDiskSync); + gateway.create(dsName, remoteDsId); + } + } finally { + exp1.remove(); + } + } + + public static String createSenderWithDiskStore(String dsName, int remoteDsId, + boolean isParallel, Integer maxMemory, + Integer batchSize, boolean isConflation, boolean isPersistent, + GatewayEventFilter filter, String dsStore, boolean isManualStart) { + File persistentDirectory = null; + if (dsStore == null) { + persistentDirectory = new File(dsName + "_disk_" + + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); + } + else { + persistentDirectory = new File(dsStore); + } + LogWriterUtils.getLogWriter().info("The ds is : " + persistentDirectory.getName()); + + persistentDirectory.mkdir(); + DiskStoreFactory dsf = cache.createDiskStoreFactory(); + File [] dirs1 = new File[] {persistentDirectory}; + + if(isParallel) { + GatewaySenderFactory gateway = cache.createGatewaySenderFactory(); + gateway.setParallel(true); + gateway.setMaximumQueueMemory(maxMemory); + gateway.setBatchSize(batchSize); + gateway.setManualStart(isManualStart); + //set dispatcher threads + gateway.setDispatcherThreads(numDispatcherThreadsForTheRun); + ((InternalGatewaySenderFactory)gateway).setLocatorDiscoveryCallback(new MyLocatorCallback()); + if (filter != null) { + gateway.addGatewayEventFilter(filter); + } + if(isPersistent) { + gateway.setPersistenceEnabled(true); + String dsname = dsf.setDiskDirs(dirs1).create(dsName).getName(); + gateway.setDiskStoreName(dsname); + LogWriterUtils.getLogWriter().info("The DiskStoreName is : " + dsname); + } + else { + DiskStore store = dsf.setDiskDirs(dirs1).create(dsName); + gateway.setDiskStoreName(store.getName()); + LogWriterUtils.getLogWriter().info("The ds is : " + store.getName()); + } + gateway.setBatchConflationEnabled(isConflation); + gateway.create(dsName, remoteDsId); + + }else { + GatewaySenderFactory gateway = cache.createGatewaySenderFactory(); + gateway.setMaximumQueueMemory(maxMemory); + gateway.setBatchSize(batchSize); + gateway.setManualStart(isManualStart); + //set dispatcher threads + gateway.setDispatcherThreads(numDispatcherThreadsForTheRun); + ((InternalGatewaySenderFactory)gateway).setLocatorDiscoveryCallback(new MyLocatorCallback()); + if (filter != null) { + gateway.addGatewayEventFilter(filter); + } + gateway.setBatchConflationEnabled(isConflation); + if(isPersistent) { + gateway.setPersistenceEnabled(true); + gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName()); + } + else { + DiskStore store = dsf.setDiskDirs(dirs1).create(dsName); + + gateway.setDiskStoreName(store.getName()); + } + gateway.create(dsName, remoteDsId); + } + return persistentDirectory.getName(); + } + + + public static void createSenderWithListener(String dsName, int remoteDsName, + boolean isParallel, Integer maxMemory, + Integer batchSize, boolean isConflation, boolean isPersistent, + GatewayEventFilter filter, boolean attachTwoListeners, boolean isManualStart) { + File persistentDirectory = new File(dsName + "_disk_" + + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); + persistentDirectory.mkdir(); + DiskStoreFactory dsf = cache.createDiskStoreFactory(); + File[] dirs1 = new File[] { persistentDirectory }; + + if (isParallel) { + GatewaySenderFactory gateway = cache + .createGatewaySenderFactory(); + gateway.setParallel(true); + gateway.setMaximumQueueMemory(maxMemory); + gateway.setBatchSize(batchSize); + gateway.setManualStart(isManualStart); + //set dispatcher threads + gateway.setDispatcherThreads(numDispatcherThreadsForTheRun); + ((InternalGatewaySenderFactory)gateway) + .setLocatorDiscoveryCallback(new MyLocatorCallback()); + if (filter != null) { + gateway.addGatewayEventFilter(filter); + } + if (isPersistent) { + gateway.setPersistenceEnabled(true); + gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName) + .getName()); + } else { + DiskStore store = dsf.setDiskDirs(dirs1).create(dsName); + gateway.setDiskStoreName(store.getName()); + } + gateway.setBatchConflationEnabled(isConflation); + gateway.create(dsName, remoteDsName); + + } else { + GatewaySenderFactory gateway = cache + .createGatewaySenderFactory(); + gateway.setMaximumQueueMemory(maxMemory); + gateway.setBatchSize(batchSize); + gateway.setManualStart(isManualStart); + //set dispatcher threads + gateway.setDispatcherThreads(numDispatcherThreadsForTheRun); + ((InternalGatewaySenderFactory)gateway) + .setLocatorDiscoveryCallback(new MyLocatorCallback()); + if (filter != null) { + gateway.addGatewayEventFilter(filter); + } + gateway.setBatchConflationEnabled(isConflation); + if (isPersistent) { + gateway.setPersistenceEnabled(true); + gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName) + .getName()); + } else { + DiskStore store = dsf.setDiskDirs(dirs1).create(dsName); + gateway.setDiskStoreName(store.getName()); + } + + eventListener1 = new MyGatewaySenderEventListener(); + ((InternalGatewaySenderFactory)gateway).addAsyncEventListener(eventListener1); + if (attachTwoListeners) { + eventListener2 = new MyGatewaySenderEventListener2(); + ((InternalGatewaySenderFactory)gateway).addAsyncEventListener(eventListener2); + } + ((InternalGatewaySenderFactory)gateway).create(dsName); + } + } + + public static void createReceiverInVMs(VM... vms) { + for (VM vm : vms) { + vm.invoke(() -> createReceiver()); + } + } + + public static int createReceiver() { + GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); + int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); + fact.setStartPort(port); + fact.setEndPort(port); + fact.setManualStart(true); + GatewayReceiver receiver = fact.create(); + try { + receiver.start(); + } + catch (IOException e) { + e.printStackTrace(); + Assert.fail("Test " + getTestMethodName() + + " failed to start GatewayReceiver on port " + port, e); + } + return port; + } + + public static void createReceiverWithBindAddress(int locPort) { + WANTestBase test = new WANTestBase(); + Properties props = test.getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(LOG_LEVEL, LogWriterUtils.getDUnitLogLevel()); + props.setProperty(LOCATORS, "localhost[" + locPort + + "]"); + + InternalDistributedSystem ds = test.getSystem(props); + cache = CacheFactory.create(ds); + GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); + int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); + fact.setStartPort(port); + fact.setEndPort(port); + fact.setManualStart(true); + fact.setBindAddress("200.112.204.10"); + GatewayReceiver receiver = fact.create(); + try { + receiver.start(); + fail("Expected GatewayReceiver Exception"); + } + catch (GatewayReceiverException gRE){ + LogWriterUtils.getLogWriter().fine("Got the GatewayReceiverException", gRE); + assertTrue(gRE.getMessage().contains("Failed to create server socket on")); + } + catch (IOException e) { + e.printStackTrace(); + fail("Test " + test.getName() + + " failed to start GatewayReceiver on port " + port); + } + } + public static int createReceiverWithSSL(int locPort) { + WANTestBase test = new WANTestBase(); + + Properties gemFireProps = test.getDistributedSystemProperties(); + + gemFireProps.put(LOG_LEVEL, LogWriterUtils.getDUnitLogLevel()); + gemFireProps.put(GATEWAY_SSL_ENABLED, "true"); + gemFireProps.put(GATEWAY_SSL_PROTOCOLS, "any"); + gemFireProps.put(GATEWAY_SSL_CIPHERS, "any"); + gemFireProps.put(GATEWAY_SSL_REQUIRE_AUTHENTICATION, "true"); + + gemFireProps.put(GATEWAY_SSL_KEYSTORE_TYPE, "jks"); + gemFireProps.put(GATEWAY_SSL_KEYSTORE, + TestUtil.getResourcePath(WANTestBase.class, "/com/gemstone/gemfire/cache/client/internal/cacheserver.keystore")); + gemFireProps.put(GATEWAY_SSL_KEYSTORE_PASSWORD, "password"); + gemFireProps.put(GATEWAY_SSL_TRUSTSTORE, + TestUtil.getResourcePath(WANTestBase.class, "/com/gemstone/gemfire/cache/client/internal/cacheserver.truststore")); + gemFireProps.put(GATEWAY_SSL_TRUSTSTORE_PASSWORD, "password"); + + gemFireProps.setProperty(MCAST_PORT, "0"); + gemFireProps.setProperty(LOCATORS, "localhost[" + locPort + "]"); + + LogWriterUtils.getLogWriter().info("Starting cache ds with following properties \n" + gemFireProps); + + InternalDistributedSystem ds = test.getSystem(gemFireProps); + cache = CacheFactory.create(ds); + GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); + int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); + fact.setStartPort(port); + fact.setEndPort(port); + fact.setManualStart(true); + GatewayReceiver receiver = fact.create(); + try { + receiver.start(); + } + catch (IOException e) { + e.printStackTrace(); + fail("Test " + test.getName() + + " failed to start GatewayReceiver on port " + port); + } + return port; + } + + public static void createReceiverAndServer(int locPort) { + WANTestBase test = new WANTestBase(); + Properties props = test.getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(LOCATORS, "localhost[" + locPort + + "]"); + + InternalDistributedSystem ds = test.getSystem(props); + cache = CacheFactory.create(ds); + GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); + int receiverPort = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); + fact.setStartPort(receiverPort); + fact.setEndPort(receiverPort); + fact.setManualStart(true); + GatewayReceiver receiver = fact.create(); + try { + receiver.start(); + } + catch (IOException e) { + e.printStackTrace(); + fail("Test " + test.getName() + + " failed to start GatewayReceiver on port " + receiverPort); + } + CacheServer server = cache.addCacheServer(); + int serverPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); + server.setPort(serverPort); + server.setHostnameForClients("localhost"); + try { + server.start(); + } catch (IOException e) { + com.gemstone.gemfire.test.dunit.Assert.fail("Failed to start server ", e); + } + } + + public static int createServer(int locPort) { + WANTestBase test = new WANTestBase(); + Properties props = test.getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(LOCATORS, "localhost[" + locPort + + "]"); + InternalDistributedSystem ds = test.getSystem(props); + cache = CacheFactory.create(ds); + + CacheServer server = cache.addCacheServer(); + int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); + server.setPort(port); + server.setHostnameForClients("localhost"); + try { + server.start(); + } catch (IOException e) { + com.gemstone.gemfire.test.dunit.Assert.fail("Failed to start server ", e); + } + return port; + } + + public static void createClientWithLocator(int port0,String host, + String regionName) { + WANTestBase test = new WANTestBase(); + Properties props = test.getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(LOCATORS, ""); + + InternalDistributedSystem ds = test.getSystem(props); + cache = CacheFactory.create(ds); + + assertNotNull(cache); + CacheServerTestUtil.disableShufflingOfEndpoints(); + Pool p; + try { + p = PoolManager.createFactory().addLocator(host, port0) + .setPingInterval(250).setSubscriptionEnabled(true) + .setSubscriptionRedundancy(-1).setReadTimeout(2000) + .setSocketBufferSize(1000).setMinConnections(6).setMaxConnections(10) + .setRetryAttempts(3).create(regionName); + } finally { + CacheServerTestUtil.enableShufflingOfEndpoints(); + } + + AttributesFactory factory = new AttributesFactory(); + factory.setPoolName(p.getName()); + factory.setDataPolicy(DataPolicy.NORMAL); + RegionAttributes attrs = factory.create(); + region = cache.createRegion(regionName, attrs); + region.registerInterest("ALL_KEYS"); + assertNotNull(region); + LogWriterUtils.getLogWriter().info( +
<TRUNCATED>
