Repository: incubator-geode Updated Branches: refs/heads/feature/GEODE-37_2 3897a76ea -> 7c20e6931
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/management/internal/configuration/ClusterConfigurationDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/management/internal/configuration/ClusterConfigurationDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/management/internal/configuration/ClusterConfigurationDUnitTest.java new file mode 100644 index 0000000..b1e070d --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/management/internal/configuration/ClusterConfigurationDUnitTest.java @@ -0,0 +1,1013 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.gemstone.gemfire.management.internal.configuration; + +import com.gemstone.gemfire.cache.Cache; +import com.gemstone.gemfire.cache.CacheFactory; +import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.cache.RegionShortcut; +import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue; +import com.gemstone.gemfire.cache.query.Index; +import com.gemstone.gemfire.cache.wan.GatewayReceiver; +import com.gemstone.gemfire.cache.wan.GatewaySender; +import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy; +import com.gemstone.gemfire.distributed.Locator; +import com.gemstone.gemfire.distributed.internal.InternalLocator; +import com.gemstone.gemfire.internal.ClassBuilder; +import com.gemstone.gemfire.internal.JarClassLoader; +import com.gemstone.gemfire.internal.JarDeployer; +import com.gemstone.gemfire.internal.admin.remote.ShutdownAllRequest; +import com.gemstone.gemfire.internal.cache.DiskStoreImpl; +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; +import com.gemstone.gemfire.internal.cache.extension.Extensible; +import com.gemstone.gemfire.internal.cache.extension.Extension; +import com.gemstone.gemfire.internal.cache.extension.mock.MockCacheExtension; +import com.gemstone.gemfire.internal.cache.extension.mock.MockExtensionCommands; +import com.gemstone.gemfire.internal.cache.extension.mock.MockRegionExtension; +import com.gemstone.gemfire.internal.cache.xmlcache.XmlGenerator; +import com.gemstone.gemfire.internal.cache.xmlcache.XmlParser; +import com.gemstone.gemfire.management.cli.Result.Status; +import com.gemstone.gemfire.management.internal.cli.HeadlessGfsh; +import com.gemstone.gemfire.management.internal.cli.commands.CliCommandTestBase; +import com.gemstone.gemfire.management.internal.cli.i18n.CliStrings; +import com.gemstone.gemfire.management.internal.cli.result.CommandResult; +import com.gemstone.gemfire.management.internal.cli.util.CommandStringBuilder; +import 
com.gemstone.gemfire.management.internal.configuration.domain.XmlEntity; +import com.gemstone.gemfire.test.dunit.SerializableCallable; +import com.gemstone.gemfire.test.dunit.VM; +import com.gemstone.gemfire.test.dunit.WaitCriterion; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.*; +import java.util.Map.Entry; + +import static com.gemstone.gemfire.distributed.ConfigurationProperties.*; +import static com.gemstone.gemfire.internal.AvailablePortHelper.getRandomAvailableTCPPorts; +import static com.gemstone.gemfire.internal.FileUtil.delete; +import static com.gemstone.gemfire.internal.FileUtil.deleteMatching; +import static com.gemstone.gemfire.internal.lang.StringUtils.isBlank; +import static com.gemstone.gemfire.management.internal.cli.CliUtil.getAllNormalMembers; +import static com.gemstone.gemfire.test.dunit.Assert.*; +import static com.gemstone.gemfire.test.dunit.Host.getHost; +import static com.gemstone.gemfire.test.dunit.IgnoredException.addIgnoredException; +import static com.gemstone.gemfire.test.dunit.Wait.waitForCriterion; +import static org.apache.commons.io.FileUtils.*; + +@Category(DistributedTest.class) +public class ClusterConfigurationDUnitTest extends CliCommandTestBase { + + private static final int TIMEOUT = 10000; + private static final int INTERVAL = 500; + + private static final String REPLICATE_REGION = "ReplicateRegion1"; + private static final String PARTITION_REGION = "PartitionRegion1"; + private static final String DISK_REGION1 = "DR1"; + private static final String INDEX1 = "ID1"; + private static final String INDEX2 = "ID2"; + private static final String GROUP1 = "G1"; + private static final String GROUP2 = "G2"; + private static final String JAR1 = "D1.jar"; + private static 
final String JAR2 = "D2.jar"; + private static final String JAR3 = "D3.jar"; + private static final String AsyncEventQueue1 = "Q1"; + + private static final String dataMember = "DataMember"; + private static final String newMember = "NewMember"; + + private static Set<String> serverNames = new HashSet<>(); + private static Set<String> jarFileNames = new HashSet<>(); + + private transient ClassBuilder classBuilder = new ClassBuilder(); + + @Override + public final void postSetUpCliCommandTestBase() throws Exception { + disconnectAllFromDS(); + } + + @Override + public final void preTearDownCliCommandTestBase() throws Exception { + shutdownAll(); + + serverNames.clear(); + jarFileNames.clear(); + } + + @Test + public void testConfigDistribution() throws Exception { + addIgnoredException("could not get remote locator"); + addIgnoredException("EntryDestroyedException"); + + String workingDir = this.temporaryFolder.getRoot().getCanonicalPath() + File.separator; + + Object[] result = setup(); + final int locatorPort = (Integer) result[0]; + final String jmxHost = (String) result[1]; + final int jmxPort = (Integer) result[2]; + final int httpPort = (Integer) result[3]; + final String locatorString = "localHost[" + locatorPort + "]"; + + String gatewayReceiverStartPort = "10000"; + String gatewayReceiverEndPort = "20000"; + final String gsId = "GatewaySender1"; + final String batchSize = "1000"; + final String dispatcherThreads = "5"; + final String enableConflation = "false"; + final String manualStart = "false"; + final String receiverManualStart = "true"; + final String alertThreshold = "1000"; + final String batchTimeInterval = "20"; + final String maxQueueMemory = "100"; + final String orderPolicy = OrderPolicy.KEY.toString(); + final String parallel = "true"; + final String rmDsId = "250"; + final String socketBufferSize = String.valueOf(GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT + 1000); + final String socketReadTimeout = 
String.valueOf(GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT+200); + final String DESTROY_REGION = "regionToBeDestroyed"; + + createRegion(REPLICATE_REGION, RegionShortcut.REPLICATE, null); + createRegion(PARTITION_REGION, RegionShortcut.PARTITION, null); + createRegion(DESTROY_REGION, RegionShortcut.REPLICATE, null); + createIndex(INDEX1, "AAPL", REPLICATE_REGION, null); + createIndex(INDEX2, "VMW", PARTITION_REGION, null); + + createAndDeployJar(workingDir + JAR1, null); + createAndDeployJar(workingDir + JAR2, null); + createAndDeployJar(workingDir + JAR3, null); + + createAsyncEventQueue(AsyncEventQueue1, "false", null, "1000", "1000", null); + destroyRegion(DESTROY_REGION); + destroyIndex(INDEX2, PARTITION_REGION, null); + + undeployJar(JAR3, null); + + alterRuntime("true", "", "", ""); + createGatewayReceiver(receiverManualStart, "", gatewayReceiverStartPort, gatewayReceiverEndPort, "20", ""); + createGatewaySender(gsId, batchSize, alertThreshold, batchTimeInterval, dispatcherThreads, enableConflation, manualStart, maxQueueMemory, orderPolicy, parallel, rmDsId, socketBufferSize, socketReadTimeout); + + //alterRegion(PARTITION_REGION, "false", AsyncEventQueue1, "", "", "", "", "", "", gsId); + //Start a new member which receives the shared configuration + //Verify the config creation on this member + + final String newMemberWorkingDir = this.temporaryFolder.getRoot().getCanonicalPath() + File.separator + newMember; + + VM newMember = getHost(0).getVM(2); + newMember.invoke(new SerializableCallable() { + @Override + public Object call() throws IOException { + Properties localProps = new Properties(); + + File workingDir = new File(newMemberWorkingDir); + workingDir.mkdirs(); + + localProps.setProperty(MCAST_PORT, "0"); + localProps.setProperty(LOCATORS, "localhost[" + locatorPort+"]"); + localProps.setProperty(NAME, ClusterConfigurationDUnitTest.newMember); + localProps.setProperty(USE_CLUSTER_CONFIGURATION, "true"); + localProps.setProperty(DEPLOY_WORKING_DIR, 
workingDir.getCanonicalPath()); + + getSystem(localProps); + Cache cache = getCache(); + + assertNotNull(cache); + assertTrue(cache.getCopyOnRead()); + + Region region1 = cache.getRegion(REPLICATE_REGION); + assertNotNull(region1); + Region region2 = cache.getRegion(PARTITION_REGION); + assertNotNull(region2); + + Region region3 = cache.getRegion(DESTROY_REGION); + assertNull(region3); + + //Index verification + Index index1 = cache.getQueryService().getIndex(region1, INDEX1); + assertNotNull(index1); + assertNull(cache.getQueryService().getIndex(region2, INDEX2)); + + final JarDeployer jarDeployer = new JarDeployer(((GemFireCacheImpl) cache).getDistributedSystem().getConfig().getDeployWorkingDir()); + + final List<JarClassLoader> jarClassLoaders = jarDeployer.findJarClassLoaders(); + + Set<String> jarNames = new HashSet<String>(); + + for (JarClassLoader jarClassLoader : jarClassLoaders) { + jarNames.add(jarClassLoader.getJarName()); + } + + assertTrue(jarNames.contains(JAR1)); + assertTrue(jarNames.contains(JAR2)); + assertFalse(jarNames.contains(JAR3)); + + //ASYNC-EVENT-QUEUE verification + AsyncEventQueue aeq = cache.getAsyncEventQueue(AsyncEventQueue1); + assertNotNull(aeq); + assertFalse(aeq.isPersistent()); + assertTrue(aeq.getBatchSize() == 1000); + assertTrue(aeq.getMaximumQueueMemory() == 1000); + + //GatewayReceiver verification + Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers(); + assertFalse(gatewayReceivers.isEmpty()); + assertTrue(gatewayReceivers.size() == 1); + + //Gateway Sender verification + GatewaySender gs = cache.getGatewaySender(gsId); + assertNotNull(gs); + assertTrue(alertThreshold.equals(Integer.toString(gs.getAlertThreshold()))); + assertTrue(batchSize.equals(Integer.toString(gs.getBatchSize()))); + assertTrue(dispatcherThreads.equals(Integer.toString(gs.getDispatcherThreads()))); + assertTrue(enableConflation.equals(Boolean.toString(gs.isBatchConflationEnabled()))); + 
assertTrue(manualStart.equals(Boolean.toString(gs.isManualStart()))); + assertTrue(alertThreshold.equals(Integer.toString(gs.getAlertThreshold()))); + assertTrue(batchTimeInterval.equals(Integer.toString(gs.getBatchTimeInterval()))); + assertTrue(maxQueueMemory.equals(Integer.toString(gs.getMaximumQueueMemory()))); + assertTrue(orderPolicy.equals(gs.getOrderPolicy().toString())); + assertTrue(parallel.equals(Boolean.toString(gs.isParallel()))); + assertTrue(rmDsId.equals(Integer.toString(gs.getRemoteDSId()))); + assertTrue(socketBufferSize.equals(Integer.toString(gs.getSocketBufferSize()))); + assertTrue(socketReadTimeout.equals(Integer.toString(gs.getSocketReadTimeout()))); + + return getAllNormalMembers(cache); + } + }); + } + + /** + * Tests for {@link Extension}, {@link Extensible}, {@link XmlParser}, + * {@link XmlGenerator}, {@link XmlEntity} as it applies to Extensions. + * Asserts that Mock Extension is created and altered on region and cache. + * + * @since GemFire 8.1 + */ + @Test + public void testCreateExtensions() throws Exception { + Object[] result = setup(); + final int locatorPort = (Integer) result[0]; + + createRegion(REPLICATE_REGION, RegionShortcut.REPLICATE, null); + createMockRegionExtension(REPLICATE_REGION, "value1"); + alterMockRegionExtension(REPLICATE_REGION, "value2"); + createMockCacheExtension("value1"); + alterMockCacheExtension("value2"); + + //Start a new member which receives the shared configuration + //Verify the config creation on this member + + final String newMemberWorkDir = this.temporaryFolder.getRoot().getCanonicalPath() + File.separator + newMember; + + VM newMember = getHost(0).getVM(2); + newMember.invoke(new SerializableCallable() { + private static final long serialVersionUID = 1L; + + @Override + public Object call() throws IOException { + Properties localProps = new Properties(); + + File workingDir = new File(newMemberWorkDir); + workingDir.mkdirs(); + + localProps.setProperty(MCAST_PORT, "0"); + 
localProps.setProperty(LOCATORS, "localhost[" + locatorPort+"]"); + localProps.setProperty(NAME, ClusterConfigurationDUnitTest.newMember); + localProps.setProperty(USE_CLUSTER_CONFIGURATION, "true"); + localProps.setProperty(DEPLOY_WORKING_DIR, workingDir.getCanonicalPath()); + + getSystem(localProps); + Cache cache = getCache(); + + assertNotNull(cache); + + Region<?, ?> region1 = cache.getRegion(REPLICATE_REGION); + assertNotNull(region1); + + //MockRegionExtension verification + @SuppressWarnings("unchecked") + // should only be one region extension + final MockRegionExtension mockRegionExtension = (MockRegionExtension) ((Extensible<Region<?,?>>) region1).getExtensionPoint().getExtensions().iterator().next(); + assertNotNull(mockRegionExtension); + assertEquals(1, mockRegionExtension.beforeCreateCounter.get()); + assertEquals(1, mockRegionExtension.onCreateCounter.get()); + assertEquals("value2", mockRegionExtension.getValue()); + + //MockCacheExtension verification + @SuppressWarnings("unchecked") + // should only be one cache extension + final MockCacheExtension mockCacheExtension = (MockCacheExtension) ((Extensible<Cache>) cache).getExtensionPoint().getExtensions().iterator().next(); + assertNotNull(mockCacheExtension); + assertEquals(1, mockCacheExtension.beforeCreateCounter.get()); + assertEquals(1, mockCacheExtension.onCreateCounter.get()); + assertEquals("value2", mockCacheExtension.getValue()); + + return getAllNormalMembers(cache); + } + }); + } + + /** + * Tests for {@link Extension}, {@link Extensible}, {@link XmlParser}, + * {@link XmlGenerator}, {@link XmlEntity} as it applies to Extensions. + * Asserts that Mock Extension is created and destroyed on region and cache. 
+ * + * @since GemFire 8.1 + */ + @Test + public void testDestroyExtensions() throws Exception { + Object[] result = setup(); + final int locatorPort = (Integer) result[0]; + + createRegion(REPLICATE_REGION, RegionShortcut.REPLICATE, null); + createMockRegionExtension(REPLICATE_REGION, "value1"); + destroyMockRegionExtension(REPLICATE_REGION); + createMockCacheExtension("value1"); + destroyMockCacheExtension(); + + //Start a new member which receives the shared configuration + //Verify the config creation on this member + + final String newMemberWorkingDir = this.temporaryFolder.getRoot().getCanonicalPath() + File.separator + newMember; + + VM newMember = getHost(0).getVM(2); + newMember.invoke(new SerializableCallable() { + + @Override + public Object call() throws IOException { + Properties localProps = new Properties(); + + File workingDir = new File(newMemberWorkingDir); + workingDir.mkdirs(); + + localProps.setProperty(MCAST_PORT, "0"); + localProps.setProperty(LOCATORS, "localhost[" + locatorPort+"]"); + localProps.setProperty(NAME, ClusterConfigurationDUnitTest.newMember); + localProps.setProperty(USE_CLUSTER_CONFIGURATION, "true"); + localProps.setProperty(DEPLOY_WORKING_DIR, workingDir.getCanonicalPath()); + + getSystem(localProps); + Cache cache = getCache(); + + assertNotNull(cache); + + Region<?, ?> region1 = cache.getRegion(REPLICATE_REGION); + assertNotNull(region1); + + //MockRegionExtension verification + @SuppressWarnings("unchecked") + final Extensible<Region<?, ?>> extensibleRegion = (Extensible<Region<?,?>>) region1; + // Should not be any region extensions + assertTrue(!extensibleRegion.getExtensionPoint().getExtensions().iterator().hasNext()); + + //MockCacheExtension verification + @SuppressWarnings("unchecked") + final Extensible<Cache> extensibleCache = (Extensible<Cache>) cache; + // Should not be any cache extensions + assertTrue(!extensibleCache.getExtensionPoint().getExtensions().iterator().hasNext()); + + return 
getAllNormalMembers(cache); + } + }); + } + + @Ignore("disabled for unknown reason") // this passes when @Ignore is removed + @Test + public void testCreateDiskStore () throws Exception { + Object[] result = setup(); + final int locatorPort = (Integer) result[0]; + final String jmxHost = (String) result[1]; + final int jmxPort = (Integer) result[2]; + final int httpPort = (Integer) result[3]; + final String locatorString = "localHost[" + locatorPort + "]"; + + final String diskStoreName = "clusterConfigTestDiskStore"; + final String diskDirs = "dir1"; + + //final String + //createPersistentRegion(peersRegion, RegionShortcut.PARTITION_PERSISTENT, "", diskStoreName); + + final String autoCompact = "true"; + final String allowForceCompaction = "true"; + final String compactionThreshold = "50"; + final String duCritical = "90"; + final String duWarning = "85"; + final String maxOplogSize = "1000"; + final String queueSize = "300"; + final String timeInterval = "10"; + final String writeBufferSize="100"; + + createDiskStore(diskStoreName, diskDirs, autoCompact, allowForceCompaction, compactionThreshold, duCritical, duWarning, maxOplogSize, queueSize, timeInterval, writeBufferSize); + + //createAsyncEventQueue(id, persistent, diskStoreName, batchSize, maxQueueMemory, group) + + //Stop the existing data member + VM dataMember = getHost(0).getVM(1); + dataMember.invoke(new SerializableCallable() { + @Override + public Object call() throws IOException { + + CacheFactory cf = new CacheFactory(); + GemFireCacheImpl cache = (GemFireCacheImpl)getCache(); + File[] diskDirs = null; + Collection<DiskStoreImpl> diskStoreList = cache.listDiskStores(); + + assertFalse(diskStoreList.isEmpty()); + assertTrue(diskStoreList.size() == 1); + + for (DiskStoreImpl diskStore : diskStoreList) { + diskDirs = diskStore.getDiskDirs(); + break; + } + + assertNotNull(diskDirs); + assertTrue(diskDirs.length > 0); + + //close the cache + cache.close(); + + //Delete the disk-store files + for (File 
diskDir : diskDirs) { + deleteDirectory(diskDir); + } + return getAllNormalMembers(cache); + } + }); + + final String newMemberWorkingDir = this.temporaryFolder.getRoot().getCanonicalPath() + File.separator + newMember; + + //Now start the new data member and it should create all the disk-store artifacts + VM newMember = getHost(0).getVM(2); + newMember.invoke(new SerializableCallable() { + @Override + public Object call() throws IOException { + Properties localProps = new Properties(); + + File workingDir = new File(newMemberWorkingDir); + workingDir.mkdirs(); + + localProps.setProperty(MCAST_PORT, "0"); + localProps.setProperty(LOCATORS, "localhost[" + locatorPort+"]"); + localProps.setProperty(NAME, ClusterConfigurationDUnitTest.newMember); + localProps.setProperty(USE_CLUSTER_CONFIGURATION, "true"); + localProps.setProperty(DEPLOY_WORKING_DIR, workingDir.getCanonicalPath()); + + getSystem(localProps); + GemFireCacheImpl cache = (GemFireCacheImpl)getCache(); + assertNotNull(cache); + + + Collection<DiskStoreImpl> diskStoreList = cache.listDiskStores(); + + assertFalse(diskStoreList.isEmpty()); + assertTrue(diskStoreList.size() == 1); + + for (DiskStoreImpl diskStore : diskStoreList) { + assertTrue(diskStore.getName().equals(diskStoreName)); + assertTrue(Boolean.toString(diskStore.getAutoCompact()).equals(autoCompact)); + assertTrue(Boolean.toString(diskStore.getAllowForceCompaction()).equals(allowForceCompaction)); + assertTrue(Integer.toString(diskStore.getCompactionThreshold()).equals(compactionThreshold)); + assertTrue(Long.toString(diskStore.getMaxOplogSize()).equals(maxOplogSize)); + assertTrue(Integer.toString(diskStore.getQueueSize()).equals(queueSize)); + assertTrue(Integer.toString(diskStore.getWriteBufferSize()).equals(writeBufferSize)); + assertTrue(Long.toString(diskStore.getTimeInterval()).equals(timeInterval)); + break; + } + cache.close(); + return null; + } + }); + } + + @Ignore("disabled for unknown reason") // this fails in configurePDX when 
@Ignore is removed + @Test + public void testConfigurePDX() throws Exception { + Object[] result = setup(); + final int locatorPort = (Integer) result[0]; + final String jmxHost = (String) result[1]; + final int jmxPort = (Integer) result[2]; + final int httpPort = (Integer) result[3]; + final String locatorString = "localHost[" + locatorPort + "]"; + + configurePDX("com.foo.*", "true", "true", null, "true"); + + VM dataMember = getHost(0).getVM(1); + dataMember.invoke(new SerializableCallable() { + @Override + public Object call() throws IOException { + GemFireCacheImpl cache = (GemFireCacheImpl)getCache(); + assertTrue(cache.getPdxReadSerialized()); + assertTrue(cache.getPdxIgnoreUnreadFields()); + assertTrue(cache.getPdxPersistent()); + return null; + } + }); + } + + @Test + public void testClusterConfigDir() throws Exception { + final int [] ports = getRandomAvailableTCPPorts(3); + final int locator1Port = ports[0]; + final String locator1Name = "locator1-" + locator1Port; + + final String locatorLogPath = this.temporaryFolder.getRoot().getCanonicalPath() + File.separator + "locator-" + locator1Port + ".log"; + final String clusterConfigPath = this.temporaryFolder.getRoot().getCanonicalPath() + File.separator + "userSpecifiedDir"; + + VM locatorAndMgr = getHost(0).getVM(3); + Object[] result = (Object[]) locatorAndMgr.invoke(new SerializableCallable() { + @Override + public Object call() throws IOException { + int httpPort; + int jmxPort; + String jmxHost; + + try { + jmxHost = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException ignore) { + jmxHost = "localhost"; + } + + final int[] ports = getRandomAvailableTCPPorts(2); + + jmxPort = ports[0]; + httpPort = ports[1]; + + final File locatorLogFile = new File(locatorLogPath); + + final Properties locatorProps = new Properties(); + locatorProps.setProperty(NAME, locator1Name); + locatorProps.setProperty(MCAST_PORT, "0"); + locatorProps.setProperty(LOG_LEVEL, "config"); + 
locatorProps.setProperty(ENABLE_CLUSTER_CONFIGURATION, "true"); + locatorProps.setProperty(JMX_MANAGER, "true"); + locatorProps.setProperty(JMX_MANAGER_START, "true"); + locatorProps.setProperty(JMX_MANAGER_BIND_ADDRESS, String.valueOf(jmxHost)); + locatorProps.setProperty(JMX_MANAGER_PORT, String.valueOf(jmxPort)); + + File clusterConfigDir = new File(clusterConfigPath); + assertTrue(clusterConfigDir.mkdir()); + + locatorProps.setProperty(CLUSTER_CONFIGURATION_DIR, clusterConfigDir.getCanonicalPath()); + locatorProps.setProperty(HTTP_SERVICE_PORT, String.valueOf(httpPort)); + + final InternalLocator locator = (InternalLocator) Locator.startLocatorAndDS(locator1Port, locatorLogFile, null, locatorProps); + + WaitCriterion wc = new WaitCriterion() { + @Override + public boolean done() { + return locator.isSharedConfigurationRunning(); + } + @Override + public String description() { + return "Waiting for shared configuration to be started"; + } + }; + waitForCriterion(wc, TIMEOUT, INTERVAL, true); + + assertTrue(clusterConfigDir.list().length > 0); + + final Object[] result = new Object[4]; + result[0] = locator1Port; + result[1] = jmxHost; + result[2] = jmxPort; + result[3] = httpPort; + return result; + } + }); + } + + private Object[] setup() throws IOException { + final int [] ports = getRandomAvailableTCPPorts(3); + final int locator1Port = ports[0]; + final String locator1Name = "locator1-" + locator1Port; + final String locatorLogPath = this.temporaryFolder.getRoot().getCanonicalPath() + File.separator + "locator-" + locator1Port + ".log"; + + VM locatorAndMgr = getHost(0).getVM(3); + Object[] result = (Object[]) locatorAndMgr.invoke(new SerializableCallable() { + @Override + public Object call() throws IOException { + int httpPort; + int jmxPort; + String jmxHost; + + try { + jmxHost = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException ignore) { + jmxHost = "localhost"; + } + + final int[] ports = getRandomAvailableTCPPorts(2); + + 
jmxPort = ports[0]; + httpPort = ports[1]; + + final File locatorLogFile = new File(locatorLogPath); + + final Properties locatorProps = new Properties(); + locatorProps.setProperty(NAME, locator1Name); + locatorProps.setProperty(MCAST_PORT, "0"); + locatorProps.setProperty(LOG_LEVEL, "config"); + locatorProps.setProperty(ENABLE_CLUSTER_CONFIGURATION, "true"); + locatorProps.setProperty(JMX_MANAGER, "true"); + locatorProps.setProperty(JMX_MANAGER_START, "true"); + locatorProps.setProperty(JMX_MANAGER_BIND_ADDRESS, String.valueOf(jmxHost)); + locatorProps.setProperty(JMX_MANAGER_PORT, String.valueOf(jmxPort)); + locatorProps.setProperty(HTTP_SERVICE_PORT, String.valueOf(httpPort)); + + final InternalLocator locator = (InternalLocator) Locator.startLocatorAndDS(locator1Port, locatorLogFile, null, locatorProps); + + WaitCriterion wc = new WaitCriterion() { + @Override + public boolean done() { + return locator.isSharedConfigurationRunning(); + } + @Override + public String description() { + return "Waiting for shared configuration to be started"; + } + }; + waitForCriterion(wc, TIMEOUT, INTERVAL, true); + + final Object[] result = new Object[4]; + result[0] = locator1Port; + result[1] = jmxHost; + result[2] = jmxPort; + result[3] = httpPort; + return result; + } + }); + + HeadlessGfsh gfsh = getDefaultShell(); + String jmxHost = (String)result[1]; + int jmxPort = (Integer)result[2]; + int httpPort = (Integer)result[3]; + + connect(jmxHost, jmxPort, httpPort, gfsh); + + final String dataMemberWorkingDir = this.temporaryFolder.getRoot().getCanonicalPath() + File.separator + dataMember; + + // Create a cache in VM 1 + VM dataMember = getHost(0).getVM(1); + dataMember.invoke(new SerializableCallable() { + @Override + public Object call() throws IOException { + Properties localProps = new Properties(); + File workingDir = new File(dataMemberWorkingDir); + workingDir.mkdirs(); + + localProps.setProperty(MCAST_PORT, "0"); + localProps.setProperty(LOCATORS, "localhost[" + 
locator1Port+"]"); + localProps.setProperty(NAME, ClusterConfigurationDUnitTest.dataMember); + localProps.setProperty(USE_CLUSTER_CONFIGURATION, "true"); + localProps.setProperty(DEPLOY_WORKING_DIR, workingDir.getCanonicalPath()); + + getSystem(localProps); + Cache cache = getCache(); + assertNotNull(cache); + return getAllNormalMembers(cache); + } + }); + + return result; + } + + private void createRegion(String regionName, RegionShortcut regionShortCut, String group) { + CommandStringBuilder csb = new CommandStringBuilder(CliStrings.CREATE_REGION); + csb.addOption(CliStrings.CREATE_REGION__REGION, regionName); + csb.addOption(CliStrings.CREATE_REGION__REGIONSHORTCUT, regionShortCut.name()); + csb.addOptionWithValueCheck(CliStrings.CREATE_REGION__GROUP, group); + executeAndVerifyCommand(csb.toString()); + } + + private void createMockRegionExtension(final String regionName, final String value) { + CommandStringBuilder csb = new CommandStringBuilder(MockExtensionCommands.CREATE_MOCK_REGION_EXTENSION); + csb.addOption(MockExtensionCommands.OPTION_REGION_NAME, regionName); + csb.addOption(MockExtensionCommands.OPTION_VALUE, value); + executeAndVerifyCommand(csb.toString()); + } + + private void alterMockRegionExtension(final String regionName, final String value) { + CommandStringBuilder csb = new CommandStringBuilder(MockExtensionCommands.ALTER_MOCK_REGION_EXTENSION); + csb.addOption(MockExtensionCommands.OPTION_REGION_NAME, regionName); + csb.addOption(MockExtensionCommands.OPTION_VALUE, value); + executeAndVerifyCommand(csb.toString()); + } + + private void destroyMockRegionExtension(final String regionName) { + CommandStringBuilder csb = new CommandStringBuilder(MockExtensionCommands.DESTROY_MOCK_REGION_EXTENSION); + csb.addOption(MockExtensionCommands.OPTION_REGION_NAME, regionName); + executeAndVerifyCommand(csb.toString()); + } + + private void createMockCacheExtension(final String value) { + CommandStringBuilder csb = new 
CommandStringBuilder(MockExtensionCommands.CREATE_MOCK_CACHE_EXTENSION); + csb.addOption(MockExtensionCommands.OPTION_VALUE, value); + executeAndVerifyCommand(csb.toString()); + } + + private void alterMockCacheExtension(final String value) { + CommandStringBuilder csb = new CommandStringBuilder(MockExtensionCommands.ALTER_MOCK_CACHE_EXTENSION); + csb.addOption(MockExtensionCommands.OPTION_VALUE, value); + executeAndVerifyCommand(csb.toString()); + } + + private void destroyMockCacheExtension() { + CommandStringBuilder csb = new CommandStringBuilder(MockExtensionCommands.DESTROY_MOCK_CACHE_EXTENSION); + executeAndVerifyCommand(csb.toString()); + } + + private void createPersistentRegion(String regionName, RegionShortcut regionShortCut, String group, String diskStoreName) { + CommandStringBuilder csb = new CommandStringBuilder(CliStrings.CREATE_REGION); + csb.addOptionWithValueCheck(CliStrings.CREATE_REGION__REGION, regionName); + csb.addOptionWithValueCheck(CliStrings.CREATE_REGION__REGIONSHORTCUT, regionShortCut.name()); + csb.addOptionWithValueCheck(CliStrings.CREATE_REGION__DISKSTORE, diskStoreName); + csb.addOptionWithValueCheck(CliStrings.CREATE_REGION__GROUP, group); + executeAndVerifyCommand(csb.getCommandString()); + } + + private void destroyRegion(String regionName) { + CommandStringBuilder csb = new CommandStringBuilder(CliStrings.DESTROY_REGION); + csb.addOption(CliStrings.DESTROY_REGION__REGION, regionName); + executeAndVerifyCommand(csb.getCommandString()); + } + + private void alterRegion(String regionName, + String cloningEnabled, + String aeqId, + String cacheListener, + String cacheWriter, + String cacheLoader, + String entryExpIdleTime, + String entryExpIdleTimeAction, + String evictionMax, + String gsId) { + + CommandStringBuilder csb = new CommandStringBuilder(CliStrings.ALTER_REGION); + csb.addOptionWithValueCheck(CliStrings.ALTER_REGION__CLONINGENABLED, "false"); + csb.addOptionWithValueCheck(CliStrings.ALTER_REGION__ASYNCEVENTQUEUEID, 
aeqId);
    csb.addOptionWithValueCheck(CliStrings.ALTER_REGION__CACHELISTENER, cacheListener);
    csb.addOptionWithValueCheck(CliStrings.ALTER_REGION__CACHEWRITER, cacheWriter);
    csb.addOptionWithValueCheck(CliStrings.ALTER_REGION__CACHELOADER, cacheLoader);
    csb.addOptionWithValueCheck(CliStrings.ALTER_REGION__CLONINGENABLED, cloningEnabled);
    csb.addOptionWithValueCheck(CliStrings.ALTER_REGION__ENTRYEXPIRATIONIDLETIME, entryExpIdleTime);
    csb.addOptionWithValueCheck(CliStrings.ALTER_REGION__ENTRYEXPIRATIONIDLETIMEACTION, entryExpIdleTimeAction);
    csb.addOptionWithValueCheck(CliStrings.ALTER_REGION__EVICTIONMAX, evictionMax);
    csb.addOptionWithValueCheck(CliStrings.ALTER_REGION__GATEWAYSENDERID, gsId);

    executeAndVerifyCommand(csb.getCommandString());
  }

  /**
   * Executes the given gfsh command and asserts that it both succeeded and was
   * persisted to the shared cluster configuration.
   */
  private void executeAndVerifyCommand(String commandString) {
    CommandResult cmdResult = executeCommand(commandString);
    com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("Command : " + commandString);
    com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("Command Result : " + commandResultToString(cmdResult));
    assertEquals(Status.OK, cmdResult.getStatus());
    assertFalse(cmdResult.failedToPersist());
  }

  /**
   * Creates an index via gfsh.
   *
   * @param group server group to create the index on; skipped when blank.
   */
  private void createIndex(String indexName, String expression, String regionName, String group) {
    CommandStringBuilder csb = new CommandStringBuilder(CliStrings.CREATE_INDEX);
    csb.addOption(CliStrings.CREATE_INDEX__NAME, indexName);
    csb.addOption(CliStrings.CREATE_INDEX__EXPRESSION, expression);
    csb.addOption(CliStrings.CREATE_INDEX__REGION, regionName);
    // FIX: the group parameter was accepted but silently ignored; pass it through
    // (value-checked, so blank/null keeps the old command string) for consistency
    // with destroyIndex below.
    csb.addOptionWithValueCheck(CliStrings.CREATE_INDEX__GROUP, group);
    executeAndVerifyCommand(csb.getCommandString());
  }

  /** Destroys an index via gfsh; a no-op when all selectors are blank. */
  private void destroyIndex(String indexName, String regionName, String group) {
    if (isBlank(indexName) && isBlank(regionName) && isBlank(group)) {
      return;
    }
    CommandStringBuilder csb = new CommandStringBuilder(CliStrings.DESTROY_INDEX);
    csb.addOptionWithValueCheck(CliStrings.DESTROY_INDEX__NAME, indexName);
    csb.addOptionWithValueCheck(CliStrings.DESTROY_INDEX__REGION, regionName);
    csb.addOptionWithValueCheck(CliStrings.DESTROY_INDEX__GROUP, group);
    executeAndVerifyCommand(csb.getCommandString());
  }

  /**
   * Creates a disk store via gfsh. Name and directory are mandatory; every other
   * option is value-checked and omitted when blank.
   */
  private void createDiskStore(String diskStoreName,
      String diskDirs,
      String autoCompact,
      String allowForceCompaction,
      String compactionThreshold,
      String duCritical,
      String duWarning,
      String maxOplogSize,
      String queueSize,
      String timeInterval,
      String writeBufferSize) {

    CommandStringBuilder csb = new CommandStringBuilder(CliStrings.CREATE_DISK_STORE);
    csb.addOption(CliStrings.CREATE_DISK_STORE__NAME, diskStoreName);
    csb.addOption(CliStrings.CREATE_DISK_STORE__DIRECTORY_AND_SIZE, diskDirs);
    csb.addOptionWithValueCheck(CliStrings.CREATE_DISK_STORE__AUTO_COMPACT, autoCompact);
    csb.addOptionWithValueCheck(CliStrings.CREATE_DISK_STORE__ALLOW_FORCE_COMPACTION, allowForceCompaction);
    csb.addOptionWithValueCheck(CliStrings.CREATE_DISK_STORE__COMPACTION_THRESHOLD, compactionThreshold);
    csb.addOptionWithValueCheck(CliStrings.CREATE_DISK_STORE__DISK_USAGE_CRITICAL_PCT, duCritical);
    csb.addOptionWithValueCheck(CliStrings.CREATE_DISK_STORE__DISK_USAGE_WARNING_PCT, duWarning);
    csb.addOptionWithValueCheck(CliStrings.CREATE_DISK_STORE__MAX_OPLOG_SIZE, maxOplogSize);
    csb.addOptionWithValueCheck(CliStrings.CREATE_DISK_STORE__QUEUE_SIZE, queueSize);
    csb.addOptionWithValueCheck(CliStrings.CREATE_DISK_STORE__TIME_INTERVAL, timeInterval);
    csb.addOptionWithValueCheck(CliStrings.CREATE_DISK_STORE__WRITE_BUFFER_SIZE, writeBufferSize);
    executeAndVerifyCommand(csb.getCommandString());
  }

  /** Destroys a disk store via gfsh, optionally restricted to a server group. */
  private void destroyDiskStore(String diskStoreName, String group) {
    CommandStringBuilder csb = new CommandStringBuilder(CliStrings.DESTROY_DISK_STORE);
    csb.addOption(CliStrings.DESTROY_DISK_STORE__NAME, diskStoreName);
    csb.addOptionWithValueCheck(CliStrings.DESTROY_DISK_STORE__GROUP, group);
    // FIX: use getCommandString() like every other helper (was csb.toString()).
    executeAndVerifyCommand(csb.getCommandString());
  }

  /** Creates a gateway receiver via gfsh; start/end ports are mandatory. */
  private void createGatewayReceiver(String manualStart, String bindAddress, String startPort, String endPort, String maxTimeBetweenPings, String group) {
    CommandStringBuilder csb = new CommandStringBuilder(CliStrings.CREATE_GATEWAYRECEIVER);
    csb.addOptionWithValueCheck(CliStrings.CREATE_GATEWAYRECEIVER__MANUALSTART, manualStart);
    csb.addOption(CliStrings.CREATE_GATEWAYRECEIVER__STARTPORT, startPort);
    csb.addOption(CliStrings.CREATE_GATEWAYRECEIVER__ENDPORT, endPort);
    csb.addOptionWithValueCheck(CliStrings.CREATE_GATEWAYRECEIVER__BINDADDRESS, bindAddress);
    csb.addOptionWithValueCheck(CliStrings.CREATE_GATEWAYRECEIVER__GROUP, group);
    csb.addOptionWithValueCheck(CliStrings.CREATE_GATEWAYRECEIVER__MAXTIMEBETWEENPINGS, maxTimeBetweenPings);
    executeAndVerifyCommand(csb.getCommandString());
  }

  /** Creates a gateway sender via gfsh; every option is value-checked. */
  private void createGatewaySender(String id,
      String batchSize,
      String alertThreshold,
      String batchTimeInterval,
      String dispatcherThreads,
      String enableConflation,
      String manualStart,
      String maxQueueMemory,
      String orderPolicy,
      String parallel,
      String rmDsId,
      String socketBufferSize,
      String socketReadTimeout) {

    CommandStringBuilder csb = new CommandStringBuilder(CliStrings.CREATE_GATEWAYSENDER);
    csb.addOptionWithValueCheck(CliStrings.CREATE_GATEWAYSENDER__ID, id);
    csb.addOptionWithValueCheck(CliStrings.CREATE_GATEWAYSENDER__BATCHSIZE, batchSize);
    csb.addOptionWithValueCheck(CliStrings.CREATE_GATEWAYSENDER__ALERTTHRESHOLD, alertThreshold);
    csb.addOptionWithValueCheck(CliStrings.CREATE_GATEWAYSENDER__BATCHTIMEINTERVAL, batchTimeInterval);
    csb.addOptionWithValueCheck(CliStrings.CREATE_GATEWAYSENDER__DISPATCHERTHREADS, dispatcherThreads);
    csb.addOptionWithValueCheck(CliStrings.CREATE_GATEWAYSENDER__ENABLEBATCHCONFLATION, enableConflation);
    csb.addOptionWithValueCheck(CliStrings.CREATE_GATEWAYSENDER__MANUALSTART, manualStart);
    csb.addOptionWithValueCheck(CliStrings.CREATE_GATEWAYSENDER__MAXQUEUEMEMORY, maxQueueMemory);
    csb.addOptionWithValueCheck(CliStrings.CREATE_GATEWAYSENDER__ORDERPOLICY, orderPolicy);
    csb.addOptionWithValueCheck(CliStrings.CREATE_GATEWAYSENDER__PARALLEL, parallel);
    csb.addOptionWithValueCheck(CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID, rmDsId);
    csb.addOptionWithValueCheck(CliStrings.CREATE_GATEWAYSENDER__SOCKETBUFFERSIZE, socketBufferSize);
    csb.addOptionWithValueCheck(CliStrings.CREATE_GATEWAYSENDER__SOCKETREADTIMEOUT, socketReadTimeout);

    executeAndVerifyCommand(csb.getCommandString());
  }

  /**
   * Compiles a one-off AsyncEventListener into a jar, deploys it, then creates an
   * async event queue that uses it. The jar file is always deleted afterwards.
   *
   * @throws IOException if the jar cannot be written to the temporary folder
   */
  private void createAsyncEventQueue(String id, String persistent, String diskStoreName, String batchSize, String maxQueueMemory, String group) throws IOException {
    String queueCommandsJarName = this.temporaryFolder.getRoot().getCanonicalPath() + File.separator + "testEndToEndSC-QueueCommands.jar";
    final File jarFile = new File(queueCommandsJarName);

    try {
      ClassBuilder classBuilder = new ClassBuilder();
      // Minimal listener implementation compiled on the fly; the queue only needs
      // a class that implements Declarable2 and AsyncEventListener.
      byte[] jarBytes = classBuilder.createJarFromClassContent("com/qcdunit/QueueCommandsDUnitTestListener",
          "package com.qcdunit;" +
          "import java.util.List; import java.util.Properties;" +
          "import com.gemstone.gemfire.internal.cache.xmlcache.Declarable2; import com.gemstone.gemfire.cache.asyncqueue.AsyncEvent;" +
          "import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;" +
          "public class QueueCommandsDUnitTestListener implements Declarable2, AsyncEventListener {" +
          "Properties props;" +
          "public boolean processEvents(List<AsyncEvent> events) { return true; }" +
          "public void close() {}" +
          "public void init(final Properties props) {this.props = props;}" +
          "public Properties getConfig() {return this.props;}}");

      writeByteArrayToFile(jarFile, jarBytes);
      CommandStringBuilder csb = new CommandStringBuilder(CliStrings.DEPLOY);
      csb.addOption(CliStrings.DEPLOY__JAR, queueCommandsJarName);
      executeAndVerifyCommand(csb.getCommandString());

      csb = new CommandStringBuilder(CliStrings.CREATE_ASYNC_EVENT_QUEUE);
      csb.addOptionWithValueCheck(CliStrings.CREATE_ASYNC_EVENT_QUEUE__ID, id);
      csb.addOptionWithValueCheck(CliStrings.CREATE_ASYNC_EVENT_QUEUE__LISTENER, "com.qcdunit.QueueCommandsDUnitTestListener");
      csb.addOptionWithValueCheck(CliStrings.CREATE_ASYNC_EVENT_QUEUE__DISK_STORE, diskStoreName);
      csb.addOptionWithValueCheck(CliStrings.CREATE_ASYNC_EVENT_QUEUE__BATCH_SIZE, batchSize);
      csb.addOptionWithValueCheck(CliStrings.CREATE_ASYNC_EVENT_QUEUE__GROUP, group);
      csb.addOptionWithValueCheck(CliStrings.CREATE_ASYNC_EVENT_QUEUE__PERSISTENT, persistent);
      csb.addOptionWithValueCheck(CliStrings.CREATE_ASYNC_EVENT_QUEUE__MAXIMUM_QUEUE_MEMORY, maxQueueMemory);
      executeAndVerifyCommand(csb.getCommandString());

    } finally {
      deleteQuietly(jarFile);
    }
  }

  /** Configures cluster-wide PDX settings via gfsh; all options value-checked. */
  private void configurePDX(String autoSerializerClasses, String ignoreUnreadFields, String persistent, String portableAutoSerializerClasses, String readSerialized) {
    CommandStringBuilder csb = new CommandStringBuilder(CliStrings.CONFIGURE_PDX);
    csb.addOptionWithValueCheck(CliStrings.CONFIGURE_PDX__AUTO__SERIALIZER__CLASSES, autoSerializerClasses);
    csb.addOptionWithValueCheck(CliStrings.CONFIGURE_PDX__IGNORE__UNREAD_FIELDS, ignoreUnreadFields);
    csb.addOptionWithValueCheck(CliStrings.CONFIGURE_PDX__PERSISTENT, persistent);
    csb.addOptionWithValueCheck(CliStrings.CONFIGURE_PDX__PORTABLE__AUTO__SERIALIZER__CLASSES, portableAutoSerializerClasses);
    csb.addOptionWithValueCheck(CliStrings.CONFIGURE_PDX__READ__SERIALIZED, readSerialized);
    executeAndVerifyCommand(csb.getCommandString());
  }

  /**
   * Builds a jar containing ShareConfigClass and deploys it (optionally to a
   * group), recording the jar name for later cleanup.
   *
   * @throws IOException if the jar cannot be written
   */
  private void createAndDeployJar(String jarName, String group) throws IOException {
    File newDeployableJarFile = new File(jarName);
    this.classBuilder.writeJarFromName("ShareConfigClass", newDeployableJarFile);
    CommandStringBuilder csb = new CommandStringBuilder(CliStrings.DEPLOY);
    csb.addOption(CliStrings.DEPLOY__JAR, jarName);
    if (!isBlank(group)) {
      csb.addOption(CliStrings.DEPLOY__GROUP, group);
    }
    executeAndVerifyCommand(csb.getCommandString());
    jarFileNames.add(jarName);
  }

  /** Undeploys a jar via gfsh; jar and group are each optional. */
  private void undeployJar(String jarName, String group) {
    CommandStringBuilder csb = new CommandStringBuilder(CliStrings.UNDEPLOY);
    if (!isBlank(jarName)) {
      csb.addOption(CliStrings.UNDEPLOY__JAR, jarName);
    }
    if (!isBlank(group)) {
      csb.addOption(CliStrings.UNDEPLOY__GROUP, group);
    }
    executeAndVerifyCommand(csb.getCommandString());
  }

  /** Alters runtime configuration via gfsh; all options value-checked. */
  private void alterRuntime(String copyOnRead, String lockLease, String lockTimeout, String messageSyncInterval) {
    CommandStringBuilder csb = new CommandStringBuilder(CliStrings.ALTER_RUNTIME_CONFIG);
    csb.addOptionWithValueCheck(CliStrings.ALTER_RUNTIME_CONFIG__COPY__ON__READ, copyOnRead);
    csb.addOptionWithValueCheck(CliStrings.ALTER_RUNTIME_CONFIG__LOCK__LEASE, lockLease);
    csb.addOptionWithValueCheck(CliStrings.ALTER_RUNTIME_CONFIG__LOCK__TIMEOUT, lockTimeout);
    csb.addOptionWithValueCheck(CliStrings.ALTER_RUNTIME_CONFIG__MESSAGE__SYNC__INTERVAL, messageSyncInterval);
    // FIX: use getCommandString() like every other helper (was csb.toString()).
    executeAndVerifyCommand(csb.getCommandString());
  }

  /** Removes jar artifacts left behind by earlier deploy tests. */
  private void deleteSavedJarFiles() throws IOException {
    deleteMatching(new File("."), "^" + JarDeployer.JAR_PREFIX + "Deploy1.*#\\d++$");
    delete(new File("Deploy1.jar"));
  }

  /**
   * Sends a ShutdownAllRequest through the locator/manager VM, cleans up the
   * locator, and removes each server's working directory.
   */
  private void shutdownAll() throws IOException {
    VM locatorAndMgr = getHost(0).getVM(3);
    locatorAndMgr.invoke(new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        GemFireCacheImpl cache = (GemFireCacheImpl) CacheFactory.getAnyInstance();
        ShutdownAllRequest.send(cache.getDistributedSystem().getDistributionManager(), -1);
        return null;
      }
    });

    locatorAndMgr.invoke(SharedConfigurationTestUtils.cleanupLocator);
    // Clean up the directories
    if (serverNames != null && !serverNames.isEmpty()) {
      for (String serverName : serverNames) {
        final File serverDir = new File(serverName);
        cleanDirectory(serverDir);
        deleteDirectory(serverDir);
      }
    }
  }

  /**
   * Small helper that assembles a gfsh command string from a command name plus
   * an option map.
   */
  private static class CommandBuilder {

    private final CommandStringBuilder csb;

    public CommandBuilder(String commandName, Map<String, String> options) {
      csb = new CommandStringBuilder(commandName);
      for (Entry<String, String> entry : options.entrySet()) {
        String option = entry.getKey();
        // FIX: the original condition was inverted (isBlank(option)), so no option
        // with a real name was ever added to the command string.
        if (!isBlank(option)) {
          csb.addOption(option, entry.getValue());
        }
      }
    }

    public String getCommandString() {
      return csb.toString();
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/management/internal/pulse/TestRemoteClusterDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/management/internal/pulse/TestRemoteClusterDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/management/internal/pulse/TestRemoteClusterDUnitTest.java
new file mode 100644
index 0000000..4829523
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/management/internal/pulse/TestRemoteClusterDUnitTest.java
@@ -0,0 +1,272 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
*/
package com.gemstone.gemfire.management.internal.pulse;

import org.junit.experimental.categories.Category;
import org.junit.Test;

import static org.junit.Assert.*;

import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
import com.gemstone.gemfire.test.junit.categories.DistributedTest;

import java.util.Map;

import javax.management.MBeanServer;
import javax.management.ObjectName;

import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.Locator;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
import com.gemstone.gemfire.management.DistributedSystemMXBean;
import com.gemstone.gemfire.management.GatewayReceiverMXBean;
import com.gemstone.gemfire.management.GatewaySenderMXBean;
import com.gemstone.gemfire.management.MBeanUtil;
import com.gemstone.gemfire.management.ManagementService;
import com.gemstone.gemfire.management.ManagementTestBase;
import com.gemstone.gemfire.management.RegionMXBean;

import com.gemstone.gemfire.management.internal.MBeanJMXAdapter;
import com.gemstone.gemfire.test.dunit.Host;
import com.gemstone.gemfire.test.dunit.LogWriterUtils;
import com.gemstone.gemfire.test.dunit.SerializableRunnable;
import com.gemstone.gemfire.test.dunit.VM;
import com.gemstone.gemfire.test.dunit.Wait;
import com.gemstone.gemfire.test.dunit.WaitCriterion;

/**
 * Distributed test that wires up a two-cluster WAN topology (a "pune" sending
 * cluster and an "ny" receiving cluster) and verifies that gateway sender and
 * receiver MXBeans are registered and that the managing node can observe the
 * remote cluster's status via {@link DistributedSystemMXBean#viewRemoteClusterStatus()}.
 */
@Category(DistributedTest.class)
public class TestRemoteClusterDUnitTest extends ManagementTestBase {

  private static final long serialVersionUID = 1L;

  public static MBeanServer mbeanServer = MBeanJMXAdapter.mbeanServer;

  public TestRemoteClusterDUnitTest() throws Exception {
    super();
  }

  /**
   * Builds the WAN topology, starts the sender, and checks sender/receiver
   * MBeans plus the remote-cluster status visible from the managing node.
   */
  @Test
  public void testMBeanCallback() throws Exception {

    VM nyLocator = getManagedNodeList().get(0);
    VM nyReceiver = getManagedNodeList().get(1);
    VM puneSender = getManagedNodeList().get(2);
    VM managing = getManagingNode();
    VM puneLocator = Host.getLocator();

    int punePort = (Integer) puneLocator.invoke(() -> TestRemoteClusterDUnitTest.getLocatorPort());

    Integer nyPort = (Integer) nyLocator.invoke(() -> WANTestBase.createFirstRemoteLocator(12, punePort));

    puneSender.invoke(() -> WANTestBase.createCache(punePort));
    managing.invoke(() -> WANTestBase.createManagementCache(punePort));
    startManagingNode(managing);

    puneSender.invoke(() -> WANTestBase.createSender("pn",
        12, true, 100, 300, false, false, null, true));
    managing.invoke(() -> WANTestBase.createSender("pn", 12,
        true, 100, 300, false, false, null, true));

    puneSender.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "pn", 1, 100, false));
    managing.invoke(() -> WANTestBase.createPartitionedRegion(
        getTestMethodName() + "_PR", "pn", 1, 100, false));

    WANTestBase.createCacheInVMs(nyPort, nyReceiver);
    nyReceiver.invoke(() -> WANTestBase.createReceiver());
    nyReceiver.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", null, 1, 100, false));

    WANTestBase.startSenderInVMs("pn", puneSender, managing);

    // make sure all the senders are running before doing any puts
    puneSender.invoke(() -> WANTestBase.waitForSenderRunningState("pn"));
    managing.invoke(() -> WANTestBase.waitForSenderRunningState("pn"));

    checkSenderMBean(puneSender, getTestMethodName() + "_PR");
    checkSenderMBean(managing, getTestMethodName() + "_PR");

    checkReceiverMBean(nyReceiver);

    stopGatewaySender(puneSender);
    startGatewaySender(puneSender);

    DistributedMember puneMember = (DistributedMember) puneSender.invoke(() -> TestRemoteClusterDUnitTest.getMember());

    checkRemoteClusterStatus(managing, puneMember);
  }

  /** Returns the port of the first locator running in this VM. */
  private static int getLocatorPort() {
    return Locator.getLocators().get(0).getPort();
  }

  /** Returns this VM's distributed member id. */
  private static DistributedMember getMember() {
    return GemFireCacheImpl.getInstance().getMyId();
  }

  /**
   * Asserts in the given VM that the DistributedSystemMXBean reports at least
   * one remote cluster.
   *
   * @param vm reference to VM
   * @param senderMember sending-side member (currently unused; kept for
   *        signature compatibility with callers)
   */
  @SuppressWarnings("serial")
  protected void checkRemoteClusterStatus(final VM vm,
      final DistributedMember senderMember) {
    SerializableRunnable checkProxySender = new SerializableRunnable(
        "DS Map Size") {
      public void run() {
        Cache cache = GemFireCacheImpl.getInstance();
        final WaitCriterion waitCriteria2 = new WaitCriterion() {
          @Override
          public boolean done() {
            // Poll until the DistributedSystemMXBean has been registered.
            Cache c = GemFireCacheImpl.getInstance();
            final ManagementService service = ManagementService
                .getManagementService(c);
            return service.getDistributedSystemMXBean() != null;
          }

          @Override
          public String description() {
            return "wait for getDistributedSystemMXBean to complete and get results";
          }
        };
        Wait.waitForCriterion(waitCriteria2, 2 * 60 * 1000, 5000, true);
        ManagementService service = ManagementService
            .getManagementService(cache);
        final DistributedSystemMXBean dsBean = service
            .getDistributedSystemMXBean();
        assertNotNull(dsBean);
        Map<String, Boolean> dsMap = dsBean.viewRemoteClusterStatus();
        // FIX: assert non-null before dereferencing (the original called
        // dsMap.size() first, so an NPE would mask the assertion), and use
        // assertTrue instead of assertEquals(true, cond ? true : false).
        assertNotNull(dsMap);
        LogWriterUtils.getLogWriter().info(
            "Ds Map is: " + dsMap.size());
        assertTrue(dsMap.size() > 0);
      }
    };
    vm.invoke(checkProxySender);
  }

  /**
   * Stops the "pn" gateway sender in the given VM and asserts it is no longer
   * running.
   *
   * @param vm reference to VM
   */
  @SuppressWarnings("serial")
  protected void stopGatewaySender(final VM vm) {
    SerializableRunnable stopGatewaySender = new SerializableRunnable(
        "Stop Gateway Sender") {
      public void run() {
        Cache cache = GemFireCacheImpl.getInstance();
        ManagementService service = ManagementService
            .getManagementService(cache);
        GatewaySenderMXBean bean = service.getLocalGatewaySenderMXBean("pn");
        assertNotNull(bean);
        bean.stop();
        assertFalse(bean.isRunning());
      }
    };
    vm.invoke(stopGatewaySender);
  }

  /**
   * Starts the "pn" gateway sender in the given VM and asserts it is running.
   *
   * @param vm reference to VM
   */
  @SuppressWarnings("serial")
  protected void startGatewaySender(final VM vm) {
    // FIX: local runnable was misleadingly named "stopGatewaySender" (copy-paste).
    SerializableRunnable startGatewaySender = new SerializableRunnable(
        "Start Gateway Sender") {
      public void run() {
        Cache cache = GemFireCacheImpl.getInstance();
        ManagementService service = ManagementService
            .getManagementService(cache);
        GatewaySenderMXBean bean = service.getLocalGatewaySenderMXBean("pn");
        assertNotNull(bean);
        bean.start();
        assertTrue(bean.isRunning());
      }
    };
    vm.invoke(startGatewaySender);
  }

  /**
   * Checks whether a GatewayReceiverMXBean is created in the given VM.
   *
   * @param vm reference to VM
   */
  @SuppressWarnings("serial")
  protected void checkReceiverMBean(final VM vm) {
    SerializableRunnable checkMBean = new SerializableRunnable(
        "Check Receiver MBean") {
      public void run() {
        Cache cache = GemFireCacheImpl.getInstance();
        ManagementService service = ManagementService
            .getManagementService(cache);
        GatewayReceiverMXBean bean = service.getLocalGatewayReceiverMXBean();
        assertNotNull(bean);
      }
    };
    vm.invoke(checkMBean);
  }

  /**
   * Checks that a connected GatewaySenderMXBean exists in the given VM and that
   * the region's RegionMXBean reports the gateway as enabled.
   * (The original javadoc said "GatewayReceiverMBean" — copy-paste error.)
   *
   * @param vm reference to VM
   * @param regionPath region whose RegionMXBean is inspected
   */
  @SuppressWarnings("serial")
  protected void checkSenderMBean(final VM vm, final String regionPath) {
    SerializableRunnable checkMBean = new SerializableRunnable(
        "Check Sender MBean") {
      public void run() {
        Cache cache = GemFireCacheImpl.getInstance();
        ManagementService service = ManagementService
            .getManagementService(cache);

        GatewaySenderMXBean bean = service.getLocalGatewaySenderMXBean("pn");
        assertNotNull(bean);
        assertTrue(bean.isConnected());

        ObjectName regionBeanName = service.getRegionMBeanName(cache
            .getDistributedSystem().getDistributedMember(), "/" + regionPath);
        RegionMXBean rBean = service.getMBeanInstance(regionBeanName,
            RegionMXBean.class);
        assertTrue(rBean.isGatewayEnabled());
      }
    };
    vm.invoke(checkMBean);
  }
}