Closing STRATOS-1633: Refactoring Mock IaaS

Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/5344c394
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/5344c394
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/5344c394

Branch: refs/heads/stratos-4.1.x
Commit: 5344c394c45886857ef4ae08063f573d3471ad6a
Parents: 4e868f2
Author: Akila Perera <[email protected]>
Authored: Sun Nov 29 23:52:17 2015 +0530
Committer: Akila Perera <[email protected]>
Committed: Mon Nov 30 00:33:45 2015 +0530

----------------------------------------------------------------------
 .../iaas/internal/MockIaasServiceComponent.java |   6 -
 .../iaas/services/impl/MockIaasServiceImpl.java |  65 +++----
 .../iaas/services/impl/MockIaasServiceUtil.java |  23 ---
 .../mock/iaas/services/impl/MockInstance.java   | 187 +++++++++++++------
 .../publisher/MockHealthStatisticsNotifier.java |  56 +++---
 .../mock/iaas/test/MockIaasServiceTest.java     | 149 +++++++++------
 .../src/test/resources/jndi.properties          |  22 +++
 .../src/test/resources/thrift-client-config.xml |  50 +++++
 8 files changed, 336 insertions(+), 222 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/5344c394/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/internal/MockIaasServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/internal/MockIaasServiceComponent.java b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/internal/MockIaasServiceComponent.java
index bc384c6..07cf540 100644
--- a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/internal/MockIaasServiceComponent.java
+++ b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/internal/MockIaasServiceComponent.java
@@ -65,12 +65,6 @@ public class MockIaasServiceComponent {
                     // Wait for stratos manager to be activated
                     
componentStartUpSynchronizer.waitForComponentActivation(Component.MockIaaS,
                             Component.StratosManager);
-
-                    PersistenceManager persistenceManager =
-                            
PersistenceManagerFactory.getPersistenceManager(PersistenceManagerType.Registry);
-                    MockIaasServiceUtil mockIaasServiceUtil = new 
MockIaasServiceUtil(persistenceManager);
-                    mockIaasServiceUtil.startInstancesPersisted();
-
                     MockIaasService mockIaasService = new 
MockIaasServiceImpl();
                     
context.getBundleContext().registerService(MockIaasService.class.getName(), 
mockIaasService, null);
                     log.info("Mock IaaS service registered");

http://git-wip-us.apache.org/repos/asf/stratos/blob/5344c394/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockIaasServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockIaasServiceImpl.java b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockIaasServiceImpl.java
index 2090199..106ece7 100644
--- a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockIaasServiceImpl.java
+++ b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockIaasServiceImpl.java
@@ -21,7 +21,6 @@ package org.apache.stratos.mock.iaas.services.impl;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.common.threading.StratosThreadPool;
 import org.apache.stratos.mock.iaas.domain.MockInstanceContext;
 import org.apache.stratos.mock.iaas.domain.MockInstanceMetadata;
 import org.apache.stratos.mock.iaas.exceptions.MockIaasException;
@@ -32,12 +31,8 @@ import org.apache.stratos.mock.iaas.services.MockIaasService;
 import 
org.apache.stratos.mock.iaas.statistics.generator.MockHealthStatisticsGenerator;
 import org.wso2.carbon.registry.core.exceptions.RegistryException;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
 
 /**
  * Mock IaaS service implementation. This is a singleton class that simulates 
a standard Infrastructure as a Service
@@ -52,14 +47,9 @@ import java.util.concurrent.ExecutorService;
 public class MockIaasServiceImpl implements MockIaasService {
 
     private static final Log log = 
LogFactory.getLog(MockIaasServiceImpl.class);
-
-    private static final ExecutorService mockMemberExecutorService = 
StratosThreadPool
-            .getExecutorService(MockConstants.MOCK_MEMBER_THREAD_POOL, 
MockConstants.MOCK_MEMBER_THREAD_POOL_SIZE);
-    private static volatile MockIaasServiceImpl instance;
-
     private PersistenceManager persistenceManager;
     private MockIaasServiceUtil mockIaasServiceUtil;
-    private Map<String, MockInstance> instanceIdToMockInstanceMap; // 
Map<InstanceId,MockInstance>
+    private Map<String, MockInstance> instanceIdToMockInstanceMap;
 
     /**
      * Default public constructor
@@ -71,8 +61,8 @@ public class MockIaasServiceImpl implements MockIaasService {
             PersistenceManagerType persistenceManagerType = 
PersistenceManagerType.valueOf(persistenceManagerTypeStr);
             persistenceManager = 
PersistenceManagerFactory.getPersistenceManager(persistenceManagerType);
             mockIaasServiceUtil = new MockIaasServiceUtil(persistenceManager);
-
             instanceIdToMockInstanceMap = 
mockIaasServiceUtil.readFromRegistry();
+            startPersistedMockInstances();
         } catch (RegistryException e) {
             String message = "Could not read service name -> mock member map 
from registry";
             log.error(message, e);
@@ -84,6 +74,23 @@ public class MockIaasServiceImpl implements MockIaasService {
         }
     }
 
+    private void startPersistedMockInstances() throws RegistryException {
+        if (instanceIdToMockInstanceMap != null) {
+            log.info("Starting mock instances persisted...");
+            Set<String> serviceNameSet = new HashSet<String>();
+            for (MockInstance mockInstance : 
instanceIdToMockInstanceMap.values()) {
+                mockInstance.initialize();
+
+                // Schedule statistics updater tasks for service
+                String serviceName = 
mockInstance.getMockInstanceContext().getServiceName();
+                if (!serviceNameSet.contains(serviceName)) {
+                    
MockHealthStatisticsGenerator.getInstance().scheduleStatisticsUpdaterTasks(serviceName);
+                    serviceNameSet.add(serviceName);
+                }
+            }
+        }
+    }
+
     /**
      * Start mock instance.
      *
@@ -104,40 +111,24 @@ public class MockIaasServiceImpl implements MockIaasService {
 
                 MockInstance mockInstance = new 
MockInstance(mockInstanceContext);
                 instanceIdToMockInstanceMap.put(instanceId, mockInstance);
-                mockMemberExecutorService.submit(mockInstance);
+                mockInstance.initialize();
 
                 // Persist changes
                 mockIaasServiceUtil
                         .persistInRegistry((ConcurrentHashMap<String, 
MockInstance>) instanceIdToMockInstanceMap);
-
                 String serviceName = mockInstanceContext.getServiceName();
                 
MockHealthStatisticsGenerator.getInstance().scheduleStatisticsUpdaterTasks(serviceName);
-
-                // Simulate instance creation time
-                sleep(2000);
-
                 return new MockInstanceMetadata(mockInstanceContext);
             }
         } catch (Exception e) {
-            String msg = "Could not start mock instance: " + 
mockInstanceContext.getMemberId();
+            String msg = String
+                    .format("Could not start mock instance: [member-id] %s", 
mockInstanceContext.getMemberId());
             log.error(msg, e);
             throw new MockIaasException(msg, e);
         }
     }
 
     /**
-     * Sleep the current thread for a given period of time
-     *
-     * @param time time in milliseconds
-     */
-    private void sleep(int time) {
-        try {
-            Thread.sleep(time);
-        } catch (InterruptedException ignore) {
-        }
-    }
-
-    /**
      * Get mock instances.
      *
      * @return a list of mock instance metadata objects
@@ -199,12 +190,10 @@ public class MockIaasServiceImpl implements MockIaasService {
     public void terminateInstance(String instanceId) {
         try {
             synchronized (MockIaasServiceImpl.class) {
-                log.info(String.format("Terminating instance: [instance-id] 
%s", instanceId));
-
+                log.info(String.format("Terminating mock instance: 
[instance-id] %s", instanceId));
                 MockInstance mockInstance = 
instanceIdToMockInstanceMap.get(instanceId);
                 if (mockInstance != null) {
                     String serviceName = 
mockInstance.getMockInstanceContext().getServiceName();
-
                     mockInstance.terminate();
                     instanceIdToMockInstanceMap.remove(instanceId);
                     mockIaasServiceUtil
@@ -214,13 +203,13 @@ public class MockIaasServiceImpl implements MockIaasService {
                         
MockHealthStatisticsGenerator.getInstance().stopStatisticsUpdaterTasks(serviceName);
                     }
 
-                    log.info(String.format("Instance terminated successfully: 
[instance-id] %s", instanceId));
+                    log.info(String.format("Mock instance terminated 
successfully: [instance-id] %s", instanceId));
                 } else {
-                    log.warn(String.format("Instance not found: [instance-id] 
%s", instanceId));
+                    log.warn(String.format("Mock instance not found: 
[instance-id] %s", instanceId));
                 }
             }
         } catch (Exception e) {
-            String msg = "Could not terminate mock instance: " + instanceId;
+            String msg = String.format("Could not terminate mock instance: 
[instance-id] %s", instanceId);
             log.error(msg, e);
         }
     }

http://git-wip-us.apache.org/repos/asf/stratos/blob/5344c394/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockIaasServiceUtil.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockIaasServiceUtil.java b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockIaasServiceUtil.java
index cbdc090..c950528 100644
--- a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockIaasServiceUtil.java
+++ b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockIaasServiceUtil.java
@@ -57,27 +57,4 @@ public class MockIaasServiceUtil {
     public ConcurrentHashMap<String, MockInstance> readFromRegistry() throws 
RegistryException {
         return (ConcurrentHashMap<String, MockInstance>) 
persistenceManager.read(MOCK_IAAS_MEMBERS);
     }
-
-    public void startInstancesPersisted() throws RegistryException {
-        Map<String, MockInstance> instanceIdToMockMemberMap = 
readFromRegistry();
-        ExecutorService mockMemberExecutorService =
-                
StratosThreadPool.getExecutorService(MockConstants.MOCK_MEMBER_THREAD_POOL,
-                        MockConstants.MOCK_MEMBER_THREAD_POOL_SIZE);
-
-        if (instanceIdToMockMemberMap != null) {
-            log.info("Starting mock instances persisted...");
-
-            Set<String> serviceNameSet = new HashSet<String>();
-            for (MockInstance mockInstance : 
instanceIdToMockMemberMap.values()) {
-                mockMemberExecutorService.submit(mockInstance);
-
-                // Schedule statistics updater tasks for service
-                String serviceName = 
mockInstance.getMockInstanceContext().getServiceName();
-                if (!serviceNameSet.contains(serviceName)) {
-                    
MockHealthStatisticsGenerator.getInstance().scheduleStatisticsUpdaterTasks(serviceName);
-                    serviceNameSet.add(serviceName);
-                }
-            }
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/5344c394/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java
index a8f3bcc..7b31861 100644
--- a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java
+++ b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java
@@ -21,20 +21,26 @@ package org.apache.stratos.mock.iaas.services.impl;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.common.domain.NameValuePair;
 import org.apache.stratos.common.threading.StratosThreadPool;
+import org.apache.stratos.messaging.domain.topology.MemberStatus;
 import org.apache.stratos.messaging.event.Event;
 import 
org.apache.stratos.messaging.event.instance.notifier.InstanceCleanupClusterEvent;
 import 
org.apache.stratos.messaging.event.instance.notifier.InstanceCleanupMemberEvent;
+import org.apache.stratos.messaging.event.topology.MemberInitializedEvent;
+import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent;
+import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
 import 
org.apache.stratos.messaging.listener.instance.notifier.InstanceCleanupClusterEventListener;
 import 
org.apache.stratos.messaging.listener.instance.notifier.InstanceCleanupMemberEventListener;
+import 
org.apache.stratos.messaging.listener.topology.MemberInitializedEventListener;
+import 
org.apache.stratos.messaging.listener.topology.MemberMaintenanceListener;
+import 
org.apache.stratos.messaging.listener.topology.MemberStartedEventListener;
 import 
org.apache.stratos.messaging.message.receiver.instance.notifier.InstanceNotifierEventReceiver;
+import 
org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
 import org.apache.stratos.mock.iaas.domain.MockInstanceContext;
 import org.apache.stratos.mock.iaas.event.publisher.MockMemberEventPublisher;
 import 
org.apache.stratos.mock.iaas.statistics.publisher.MockHealthStatisticsNotifier;
 
 import java.io.Serializable;
-import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -44,41 +50,117 @@ import java.util.concurrent.atomic.AtomicBoolean;
 /**
  * Mock member instance definition.
  */
-public class MockInstance implements Runnable, Serializable {
+public class MockInstance implements Serializable {
     private static final Log log = LogFactory.getLog(MockInstance.class);
     private static final int HEALTH_STAT_INTERVAL = 15; // 15 seconds
-    private final MockInstanceContext mockInstanceContext;
+
     private transient ScheduledFuture<?> healthStatNotifierScheduledFuture;
     private transient InstanceNotifierEventReceiver 
instanceNotifierEventReceiver;
+    private transient TopologyEventReceiver topologyEventReceiver;
+    private transient MockHealthStatisticsNotifier 
mockHealthStatisticsNotifier;
+
+    // this is the mock iaas instance runtime status, do not persist this state
+    private transient MemberStatus memberStatus = MemberStatus.Created;
+
+    private final MockInstanceContext mockInstanceContext;
+    private final AtomicBoolean hasGracefullyShutdown = new 
AtomicBoolean(false);
+
     private static final ExecutorService eventListenerExecutorService = 
StratosThreadPool
-            .getExecutorService("mock.iaas.event.listener.thread.pool", 20);
+            .getExecutorService("mock.iaas.event.listener.thread.pool", 100);
     private static final ScheduledExecutorService 
healthStatNotifierExecutorService = StratosThreadPool
-            
.getScheduledExecutorService("mock.iaas.health.statistics.notifier.thread.pool",
 20);
-    AtomicBoolean hasGracefullyShutdown = new AtomicBoolean(false);
+            
.getScheduledExecutorService("mock.iaas.health.statistics.notifier.thread.pool",
 100);
 
     public MockInstance(MockInstanceContext mockInstanceContext) {
         this.mockInstanceContext = mockInstanceContext;
     }
 
-    @Override
-    public void run() {
-        if (log.isInfoEnabled()) {
-            log.info(String.format("Mock member started: [member-id] %s", 
mockInstanceContext.getMemberId()));
+    public synchronized void initialize() {
+        if (MemberStatus.Created.equals(memberStatus) || memberStatus == null) 
{
+            startTopologyEventReceiver();
+            startInstanceNotifierEventReceiver();
+            startHealthStatisticsPublisher();
+            memberStatus = MemberStatus.Initialized;
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Mock instance initialized: [member-id] 
%s", mockInstanceContext.getMemberId()));
+            }
+        } else {
+            if (log.isInfoEnabled()) {
+                log.info(String.format(
+                        "Mock instance cannot be initialized since it is not 
in created state: [member-id] %s [status] "
+                                + "%s", mockInstanceContext.getMemberId(), 
memberStatus));
+            }
         }
-        sleep(5000);
-        
MockMemberEventPublisher.publishInstanceStartedEvent(mockInstanceContext);
-        sleep(5000);
-        
MockMemberEventPublisher.publishInstanceActivatedEvent(mockInstanceContext);
-        startInstanceNotifierReceiver();
-        startHealthStatisticsPublisher();
     }
 
-    private void startInstanceNotifierReceiver() {
+    private void startHealthStatisticsPublisher() {
+        mockHealthStatisticsNotifier = new 
MockHealthStatisticsNotifier(mockInstanceContext);
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Starting health statistics notifier: 
[member-id] %s",
+                    mockInstanceContext.getMemberId()));
+        }
+        healthStatNotifierScheduledFuture = healthStatNotifierExecutorService
+                .scheduleAtFixedRate(mockHealthStatisticsNotifier, 0, 
HEALTH_STAT_INTERVAL, TimeUnit.SECONDS);
+
         if (log.isDebugEnabled()) {
-            log.debug("Starting instance notifier event message receiver for 
mock member [member-id] "
-                            + mockInstanceContext.getMemberId());
+            log.debug(String.format("Health statistics notifier started: 
[member-id] %s",
+                    mockInstanceContext.getMemberId()));
         }
+    }
 
+    private void startTopologyEventReceiver() {
+        topologyEventReceiver = new TopologyEventReceiver();
+        topologyEventReceiver.addEventListener(new 
MemberInitializedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                MemberInitializedEvent memberInitializedEvent = 
(MemberInitializedEvent) event;
+                if 
(memberInitializedEvent.getMemberId().equals(mockInstanceContext.getMemberId()))
 {
+                    
MockMemberEventPublisher.publishInstanceStartedEvent(mockInstanceContext);
+
+                    if (log.isInfoEnabled()) {
+                        log.info(String.format("Mock member started event 
published for [member-id] %s",
+                                mockInstanceContext.getMemberId()));
+                    }
+                }
+            }
+        });
+        topologyEventReceiver.addEventListener(new 
MemberStartedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                MemberStartedEvent memberStartedEvent = (MemberStartedEvent) 
event;
+                if 
(memberStartedEvent.getMemberId().equals(mockInstanceContext.getMemberId())) {
+                    
MockMemberEventPublisher.publishInstanceActivatedEvent(mockInstanceContext);
+
+                    if (log.isInfoEnabled()) {
+                        log.info(String.format("Mock member activated event 
published for [member-id] %s",
+                                mockInstanceContext.getMemberId()));
+                    }
+                }
+            }
+        });
+        topologyEventReceiver.addEventListener(new MemberMaintenanceListener() 
{
+            @Override
+            protected void onEvent(Event event) {
+                MemberMaintenanceModeEvent memberMaintenanceModeEvent = 
(MemberMaintenanceModeEvent) event;
+                if 
(memberMaintenanceModeEvent.getMemberId().equals(mockInstanceContext.getMemberId()))
 {
+                    
MockMemberEventPublisher.publishInstanceReadyToShutdownEvent(mockInstanceContext);
+                    hasGracefullyShutdown.set(true);
+                    if (log.isInfoEnabled()) {
+                        log.info(String.format("Mock member ready to shutdown 
event published for [member-id] %s",
+                                mockInstanceContext.getMemberId()));
+                    }
+                }
+            }
+        });
+        topologyEventReceiver.setExecutorService(eventListenerExecutorService);
+        topologyEventReceiver.execute();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format(
+                    "Mock instance topology event message receiver started for 
mock member [member-id] %s",
+                    mockInstanceContext.getMemberId()));
+        }
+    }
+
+    private void startInstanceNotifierEventReceiver() {
         instanceNotifierEventReceiver = new InstanceNotifierEventReceiver();
         instanceNotifierEventReceiver.addEventListener(new 
InstanceCleanupClusterEventListener() {
             @Override
@@ -101,25 +183,24 @@ public class MockInstance implements Runnable, Serializable {
                 }
             }
         });
-
+        // TODO: Fix InstanceNotifierEventReceiver to use executor service
+        // do not remove this since execute() is a blocking call
         eventListenerExecutorService.submit(new Runnable() {
             @Override
             public void run() {
                 instanceNotifierEventReceiver.execute();
             }
         });
-
         if (log.isDebugEnabled()) {
-            log.debug("Instance notifier event message receiver started");
+            log.debug(String.format(
+                    "Mock instance instance notifier event message receiver 
started for mock member [member-id] %s",
+                    mockInstanceContext.getMemberId()));
         }
     }
 
     private void handleMemberTermination() {
         if (!hasGracefullyShutdown.get()) {
             
MockMemberEventPublisher.publishMaintenanceModeEvent(mockInstanceContext);
-            sleep(5000);
-            
MockMemberEventPublisher.publishInstanceReadyToShutdownEvent(mockInstanceContext);
-            hasGracefullyShutdown.set(true);
         } else {
             if (log.isDebugEnabled()) {
                 log.debug(String.format("Mock instance is already gracefully 
shutdown [member-id] %s",
@@ -128,50 +209,36 @@ public class MockInstance implements Runnable, Serializable {
         }
     }
 
-    private void startHealthStatisticsPublisher() {
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Starting health statistics notifier: 
[member-id] %s",
-                    mockInstanceContext.getMemberId()));
-        }
-
-        healthStatNotifierScheduledFuture = healthStatNotifierExecutorService
-                .scheduleAtFixedRate(new 
MockHealthStatisticsNotifier(mockInstanceContext), 0, HEALTH_STAT_INTERVAL,
-                        TimeUnit.SECONDS);
-
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Health statistics notifier started: 
[member-id] %s",
-                    mockInstanceContext.getMemberId()));
-        }
-    }
-
     private void stopHealthStatisticsPublisher() {
-        if (healthStatNotifierScheduledFuture != null) {
-            healthStatNotifierScheduledFuture.cancel(true);
-        }
+        healthStatNotifierScheduledFuture.cancel(true);
     }
 
     private void stopInstanceNotifierReceiver() {
-        if (instanceNotifierEventReceiver != null) {
-            instanceNotifierEventReceiver.terminate();
-        }
-    }
-
-    private void sleep(long time) {
-        try {
-            Thread.sleep(time);
-        } catch (InterruptedException ignore) {
-        }
+        instanceNotifierEventReceiver.terminate();
     }
 
     public MockInstanceContext getMockInstanceContext() {
         return mockInstanceContext;
     }
 
-    public void terminate() {
-        stopInstanceNotifierReceiver();
-        stopHealthStatisticsPublisher();
-        if (log.isInfoEnabled()) {
-            log.info(String.format("Mock member terminated: [member-id] %s", 
mockInstanceContext.getMemberId()));
+    public synchronized void terminate() {
+        if (MemberStatus.Initialized.equals(memberStatus)) {
+            stopInstanceNotifierReceiver();
+            stopHealthStatisticsPublisher();
+            memberStatus = MemberStatus.Terminated;
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Mock instance stopped: [member-id] 
%s", mockInstanceContext.getMemberId()));
+            }
+        } else {
+            if (log.isInfoEnabled()) {
+                log.info(String.format(
+                        "Mock instance cannot be terminated since it is not in 
initialized state: [member-id] %s ",
+                        mockInstanceContext.getMemberId()));
+            }
         }
     }
+
+    public MockHealthStatisticsNotifier getMockHealthStatisticsNotifier() {
+        return mockHealthStatisticsNotifier;
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/5344c394/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java
index c2d1c6c..4f3de51 100644
--- a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java
+++ b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java
@@ -42,12 +42,12 @@ public class MockHealthStatisticsNotifier implements Runnable {
 
     public MockHealthStatisticsNotifier(MockInstanceContext mockMemberContext) 
{
         this.mockMemberContext = mockMemberContext;
-        this.healthStatisticsPublisher = 
HealthStatisticsPublisherFactory.createHealthStatisticsPublisher(
-                StatisticsPublisherType.WSO2CEP);
+        this.healthStatisticsPublisher = HealthStatisticsPublisherFactory
+                
.createHealthStatisticsPublisher(StatisticsPublisherType.WSO2CEP);
         this.healthStatisticsPublisher.setEnabled(true);
 
-        this.inFlightRequestPublisher = 
InFlightRequestPublisherFactory.createInFlightRequestPublisher(
-                StatisticsPublisherType.WSO2CEP);
+        this.inFlightRequestPublisher = InFlightRequestPublisherFactory
+                
.createInFlightRequestPublisher(StatisticsPublisherType.WSO2CEP);
         this.inFlightRequestPublisher.setEnabled(true);
     }
 
@@ -61,22 +61,17 @@ public class MockHealthStatisticsNotifier implements Runnable {
         }
 
         try {
-            double memoryConsumption = 
MockHealthStatistics.getInstance().getStatistics(
-                    mockMemberContext.getServiceName(), 
MockScalingFactor.MemoryConsumption);
+            double memoryConsumption = MockHealthStatistics.getInstance()
+                    .getStatistics(mockMemberContext.getServiceName(), 
MockScalingFactor.MemoryConsumption);
 
             if (log.isDebugEnabled()) {
                 log.debug(String.format("Publishing memory consumption: 
[member-id] %s [value] %f",
                         mockMemberContext.getMemberId(), memoryConsumption));
             }
-            healthStatisticsPublisher.publish(
-                    mockMemberContext.getClusterId(),
-                    mockMemberContext.getClusterInstanceId(),
-                    mockMemberContext.getNetworkPartitionId(),
-                    mockMemberContext.getMemberId(),
-                    mockMemberContext.getPartitionId(),
-                    MEMORY_CONSUMPTION,
-                    memoryConsumption
-            );
+            healthStatisticsPublisher
+                    .publish(mockMemberContext.getClusterId(), 
mockMemberContext.getClusterInstanceId(),
+                            mockMemberContext.getNetworkPartitionId(), 
mockMemberContext.getMemberId(),
+                            mockMemberContext.getPartitionId(), 
MEMORY_CONSUMPTION, memoryConsumption);
         } catch (NoStatisticsFoundException ignore) {
         } catch (Exception e) {
             if (log.isErrorEnabled()) {
@@ -84,23 +79,17 @@ public class MockHealthStatisticsNotifier implements Runnable {
             }
         }
 
-
         try {
-            double loadAvereage = 
MockHealthStatistics.getInstance().getStatistics(
-                    mockMemberContext.getServiceName(), 
MockScalingFactor.LoadAverage);
+            double loadAvereage = MockHealthStatistics.getInstance()
+                    .getStatistics(mockMemberContext.getServiceName(), 
MockScalingFactor.LoadAverage);
             if (log.isDebugEnabled()) {
                 log.debug(String.format("Publishing load average: [member-id] 
%s [value] %f",
                         mockMemberContext.getMemberId(), loadAvereage));
             }
-            healthStatisticsPublisher.publish(
-                    mockMemberContext.getClusterId(),
-                    mockMemberContext.getClusterInstanceId(),
-                    mockMemberContext.getNetworkPartitionId(),
-                    mockMemberContext.getMemberId(),
-                    mockMemberContext.getPartitionId(),
-                    LOAD_AVERAGE,
-                    loadAvereage
-            );
+            healthStatisticsPublisher
+                    .publish(mockMemberContext.getClusterId(), 
mockMemberContext.getClusterInstanceId(),
+                            mockMemberContext.getNetworkPartitionId(), 
mockMemberContext.getMemberId(),
+                            mockMemberContext.getPartitionId(), LOAD_AVERAGE, 
loadAvereage);
         } catch (NoStatisticsFoundException ignore) {
         } catch (Exception e) {
             if (log.isErrorEnabled()) {
@@ -109,17 +98,14 @@ public class MockHealthStatisticsNotifier implements 
Runnable {
         }
 
         try {
-            int requestsInFlight = 
MockHealthStatistics.getInstance().getStatistics(
-                    mockMemberContext.getServiceName(), 
MockScalingFactor.RequestsInFlight);
+            int requestsInFlight = MockHealthStatistics.getInstance()
+                    .getStatistics(mockMemberContext.getServiceName(), 
MockScalingFactor.RequestsInFlight);
             if (log.isDebugEnabled()) {
-                log.debug(String.format("Publishing requests in flight: 
[member-id] %s [value] %f",
+                log.debug(String.format("Publishing requests in flight: 
[member-id] %s [value] %d",
                         mockMemberContext.getMemberId(), requestsInFlight));
             }
-            inFlightRequestPublisher.publish(
-                    mockMemberContext.getClusterId(),
-                    mockMemberContext.getClusterInstanceId(),
-                    mockMemberContext.getNetworkPartitionId(),
-                    requestsInFlight);
+            inFlightRequestPublisher.publish(mockMemberContext.getClusterId(), 
mockMemberContext.getClusterInstanceId(),
+                    mockMemberContext.getNetworkPartitionId(), 
requestsInFlight);
         } catch (NoStatisticsFoundException ignore) {
         } catch (Exception e) {
             if (log.isErrorEnabled()) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/5344c394/components/org.apache.stratos.mock.iaas/src/test/java/org/apache/stratos/mock/iaas/test/MockIaasServiceTest.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.mock.iaas/src/test/java/org/apache/stratos/mock/iaas/test/MockIaasServiceTest.java
 
b/components/org.apache.stratos.mock.iaas/src/test/java/org/apache/stratos/mock/iaas/test/MockIaasServiceTest.java
index 8b7aa1f..c9c0478 100644
--- 
a/components/org.apache.stratos.mock.iaas/src/test/java/org/apache/stratos/mock/iaas/test/MockIaasServiceTest.java
+++ 
b/components/org.apache.stratos.mock.iaas/src/test/java/org/apache/stratos/mock/iaas/test/MockIaasServiceTest.java
@@ -19,16 +19,20 @@
 
 package org.apache.stratos.mock.iaas.test;
 
+import org.apache.activemq.broker.BrokerService;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.statistics.publisher.ThriftClientConfig;
 import org.apache.stratos.mock.iaas.config.MockIaasConfig;
 import org.apache.stratos.mock.iaas.domain.MockInstanceContext;
 import org.apache.stratos.mock.iaas.domain.MockInstanceMetadata;
 import org.apache.stratos.mock.iaas.persistence.PersistenceManagerType;
 import org.apache.stratos.mock.iaas.services.impl.MockConstants;
 import org.apache.stratos.mock.iaas.services.impl.MockIaasServiceImpl;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.io.File;
 import java.net.URL;
 import java.util.List;
 
@@ -41,58 +45,93 @@ public class MockIaasServiceTest {
 
     private static Log log = LogFactory.getLog(MockIaasServiceTest.class);
     private static final String CONFIG_FILE_PATH = "/mock-iaas.xml";
+    private static BrokerService broker;
+
+    /**
+     * One-time fixture setup for the whole test class.
+     * <p>
+     * Publishes the locations of the test resources as system properties and then
+     * brings up the embedded ActiveMQ broker.
+     */
+    @BeforeClass
+    public static void init() {
+        final Class<MockIaasServiceTest> testClass = MockIaasServiceTest.class;
+
+        // Mock IaaS configuration file and persistence manager selection.
+        URL mockIaasConfig = testClass.getResource(CONFIG_FILE_PATH);
+        System.setProperty(MockIaasConfig.MOCK_IAAS_CONFIG_FILE_PATH, mockIaasConfig.getPath());
+        System.setProperty(MockConstants.PERSISTENCE_MANAGER_TYPE, PersistenceManagerType.Mock.toString());
+
+        // Thrift client configuration shipped with the test resources.
+        URL thriftClientConfig = testClass.getResource("/thrift-client-config.xml");
+        System.setProperty(ThriftClientConfig.THRIFT_CLIENT_CONFIG_FILE_PATH, thriftClientConfig.getPath());
+
+        // Both properties point at the root of the test classpath.
+        String classpathRoot = testClass.getResource("/").getPath();
+        System.setProperty("carbon.home", classpathRoot);
+        System.setProperty("jndi.properties.dir", classpathRoot);
+
+        initializeActiveMQ();
+        startActiveMQ();
+    }
 
-    @Test
-    public void testStartInstance() {
-
+    /**
+     * Creates and configures the embedded ActiveMQ broker without starting it.
+     *
+     * @throws RuntimeException wrapping any failure raised while configuring the broker
+     */
+    private static void initializeActiveMQ() {
         try {
-            URL configFileUrl = getClass().getResource(CONFIG_FILE_PATH);
-            System.setProperty(MockIaasConfig.MOCK_IAAS_CONFIG_FILE_PATH, 
configFileUrl.getPath());
-            System.setProperty(MockConstants.PERSISTENCE_MANAGER_TYPE, 
PersistenceManagerType.Mock.toString());
-
-            MockIaasServiceImpl mockIaasService = new MockIaasServiceImpl();
-            MockInstanceContext mockInstanceContext = new 
MockInstanceContext("app1", "service1", "cluster1", "member1",
-                    "cluster-instance1", "network-p1", "p1");
-            MockInstanceMetadata metadata = 
mockIaasService.startInstance(mockInstanceContext);
-            assertNotNull("Could not start mock instance", metadata);
-            assertNotNull("Mock instance not found", 
mockIaasService.getInstance(metadata.getInstanceId()));
+            log.info("Initializing ActiveMQ...");
+            broker = new BrokerService();
+
+            // Broker data goes to an "activemq-data" directory one level above the test classpath root.
+            final String classpathRoot = MockIaasServiceTest.class.getResource("/").getPath();
+            final String dataDirectory = classpathRoot + File.separator + ".." + File.separator + "activemq-data";
+            broker.setDataDirectory(dataDirectory);
+
+            broker.setBrokerName("testBroker");
+            // Same endpoint as java.naming.provider.url in src/test/resources/jndi.properties.
+            broker.addConnector("tcp://localhost:61617");
         } catch (Exception e) {
-            log.error(e);
-            assertTrue(e.getMessage(), false);
+            throw new RuntimeException("Could not initialize ActiveMQ", e);
         }
     }
 
-    @Test
-    public void testGetInstances() {
+    /**
+     * Starts the previously initialized broker and logs how long the start-up took,
+     * in whole seconds.
+     *
+     * @throws RuntimeException wrapping any failure raised while starting the broker
+     */
+    private static void startActiveMQ() {
+        try {
+            final long startedAt = System.currentTimeMillis();
+            broker.start();
+            final long elapsedMillis = System.currentTimeMillis() - startedAt;
+            log.info(String.format("ActiveMQ started in %d sec", elapsedMillis / 1000));
+        } catch (Exception cause) {
+            throw new RuntimeException("Could not start ActiveMQ", cause);
+        }
+    }
 
+    /**
+     * Stops the embedded ActiveMQ broker once every test in this class has run.
+     * <p>
+     * The method is {@code public static} and annotated with {@link AfterClass} so that
+     * JUnit invokes it; as a private instance method it was never called and the broker
+     * started in {@link #init()} was left running.
+     *
+     * @throws RuntimeException wrapping any failure raised while stopping the broker
+     */
+    @AfterClass
+    public static void stopActiveMQ() {
+        // Nothing to stop when initialization failed before the broker was created.
+        if (broker == null) {
+            return;
+        }
         try {
-            URL configFileUrl = getClass().getResource(CONFIG_FILE_PATH);
-            System.setProperty(MockIaasConfig.MOCK_IAAS_CONFIG_FILE_PATH, 
configFileUrl.getPath());
-            System.setProperty(MockConstants.PERSISTENCE_MANAGER_TYPE, 
PersistenceManagerType.Mock.toString());
-
-            MockIaasServiceImpl mockIaasService = new MockIaasServiceImpl();
-            MockInstanceContext mockInstanceContext = new 
MockInstanceContext("app1", "service1", "cluster1", "member1",
-                    "cluster-instance1", "network-p1", "p1");
-            MockInstanceMetadata metadata1 = 
mockIaasService.startInstance(mockInstanceContext);
-            assertNotNull("Could not start mock instance", metadata1);
-            assertNotNull("Mock instance not found", 
mockIaasService.getInstance(metadata1.getInstanceId()));
-
-            mockInstanceContext = new MockInstanceContext("app1", "service1", 
"cluster1", "member2",
-                    "cluster-instance1", "network-p1", "p1");
-            MockInstanceMetadata metadata2 = 
mockIaasService.startInstance(mockInstanceContext);
-            assertNotNull("Could not start mock instance", metadata2);
-            assertNotNull("Mock instance not found", 
mockIaasService.getInstance(metadata2.getInstanceId()));
-
-            List<MockInstanceMetadata> instances = 
mockIaasService.getInstances();
-            assertNotNull(instances);
-            assertTrue("Mock instance 1 not found in get instances result", 
instanceExist(instances, metadata1.getInstanceId()));
-            assertTrue("Mock instance 2 not found in get instances result", 
instanceExist(instances, metadata2.getInstanceId()));
+            broker.stop();
         } catch (Exception e) {
-            log.error(e);
-            assertTrue(e.getMessage(), false);
+            throw new RuntimeException("Could not stop ActiveMQ", e);
+        }
+    }
+
+    /**
+     * Pauses the current thread for the given number of milliseconds.
+     * <p>
+     * If the thread is interrupted while sleeping, the method returns early and
+     * re-asserts the interrupt flag so the caller can still observe the interruption.
+     *
+     * @param time time to sleep, in milliseconds
+     */
+    private void sleep(long time) {
+        try {
+            Thread.sleep(time);
+        } catch (InterruptedException interrupted) {
+            // Thread.sleep clears the interrupt status when it throws; restore it
+            // instead of silently discarding the interruption request.
+            Thread.currentThread().interrupt();
         }
     }
 
+    /**
+     * Starting an instance must return metadata, and the instance must then be
+     * retrievable by the returned instance id.
+     */
+    @Test
+    public void testStartInstance() throws Exception {
+        MockIaasServiceImpl service = new MockIaasServiceImpl();
+        MockInstanceContext context = new MockInstanceContext("app1", "service1", "cluster1", "member1",
+                "cluster-instance1", "network-p1", "p1");
+
+        MockInstanceMetadata started = service.startInstance(context);
+
+        assertNotNull("Could not start mock instance", started);
+        assertNotNull("Mock instance not found", service.getInstance(started.getInstanceId()));
+    }
+
+    /**
+     * Two started instances must both show up in the list returned by
+     * {@code getInstances()}.
+     */
+    @Test
+    public void testGetInstances() throws Exception {
+        MockIaasServiceImpl service = new MockIaasServiceImpl();
+
+        // First member.
+        MockInstanceContext firstContext = new MockInstanceContext("app1", "service1", "cluster1", "member1",
+                "cluster-instance1", "network-p1", "p1");
+        MockInstanceMetadata first = service.startInstance(firstContext);
+        assertNotNull("Could not start mock instance", first);
+        assertNotNull("Mock instance not found", service.getInstance(first.getInstanceId()));
+
+        // Second member of the same cluster instance.
+        MockInstanceContext secondContext = new MockInstanceContext("app1", "service1", "cluster1", "member2",
+                "cluster-instance1", "network-p1", "p1");
+        MockInstanceMetadata second = service.startInstance(secondContext);
+        assertNotNull("Could not start mock instance", second);
+        assertNotNull("Mock instance not found", service.getInstance(second.getInstanceId()));
+
+        List<MockInstanceMetadata> running = service.getInstances();
+        assertNotNull(running);
+        assertTrue("Mock instance 1 not found in get instances result",
+                instanceExist(running, first.getInstanceId()));
+        assertTrue("Mock instance 2 not found in get instances result",
+                instanceExist(running, second.getInstanceId()));
+    }
+
     private boolean instanceExist(List<MockInstanceMetadata> instances, String 
instanceId) {
         for (MockInstanceMetadata instance : instances) {
             if (instance.getInstanceId().equals(instanceId)) {
@@ -103,25 +142,15 @@ public class MockIaasServiceTest {
     }
 
     @Test
-    public void testTerminateInstance() {
-
-        try {
-            URL configFileUrl = getClass().getResource(CONFIG_FILE_PATH);
-            System.setProperty(MockIaasConfig.MOCK_IAAS_CONFIG_FILE_PATH, 
configFileUrl.getPath());
-            System.setProperty(MockConstants.PERSISTENCE_MANAGER_TYPE, 
PersistenceManagerType.Mock.toString());
-
-            MockIaasServiceImpl mockIaasService = new MockIaasServiceImpl();
-            MockInstanceContext mockInstanceContext = new 
MockInstanceContext("app1", "service1", "cluster1", "member1",
-                    "cluster-instance1", "network-p1", "p1");
-            MockInstanceMetadata metadata = 
mockIaasService.startInstance(mockInstanceContext);
-            assertNotNull("Could not start mock instance", metadata);
-            assertNotNull("Mock instance not found", 
mockIaasService.getInstance(metadata.getInstanceId()));
-
-            mockIaasService.terminateInstance(metadata.getInstanceId());
-            assertNull("Could not terminate mock instance", 
mockIaasService.getInstance(metadata.getInstanceId()));
-        } catch (Exception e) {
-            log.error(e);
-            assertTrue(e.getMessage(), false);
-        }
+    public void testTerminateInstance() throws Exception {
+        // A started instance must no longer be retrievable once it has been terminated.
+        MockIaasServiceImpl service = new MockIaasServiceImpl();
+        MockInstanceContext context = new MockInstanceContext("app1", "service1", "cluster1", "member1",
+                "cluster-instance1", "network-p1", "p1");
+
+        MockInstanceMetadata started = service.startInstance(context);
+        assertNotNull("Could not start mock instance", started);
+        String instanceId = started.getInstanceId();
+        assertNotNull("Mock instance not found", service.getInstance(instanceId));
+
+        service.terminateInstance(started.getInstanceId());
+        assertNull("Could not terminate mock instance", service.getInstance(started.getInstanceId()));
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/5344c394/components/org.apache.stratos.mock.iaas/src/test/resources/jndi.properties
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.mock.iaas/src/test/resources/jndi.properties 
b/components/org.apache.stratos.mock.iaas/src/test/resources/jndi.properties
new file mode 100644
index 0000000..beefe3c
--- /dev/null
+++ b/components/org.apache.stratos.mock.iaas/src/test/resources/jndi.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+connectionfactoryName=TopicConnectionFactory
+java.naming.provider.url=tcp://localhost:61617
+java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory

http://git-wip-us.apache.org/repos/asf/stratos/blob/5344c394/components/org.apache.stratos.mock.iaas/src/test/resources/thrift-client-config.xml
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.mock.iaas/src/test/resources/thrift-client-config.xml
 
b/components/org.apache.stratos.mock.iaas/src/test/resources/thrift-client-config.xml
new file mode 100644
index 0000000..f828e0d
--- /dev/null
+++ 
b/components/org.apache.stratos.mock.iaas/src/test/resources/thrift-client-config.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<!-- Apache thrift client configuration for publishing statistics to WSO2 CEP and WSO2 DAS -->
+<thriftClientConfiguration>
+     <config>
+        <cep>
+             <node id="node-01">
+                  <statsPublisherEnabled>true</statsPublisherEnabled>
+                  <username>admincep1</username>
+                  <password>1234cep1</password>
+                  <ip>192.168.10.10</ip>
+                  <port>9300</port>
+             </node>
+             <node id="node-02">
+                  <statsPublisherEnabled>true</statsPublisherEnabled>
+                  <username>admincep2</username>
+                  <password>1234cep2</password>
+                  <ip>192.168.10.20</ip>
+                  <port>9300</port>
+             </node>
+        </cep>
+        <das>
+             <node id="node-01">
+                  <statsPublisherEnabled>true</statsPublisherEnabled>
+                  <username>admindas1</username>
+                  <password>1234das1</password>
+                  <ip>192.168.10.11</ip>
+                  <port>9301</port>
+             </node>
+        </das>
+    </config>  
+</thriftClientConfiguration>
\ No newline at end of file

Reply via email to