Closing STRATOS-1633: Refactoring Mock IaaS
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/5344c394 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/5344c394 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/5344c394 Branch: refs/heads/stratos-4.1.x Commit: 5344c394c45886857ef4ae08063f573d3471ad6a Parents: 4e868f2 Author: Akila Perera <[email protected]> Authored: Sun Nov 29 23:52:17 2015 +0530 Committer: Akila Perera <[email protected]> Committed: Mon Nov 30 00:33:45 2015 +0530 ---------------------------------------------------------------------- .../iaas/internal/MockIaasServiceComponent.java | 6 - .../iaas/services/impl/MockIaasServiceImpl.java | 65 +++---- .../iaas/services/impl/MockIaasServiceUtil.java | 23 --- .../mock/iaas/services/impl/MockInstance.java | 187 +++++++++++++------ .../publisher/MockHealthStatisticsNotifier.java | 56 +++--- .../mock/iaas/test/MockIaasServiceTest.java | 149 +++++++++------ .../src/test/resources/jndi.properties | 22 +++ .../src/test/resources/thrift-client-config.xml | 50 +++++ 8 files changed, 336 insertions(+), 222 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/5344c394/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/internal/MockIaasServiceComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/internal/MockIaasServiceComponent.java b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/internal/MockIaasServiceComponent.java index bc384c6..07cf540 100644 --- a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/internal/MockIaasServiceComponent.java +++ b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/internal/MockIaasServiceComponent.java @@ -65,12 +65,6 @@ public class MockIaasServiceComponent { // Wait for stratos manager to be activated componentStartUpSynchronizer.waitForComponentActivation(Component.MockIaaS, Component.StratosManager); - - PersistenceManager persistenceManager = - PersistenceManagerFactory.getPersistenceManager(PersistenceManagerType.Registry); - MockIaasServiceUtil mockIaasServiceUtil = new MockIaasServiceUtil(persistenceManager); - mockIaasServiceUtil.startInstancesPersisted(); - MockIaasService mockIaasService = new MockIaasServiceImpl(); context.getBundleContext().registerService(MockIaasService.class.getName(), mockIaasService, null); log.info("Mock IaaS service registered"); http://git-wip-us.apache.org/repos/asf/stratos/blob/5344c394/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockIaasServiceImpl.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockIaasServiceImpl.java b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockIaasServiceImpl.java index 2090199..106ece7 100644 --- a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockIaasServiceImpl.java +++ b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockIaasServiceImpl.java @@ -21,7 +21,6 @@ package org.apache.stratos.mock.iaas.services.impl; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.stratos.common.threading.StratosThreadPool; import org.apache.stratos.mock.iaas.domain.MockInstanceContext; import org.apache.stratos.mock.iaas.domain.MockInstanceMetadata; import org.apache.stratos.mock.iaas.exceptions.MockIaasException; @@ -32,12 +31,8 @@ import org.apache.stratos.mock.iaas.services.MockIaasService; import org.apache.stratos.mock.iaas.statistics.generator.MockHealthStatisticsGenerator; import org.wso2.carbon.registry.core.exceptions.RegistryException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.UUID; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; /** * Mock IaaS service implementation. This is a singleton class that simulates a standard Infrastructure as a Service @@ -52,14 +47,9 @@ import java.util.concurrent.ExecutorService; public class MockIaasServiceImpl implements MockIaasService { private static final Log log = LogFactory.getLog(MockIaasServiceImpl.class); - - private static final ExecutorService mockMemberExecutorService = StratosThreadPool - .getExecutorService(MockConstants.MOCK_MEMBER_THREAD_POOL, MockConstants.MOCK_MEMBER_THREAD_POOL_SIZE); - private static volatile MockIaasServiceImpl instance; - private PersistenceManager persistenceManager; private MockIaasServiceUtil mockIaasServiceUtil; - private Map<String, MockInstance> instanceIdToMockInstanceMap; // Map<InstanceId,MockInstance> + private Map<String, MockInstance> instanceIdToMockInstanceMap; /** * Default public constructor @@ -71,8 +61,8 @@ public class MockIaasServiceImpl implements MockIaasService { PersistenceManagerType persistenceManagerType = PersistenceManagerType.valueOf(persistenceManagerTypeStr); persistenceManager = PersistenceManagerFactory.getPersistenceManager(persistenceManagerType); mockIaasServiceUtil = new MockIaasServiceUtil(persistenceManager); - instanceIdToMockInstanceMap = mockIaasServiceUtil.readFromRegistry(); + startPersistedMockInstances(); } catch (RegistryException e) { String message = "Could not read service name -> mock member map from registry"; log.error(message, e); @@ -84,6 +74,23 @@ public class MockIaasServiceImpl implements MockIaasService { } } + private void startPersistedMockInstances() throws RegistryException { + if (instanceIdToMockInstanceMap != null) { + log.info("Starting mock instances persisted..."); + Set<String> serviceNameSet = new HashSet<String>(); + for (MockInstance mockInstance : instanceIdToMockInstanceMap.values()) { + mockInstance.initialize(); + + // Schedule statistics updater tasks for service + String serviceName = mockInstance.getMockInstanceContext().getServiceName(); + if (!serviceNameSet.contains(serviceName)) { + MockHealthStatisticsGenerator.getInstance().scheduleStatisticsUpdaterTasks(serviceName); + serviceNameSet.add(serviceName); + } + } + } + } + /** * Start mock instance. * @@ -104,40 +111,24 @@ public class MockIaasServiceImpl implements MockIaasService { MockInstance mockInstance = new MockInstance(mockInstanceContext); instanceIdToMockInstanceMap.put(instanceId, mockInstance); - mockMemberExecutorService.submit(mockInstance); + mockInstance.initialize(); // Persist changes mockIaasServiceUtil .persistInRegistry((ConcurrentHashMap<String, MockInstance>) instanceIdToMockInstanceMap); - String serviceName = mockInstanceContext.getServiceName(); MockHealthStatisticsGenerator.getInstance().scheduleStatisticsUpdaterTasks(serviceName); - - // Simulate instance creation time - sleep(2000); - return new MockInstanceMetadata(mockInstanceContext); } } catch (Exception e) { - String msg = "Could not start mock instance: " + mockInstanceContext.getMemberId(); + String msg = String + .format("Could not start mock instance: [member-id] %s", mockInstanceContext.getMemberId()); log.error(msg, e); throw new MockIaasException(msg, e); } } /** - * Sleep the current thread for a given period of time - * - * @param time time in milliseconds - */ - private void sleep(int time) { - try { - Thread.sleep(time); - } catch (InterruptedException ignore) { - } - } - - /** * Get mock instances. * * @return a list of mock instance metadata objects @@ -199,12 +190,10 @@ public class MockIaasServiceImpl implements MockIaasService { public void terminateInstance(String instanceId) { try { synchronized (MockIaasServiceImpl.class) { - log.info(String.format("Terminating instance: [instance-id] %s", instanceId)); - + log.info(String.format("Terminating mock instance: [instance-id] %s", instanceId)); MockInstance mockInstance = instanceIdToMockInstanceMap.get(instanceId); if (mockInstance != null) { String serviceName = mockInstance.getMockInstanceContext().getServiceName(); - mockInstance.terminate(); instanceIdToMockInstanceMap.remove(instanceId); mockIaasServiceUtil @@ -214,13 +203,13 @@ public class MockIaasServiceImpl implements MockIaasService { MockHealthStatisticsGenerator.getInstance().stopStatisticsUpdaterTasks(serviceName); } - log.info(String.format("Instance terminated successfully: [instance-id] %s", instanceId)); + log.info(String.format("Mock instance terminated successfully: [instance-id] %s", instanceId)); } else { - log.warn(String.format("Instance not found: [instance-id] %s", instanceId)); + log.warn(String.format("Mock instance not found: [instance-id] %s", instanceId)); } } } catch (Exception e) { - String msg = "Could not terminate mock instance: " + instanceId; + String msg = String.format("Could not terminate mock instance: [instance-id] %s", instanceId); log.error(msg, e); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/5344c394/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockIaasServiceUtil.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockIaasServiceUtil.java b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockIaasServiceUtil.java index cbdc090..c950528 100644 --- a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockIaasServiceUtil.java +++ b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockIaasServiceUtil.java @@ -57,27 +57,4 @@ public class MockIaasServiceUtil { public ConcurrentHashMap<String, MockInstance> readFromRegistry() throws RegistryException { return (ConcurrentHashMap<String, MockInstance>) persistenceManager.read(MOCK_IAAS_MEMBERS); } - - public void startInstancesPersisted() throws RegistryException { - Map<String, MockInstance> instanceIdToMockMemberMap = readFromRegistry(); - ExecutorService mockMemberExecutorService = - StratosThreadPool.getExecutorService(MockConstants.MOCK_MEMBER_THREAD_POOL, - MockConstants.MOCK_MEMBER_THREAD_POOL_SIZE); - - if (instanceIdToMockMemberMap != null) { - log.info("Starting mock instances persisted..."); - - Set<String> serviceNameSet = new HashSet<String>(); - for (MockInstance mockInstance : instanceIdToMockMemberMap.values()) { - mockMemberExecutorService.submit(mockInstance); - - // Schedule statistics updater tasks for service - String serviceName = mockInstance.getMockInstanceContext().getServiceName(); - if (!serviceNameSet.contains(serviceName)) { - MockHealthStatisticsGenerator.getInstance().scheduleStatisticsUpdaterTasks(serviceName); - serviceNameSet.add(serviceName); - } - } - } - } } http://git-wip-us.apache.org/repos/asf/stratos/blob/5344c394/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java index a8f3bcc..7b31861 100644 --- a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java +++ b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java @@ -21,20 +21,26 @@ package org.apache.stratos.mock.iaas.services.impl; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.stratos.common.domain.NameValuePair; import org.apache.stratos.common.threading.StratosThreadPool; +import org.apache.stratos.messaging.domain.topology.MemberStatus; import org.apache.stratos.messaging.event.Event; import org.apache.stratos.messaging.event.instance.notifier.InstanceCleanupClusterEvent; import org.apache.stratos.messaging.event.instance.notifier.InstanceCleanupMemberEvent; +import org.apache.stratos.messaging.event.topology.MemberInitializedEvent; +import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent; +import org.apache.stratos.messaging.event.topology.MemberStartedEvent; import org.apache.stratos.messaging.listener.instance.notifier.InstanceCleanupClusterEventListener; import org.apache.stratos.messaging.listener.instance.notifier.InstanceCleanupMemberEventListener; +import org.apache.stratos.messaging.listener.topology.MemberInitializedEventListener; +import org.apache.stratos.messaging.listener.topology.MemberMaintenanceListener; +import org.apache.stratos.messaging.listener.topology.MemberStartedEventListener; import org.apache.stratos.messaging.message.receiver.instance.notifier.InstanceNotifierEventReceiver; +import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver; import org.apache.stratos.mock.iaas.domain.MockInstanceContext; import org.apache.stratos.mock.iaas.event.publisher.MockMemberEventPublisher; import org.apache.stratos.mock.iaas.statistics.publisher.MockHealthStatisticsNotifier; import java.io.Serializable; -import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -44,41 +50,117 @@ import java.util.concurrent.atomic.AtomicBoolean; /** * Mock member instance definition. */ -public class MockInstance implements Runnable, Serializable { +public class MockInstance implements Serializable { private static final Log log = LogFactory.getLog(MockInstance.class); private static final int HEALTH_STAT_INTERVAL = 15; // 15 seconds - private final MockInstanceContext mockInstanceContext; + private transient ScheduledFuture<?> healthStatNotifierScheduledFuture; private transient InstanceNotifierEventReceiver instanceNotifierEventReceiver; + private transient TopologyEventReceiver topologyEventReceiver; + private transient MockHealthStatisticsNotifier mockHealthStatisticsNotifier; + + // this is the mock iaas instance runtime status, do not persist this state + private transient MemberStatus memberStatus = MemberStatus.Created; + + private final MockInstanceContext mockInstanceContext; + private final AtomicBoolean hasGracefullyShutdown = new AtomicBoolean(false); + private static final ExecutorService eventListenerExecutorService = StratosThreadPool - .getExecutorService("mock.iaas.event.listener.thread.pool", 20); + .getExecutorService("mock.iaas.event.listener.thread.pool", 100); private static final ScheduledExecutorService healthStatNotifierExecutorService = StratosThreadPool - .getScheduledExecutorService("mock.iaas.health.statistics.notifier.thread.pool", 20); - AtomicBoolean hasGracefullyShutdown = new AtomicBoolean(false); + .getScheduledExecutorService("mock.iaas.health.statistics.notifier.thread.pool", 100); public MockInstance(MockInstanceContext mockInstanceContext) { this.mockInstanceContext = mockInstanceContext; } - @Override - public void run() { - if (log.isInfoEnabled()) { - log.info(String.format("Mock member started: [member-id] %s", mockInstanceContext.getMemberId())); + public synchronized void initialize() { + if (MemberStatus.Created.equals(memberStatus) || memberStatus == null) { + startTopologyEventReceiver(); + startInstanceNotifierEventReceiver(); + startHealthStatisticsPublisher(); + memberStatus = MemberStatus.Initialized; + if (log.isInfoEnabled()) { + log.info(String.format("Mock instance initialized: [member-id] %s", mockInstanceContext.getMemberId())); + } + } else { + if (log.isInfoEnabled()) { + log.info(String.format( + "Mock instance cannot be initialized since it is not in created state: [member-id] %s [status] " + + "%s", mockInstanceContext.getMemberId(), memberStatus)); + } } - sleep(5000); - MockMemberEventPublisher.publishInstanceStartedEvent(mockInstanceContext); - sleep(5000); - MockMemberEventPublisher.publishInstanceActivatedEvent(mockInstanceContext); - startInstanceNotifierReceiver(); - startHealthStatisticsPublisher(); } - private void startInstanceNotifierReceiver() { + private void startHealthStatisticsPublisher() { + mockHealthStatisticsNotifier = new MockHealthStatisticsNotifier(mockInstanceContext); + if (log.isDebugEnabled()) { + log.debug(String.format("Starting health statistics notifier: [member-id] %s", + mockInstanceContext.getMemberId())); + } + healthStatNotifierScheduledFuture = healthStatNotifierExecutorService + .scheduleAtFixedRate(mockHealthStatisticsNotifier, 0, HEALTH_STAT_INTERVAL, TimeUnit.SECONDS); + if (log.isDebugEnabled()) { - log.debug("Starting instance notifier event message receiver for mock member [member-id] " - + mockInstanceContext.getMemberId()); + log.debug(String.format("Health statistics notifier started: [member-id] %s", + mockInstanceContext.getMemberId())); } + } + private void startTopologyEventReceiver() { + topologyEventReceiver = new TopologyEventReceiver(); + topologyEventReceiver.addEventListener(new MemberInitializedEventListener() { + @Override + protected void onEvent(Event event) { + MemberInitializedEvent memberInitializedEvent = (MemberInitializedEvent) event; + if (memberInitializedEvent.getMemberId().equals(mockInstanceContext.getMemberId())) { + MockMemberEventPublisher.publishInstanceStartedEvent(mockInstanceContext); + + if (log.isInfoEnabled()) { + log.info(String.format("Mock member started event published for [member-id] %s", + mockInstanceContext.getMemberId())); + } + } + } + }); + topologyEventReceiver.addEventListener(new MemberStartedEventListener() { + @Override + protected void onEvent(Event event) { + MemberStartedEvent memberStartedEvent = (MemberStartedEvent) event; + if (memberStartedEvent.getMemberId().equals(mockInstanceContext.getMemberId())) { + MockMemberEventPublisher.publishInstanceActivatedEvent(mockInstanceContext); + + if (log.isInfoEnabled()) { + log.info(String.format("Mock member activated event published for [member-id] %s", + mockInstanceContext.getMemberId())); + } + } + } + }); + topologyEventReceiver.addEventListener(new MemberMaintenanceListener() { + @Override + protected void onEvent(Event event) { + MemberMaintenanceModeEvent memberMaintenanceModeEvent = (MemberMaintenanceModeEvent) event; + if (memberMaintenanceModeEvent.getMemberId().equals(mockInstanceContext.getMemberId())) { + MockMemberEventPublisher.publishInstanceReadyToShutdownEvent(mockInstanceContext); + hasGracefullyShutdown.set(true); + if (log.isInfoEnabled()) { + log.info(String.format("Mock member ready to shutdown event published for [member-id] %s", + mockInstanceContext.getMemberId())); + } + } + } + }); + topologyEventReceiver.setExecutorService(eventListenerExecutorService); + topologyEventReceiver.execute(); + if (log.isDebugEnabled()) { + log.debug(String.format( + "Mock instance topology event message receiver started for mock member [member-id] %s", + mockInstanceContext.getMemberId())); + } + } + + private void startInstanceNotifierEventReceiver() { instanceNotifierEventReceiver = new InstanceNotifierEventReceiver(); instanceNotifierEventReceiver.addEventListener(new InstanceCleanupClusterEventListener() { @Override @@ -101,25 +183,24 @@ public class MockInstance implements Runnable, Serializable { } } }); - + // TODO: Fix InstanceNotifierEventReceiver to use executor service + // do not remove this since execute() is a blocking call eventListenerExecutorService.submit(new Runnable() { @Override public void run() { instanceNotifierEventReceiver.execute(); } }); - if (log.isDebugEnabled()) { - log.debug("Instance notifier event message receiver started"); + log.debug(String.format( + "Mock instance instance notifier event message receiver started for mock member [member-id] %s", + mockInstanceContext.getMemberId())); } } private void handleMemberTermination() { if (!hasGracefullyShutdown.get()) { MockMemberEventPublisher.publishMaintenanceModeEvent(mockInstanceContext); - sleep(5000); - MockMemberEventPublisher.publishInstanceReadyToShutdownEvent(mockInstanceContext); - hasGracefullyShutdown.set(true); } else { if (log.isDebugEnabled()) { log.debug(String.format("Mock instance is already gracefully shutdown [member-id] %s", @@ -128,50 +209,36 @@ public class MockInstance implements Runnable, Serializable { } } - private void startHealthStatisticsPublisher() { - if (log.isDebugEnabled()) { - log.debug(String.format("Starting health statistics notifier: [member-id] %s", - mockInstanceContext.getMemberId())); - } - - healthStatNotifierScheduledFuture = healthStatNotifierExecutorService - .scheduleAtFixedRate(new MockHealthStatisticsNotifier(mockInstanceContext), 0, HEALTH_STAT_INTERVAL, - TimeUnit.SECONDS); - - if (log.isDebugEnabled()) { - log.debug(String.format("Health statistics notifier started: [member-id] %s", - mockInstanceContext.getMemberId())); - } - } - private void stopHealthStatisticsPublisher() { - if (healthStatNotifierScheduledFuture != null) { - healthStatNotifierScheduledFuture.cancel(true); - } + healthStatNotifierScheduledFuture.cancel(true); } private void stopInstanceNotifierReceiver() { - if (instanceNotifierEventReceiver != null) { - instanceNotifierEventReceiver.terminate(); - } - } - - private void sleep(long time) { - try { - Thread.sleep(time); - } catch (InterruptedException ignore) { - } + instanceNotifierEventReceiver.terminate(); } public MockInstanceContext getMockInstanceContext() { return mockInstanceContext; } - public void terminate() { - stopInstanceNotifierReceiver(); - stopHealthStatisticsPublisher(); - if (log.isInfoEnabled()) { - log.info(String.format("Mock member terminated: [member-id] %s", mockInstanceContext.getMemberId())); + public synchronized void terminate() { + if (MemberStatus.Initialized.equals(memberStatus)) { + stopInstanceNotifierReceiver(); + stopHealthStatisticsPublisher(); + memberStatus = MemberStatus.Terminated; + if (log.isInfoEnabled()) { + log.info(String.format("Mock instance stopped: [member-id] %s", mockInstanceContext.getMemberId())); + } + } else { + if (log.isInfoEnabled()) { + log.info(String.format( + "Mock instance cannot be terminated since it is not in initialized state: [member-id] %s ", + mockInstanceContext.getMemberId())); + } } } + + public MockHealthStatisticsNotifier getMockHealthStatisticsNotifier() { + return mockHealthStatisticsNotifier; + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/5344c394/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java index c2d1c6c..4f3de51 100644 --- a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java +++ b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java @@ -42,12 +42,12 @@ public class MockHealthStatisticsNotifier implements Runnable { public MockHealthStatisticsNotifier(MockInstanceContext mockMemberContext) { this.mockMemberContext = mockMemberContext; - this.healthStatisticsPublisher = HealthStatisticsPublisherFactory.createHealthStatisticsPublisher( - StatisticsPublisherType.WSO2CEP); + this.healthStatisticsPublisher = HealthStatisticsPublisherFactory + .createHealthStatisticsPublisher(StatisticsPublisherType.WSO2CEP); this.healthStatisticsPublisher.setEnabled(true); - this.inFlightRequestPublisher = InFlightRequestPublisherFactory.createInFlightRequestPublisher( - StatisticsPublisherType.WSO2CEP); + this.inFlightRequestPublisher = InFlightRequestPublisherFactory + .createInFlightRequestPublisher(StatisticsPublisherType.WSO2CEP); this.inFlightRequestPublisher.setEnabled(true); } @@ -61,22 +61,17 @@ public class MockHealthStatisticsNotifier implements Runnable { } try { - double memoryConsumption = MockHealthStatistics.getInstance().getStatistics( - mockMemberContext.getServiceName(), MockScalingFactor.MemoryConsumption); + double memoryConsumption = MockHealthStatistics.getInstance() + .getStatistics(mockMemberContext.getServiceName(), MockScalingFactor.MemoryConsumption); if (log.isDebugEnabled()) { log.debug(String.format("Publishing memory consumption: [member-id] %s [value] %f", mockMemberContext.getMemberId(), memoryConsumption)); } - healthStatisticsPublisher.publish( - mockMemberContext.getClusterId(), - mockMemberContext.getClusterInstanceId(), - mockMemberContext.getNetworkPartitionId(), - mockMemberContext.getMemberId(), - mockMemberContext.getPartitionId(), - MEMORY_CONSUMPTION, - memoryConsumption - ); + healthStatisticsPublisher + .publish(mockMemberContext.getClusterId(), mockMemberContext.getClusterInstanceId(), + mockMemberContext.getNetworkPartitionId(), mockMemberContext.getMemberId(), + mockMemberContext.getPartitionId(), MEMORY_CONSUMPTION, memoryConsumption); } catch (NoStatisticsFoundException ignore) { } catch (Exception e) { if (log.isErrorEnabled()) { @@ -84,23 +79,17 @@ public class MockHealthStatisticsNotifier implements Runnable { } } - try { - double loadAvereage = MockHealthStatistics.getInstance().getStatistics( - mockMemberContext.getServiceName(), MockScalingFactor.LoadAverage); + double loadAvereage = MockHealthStatistics.getInstance() + .getStatistics(mockMemberContext.getServiceName(), MockScalingFactor.LoadAverage); if (log.isDebugEnabled()) { log.debug(String.format("Publishing load average: [member-id] %s [value] %f", mockMemberContext.getMemberId(), loadAvereage)); } - healthStatisticsPublisher.publish( - mockMemberContext.getClusterId(), - mockMemberContext.getClusterInstanceId(), - mockMemberContext.getNetworkPartitionId(), - mockMemberContext.getMemberId(), - mockMemberContext.getPartitionId(), - LOAD_AVERAGE, - loadAvereage - ); + healthStatisticsPublisher + .publish(mockMemberContext.getClusterId(), mockMemberContext.getClusterInstanceId(), + mockMemberContext.getNetworkPartitionId(), mockMemberContext.getMemberId(), + mockMemberContext.getPartitionId(), LOAD_AVERAGE, loadAvereage); } catch (NoStatisticsFoundException ignore) { } catch (Exception e) { if (log.isErrorEnabled()) { @@ -109,17 +98,14 @@ public class MockHealthStatisticsNotifier implements Runnable { } try { - int requestsInFlight = MockHealthStatistics.getInstance().getStatistics( - mockMemberContext.getServiceName(), MockScalingFactor.RequestsInFlight); + int requestsInFlight = MockHealthStatistics.getInstance() + .getStatistics(mockMemberContext.getServiceName(), MockScalingFactor.RequestsInFlight); if (log.isDebugEnabled()) { - log.debug(String.format("Publishing requests in flight: [member-id] %s [value] %f", + log.debug(String.format("Publishing requests in flight: [member-id] %s [value] %d", mockMemberContext.getMemberId(), requestsInFlight)); } - inFlightRequestPublisher.publish( - mockMemberContext.getClusterId(), - mockMemberContext.getClusterInstanceId(), - mockMemberContext.getNetworkPartitionId(), - requestsInFlight); + inFlightRequestPublisher.publish(mockMemberContext.getClusterId(), mockMemberContext.getClusterInstanceId(), + mockMemberContext.getNetworkPartitionId(), requestsInFlight); } catch (NoStatisticsFoundException ignore) { } catch (Exception e) { if (log.isErrorEnabled()) { http://git-wip-us.apache.org/repos/asf/stratos/blob/5344c394/components/org.apache.stratos.mock.iaas/src/test/java/org/apache/stratos/mock/iaas/test/MockIaasServiceTest.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.mock.iaas/src/test/java/org/apache/stratos/mock/iaas/test/MockIaasServiceTest.java b/components/org.apache.stratos.mock.iaas/src/test/java/org/apache/stratos/mock/iaas/test/MockIaasServiceTest.java index 8b7aa1f..c9c0478 100644 --- a/components/org.apache.stratos.mock.iaas/src/test/java/org/apache/stratos/mock/iaas/test/MockIaasServiceTest.java +++ b/components/org.apache.stratos.mock.iaas/src/test/java/org/apache/stratos/mock/iaas/test/MockIaasServiceTest.java @@ -19,16 +19,20 @@ package org.apache.stratos.mock.iaas.test; +import org.apache.activemq.broker.BrokerService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.common.statistics.publisher.ThriftClientConfig; import org.apache.stratos.mock.iaas.config.MockIaasConfig; import org.apache.stratos.mock.iaas.domain.MockInstanceContext; import org.apache.stratos.mock.iaas.domain.MockInstanceMetadata; import org.apache.stratos.mock.iaas.persistence.PersistenceManagerType; import org.apache.stratos.mock.iaas.services.impl.MockConstants; import org.apache.stratos.mock.iaas.services.impl.MockIaasServiceImpl; +import org.junit.BeforeClass; import org.junit.Test; +import java.io.File; import java.net.URL; import java.util.List; @@ -41,58 +45,93 @@ public class MockIaasServiceTest { private static Log log = LogFactory.getLog(MockIaasServiceTest.class); private static final String CONFIG_FILE_PATH = "/mock-iaas.xml"; + private static BrokerService broker; + + @BeforeClass + public static void init() { + URL configFileUrl = MockIaasServiceTest.class.getResource(CONFIG_FILE_PATH); + System.setProperty(MockIaasConfig.MOCK_IAAS_CONFIG_FILE_PATH, configFileUrl.getPath()); + System.setProperty(MockConstants.PERSISTENCE_MANAGER_TYPE, PersistenceManagerType.Mock.toString()); + System.setProperty(ThriftClientConfig.THRIFT_CLIENT_CONFIG_FILE_PATH, + MockIaasServiceTest.class.getResource("/thrift-client-config.xml").getPath()); + System.setProperty("carbon.home", MockIaasServiceTest.class.getResource("/").getPath()); + System.setProperty("jndi.properties.dir", MockIaasServiceTest.class.getResource("/").getPath()); + initializeActiveMQ(); + startActiveMQ(); + } - @Test - public void testStartInstance() { - + private static void initializeActiveMQ() { try { - URL configFileUrl = getClass().getResource(CONFIG_FILE_PATH); - System.setProperty(MockIaasConfig.MOCK_IAAS_CONFIG_FILE_PATH, configFileUrl.getPath()); - System.setProperty(MockConstants.PERSISTENCE_MANAGER_TYPE, PersistenceManagerType.Mock.toString()); - - MockIaasServiceImpl mockIaasService = new MockIaasServiceImpl(); - MockInstanceContext mockInstanceContext = new MockInstanceContext("app1", "service1", "cluster1", "member1", - "cluster-instance1", "network-p1", "p1"); - MockInstanceMetadata metadata = mockIaasService.startInstance(mockInstanceContext); - assertNotNull("Could not start mock instance", metadata); - assertNotNull("Mock instance not found", mockIaasService.getInstance(metadata.getInstanceId())); + log.info("Initializing ActiveMQ..."); + broker = new BrokerService(); + broker.setDataDirectory(MockIaasServiceTest.class.getResource("/").getPath() + + File.separator + ".." + File.separator + "activemq-data"); + broker.setBrokerName("testBroker"); + broker.addConnector("tcp://localhost:61617"); } catch (Exception e) { - log.error(e); - assertTrue(e.getMessage(), false); + throw new RuntimeException("Could not initialize ActiveMQ", e); } } - @Test - public void testGetInstances() { + private static void startActiveMQ() { + try { + long time1 = System.currentTimeMillis(); + broker.start(); + long time2 = System.currentTimeMillis(); + log.info(String.format("ActiveMQ started in %d sec", (time2 - time1) / 1000)); + } catch (Exception e) { + throw new RuntimeException("Could not start ActiveMQ", e); + } + } + private void stopActiveMQ() { try { - URL configFileUrl = getClass().getResource(CONFIG_FILE_PATH); - System.setProperty(MockIaasConfig.MOCK_IAAS_CONFIG_FILE_PATH, configFileUrl.getPath()); - System.setProperty(MockConstants.PERSISTENCE_MANAGER_TYPE, PersistenceManagerType.Mock.toString()); - - MockIaasServiceImpl mockIaasService = new MockIaasServiceImpl(); - MockInstanceContext mockInstanceContext = new MockInstanceContext("app1", "service1", "cluster1", "member1", - "cluster-instance1", "network-p1", "p1"); - MockInstanceMetadata metadata1 = mockIaasService.startInstance(mockInstanceContext); - assertNotNull("Could not start mock instance", metadata1); - assertNotNull("Mock instance not found", mockIaasService.getInstance(metadata1.getInstanceId())); - - mockInstanceContext = new MockInstanceContext("app1", "service1", "cluster1", "member2", - "cluster-instance1", "network-p1", "p1"); - MockInstanceMetadata metadata2 = mockIaasService.startInstance(mockInstanceContext); - assertNotNull("Could not start mock instance", metadata2); - assertNotNull("Mock instance not found", mockIaasService.getInstance(metadata2.getInstanceId())); - - List<MockInstanceMetadata> instances = mockIaasService.getInstances(); - assertNotNull(instances); - assertTrue("Mock instance 1 not found in get instances result", instanceExist(instances, metadata1.getInstanceId())); - assertTrue("Mock instance 2 not found in get instances result", instanceExist(instances, metadata2.getInstanceId())); + broker.stop(); } catch (Exception e) { - log.error(e); - assertTrue(e.getMessage(), false); + throw new RuntimeException("Could not stop ActiveMQ", e); + } + } + + private void sleep(long time) { + try { + Thread.sleep(time); + } catch (InterruptedException ignore) { } } + @Test + public void testStartInstance() throws Exception { + MockIaasServiceImpl mockIaasService = new MockIaasServiceImpl(); + MockInstanceContext mockInstanceContext = new MockInstanceContext("app1", "service1", "cluster1", "member1", + "cluster-instance1", "network-p1", "p1"); + MockInstanceMetadata metadata = mockIaasService.startInstance(mockInstanceContext); + assertNotNull("Could not start mock instance", metadata); + assertNotNull("Mock instance not found", mockIaasService.getInstance(metadata.getInstanceId())); + } + + @Test + public void testGetInstances() throws Exception { + MockIaasServiceImpl mockIaasService = new MockIaasServiceImpl(); + MockInstanceContext mockInstanceContext = new MockInstanceContext("app1", "service1", "cluster1", "member1", + "cluster-instance1", "network-p1", "p1"); + MockInstanceMetadata metadata1 = mockIaasService.startInstance(mockInstanceContext); + assertNotNull("Could not start mock instance", metadata1); + assertNotNull("Mock instance not found", mockIaasService.getInstance(metadata1.getInstanceId())); + + mockInstanceContext = new MockInstanceContext("app1", "service1", "cluster1", "member2", "cluster-instance1", + "network-p1", "p1"); + MockInstanceMetadata metadata2 = mockIaasService.startInstance(mockInstanceContext); + assertNotNull("Could not start mock instance", metadata2); + assertNotNull("Mock instance not found", mockIaasService.getInstance(metadata2.getInstanceId())); + + List<MockInstanceMetadata> instances = mockIaasService.getInstances(); + assertNotNull(instances); + assertTrue("Mock instance 1 not found in get instances result", + instanceExist(instances, metadata1.getInstanceId())); + assertTrue("Mock instance 2 not found in get instances result", + instanceExist(instances, metadata2.getInstanceId())); + } + private boolean instanceExist(List<MockInstanceMetadata> instances, String instanceId) { for (MockInstanceMetadata instance : instances) { if (instance.getInstanceId().equals(instanceId)) { @@ -103,25 +142,15 @@ public class MockIaasServiceTest { } @Test - public void testTerminateInstance() { - - try { - URL configFileUrl = getClass().getResource(CONFIG_FILE_PATH); - System.setProperty(MockIaasConfig.MOCK_IAAS_CONFIG_FILE_PATH, configFileUrl.getPath()); - System.setProperty(MockConstants.PERSISTENCE_MANAGER_TYPE, PersistenceManagerType.Mock.toString()); - - MockIaasServiceImpl mockIaasService = new MockIaasServiceImpl(); - MockInstanceContext mockInstanceContext = new MockInstanceContext("app1", "service1", "cluster1", "member1", - "cluster-instance1", "network-p1", "p1"); - MockInstanceMetadata metadata = mockIaasService.startInstance(mockInstanceContext); - assertNotNull("Could not start mock instance", metadata); - assertNotNull("Mock instance not found", mockIaasService.getInstance(metadata.getInstanceId())); - - mockIaasService.terminateInstance(metadata.getInstanceId()); - assertNull("Could not terminate mock instance", mockIaasService.getInstance(metadata.getInstanceId())); - } catch (Exception e) { - log.error(e); - assertTrue(e.getMessage(), false); - } + public void testTerminateInstance() throws Exception { + MockIaasServiceImpl mockIaasService = new MockIaasServiceImpl(); + MockInstanceContext mockInstanceContext = new MockInstanceContext("app1", "service1", "cluster1", "member1", + "cluster-instance1", "network-p1", "p1"); + MockInstanceMetadata metadata = mockIaasService.startInstance(mockInstanceContext); + assertNotNull("Could not start mock instance", metadata); + assertNotNull("Mock instance not found", mockIaasService.getInstance(metadata.getInstanceId())); + + mockIaasService.terminateInstance(metadata.getInstanceId()); + assertNull("Could not terminate mock instance", mockIaasService.getInstance(metadata.getInstanceId())); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/5344c394/components/org.apache.stratos.mock.iaas/src/test/resources/jndi.properties ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.mock.iaas/src/test/resources/jndi.properties b/components/org.apache.stratos.mock.iaas/src/test/resources/jndi.properties new file mode 100644 index 0000000..beefe3c --- /dev/null +++ b/components/org.apache.stratos.mock.iaas/src/test/resources/jndi.properties @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +connectionfactoryName=TopicConnectionFactory +java.naming.provider.url=tcp://localhost:61617 +java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory http://git-wip-us.apache.org/repos/asf/stratos/blob/5344c394/components/org.apache.stratos.mock.iaas/src/test/resources/thrift-client-config.xml ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.mock.iaas/src/test/resources/thrift-client-config.xml b/components/org.apache.stratos.mock.iaas/src/test/resources/thrift-client-config.xml new file mode 100644 index 0000000..f828e0d --- /dev/null +++ b/components/org.apache.stratos.mock.iaas/src/test/resources/thrift-client-config.xml @@ -0,0 +1,50 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> + +<!-- Apache thrift client configuration for publishing statistics to WSO2 CEP and WSO2 DAS --> +<thriftClientConfiguration> + <config> + <cep> + <node id="node-01"> + <statsPublisherEnabled>true</statsPublisherEnabled> + <username>admincep1</username> + <password>1234cep1</password> + <ip>192.168.10.10</ip> + <port>9300</port> + </node> + <node id="node-02"> + <statsPublisherEnabled>true</statsPublisherEnabled> + <username>admincep2</username> + <password>1234cep2</password> + <ip>192.168.10.20</ip> + <port>9300</port> + </node> + </cep> + <das> + <node id="node-01"> + <statsPublisherEnabled>true</statsPublisherEnabled> + <username>admindas1</username> + <password>1234das1</password> + <ip>192.168.10.11</ip> + <port>9301</port> + </node> + </das> + </config> +</thriftClientConfiguration> \ No newline at end of file
