Adding executor service for threads and removing unnecessary threads

Conflicts:
        components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/4e733930
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/4e733930
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/4e733930

Branch: refs/heads/master
Commit: 4e733930ba33a87c78cd398a2573f6b545a8d9f0
Parents: ab1ed3c
Author: gayan <[email protected]>
Authored: Mon Dec 1 19:05:20 2014 +0530
Committer: gayan <[email protected]>
Committed: Tue Dec 2 16:43:07 2014 +0530

----------------------------------------------------------------------
 .../AutoscalerHealthStatEventReceiver.java      | 19 +++--
 .../internal/AutoscalerServerComponent.java     |  7 +-
 .../stratos/cartridge/agent/CartridgeAgent.java |  4 +-
 .../CloudControllerServiceComponent.java        | 28 ++++++++
 .../application/ApplicationTopicReceiver.java   | 12 ++++
 .../status/ClusterStatusTopicReceiver.java      | 12 ++++
 .../status/InstanceStatusTopicReceiver.java     | 10 +++
 components/org.apache.stratos.common/pom.xml    |  5 ++
 .../apache/stratos/common/util/ConfUtil.java    | 73 ++++++++++++++++++++
 .../internal/ADCManagementServerComponent.java  |  4 +-
 .../applications/ApplicationsEventReceiver.java | 26 ++++---
 .../status/ClusterStatusEventReceiver.java      | 28 ++++----
 .../health/stat/HealthStatEventReceiver.java    | 11 +++
 .../status/InstanceStatusEventReceiver.java     | 19 +++--
 14 files changed, 216 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
index 0e45ee1..9e440c7 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
@@ -63,24 +63,25 @@ import 
org.apache.stratos.messaging.listener.health.stat.SecondDerivativeOfReque
 import 
org.apache.stratos.messaging.message.receiver.health.stat.HealthStatEventReceiver;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 
+import java.util.concurrent.ExecutorService;
 
 /**
  * A thread for processing topology messages and updating the topology data 
structure.
  */
-public class AutoscalerHealthStatEventReceiver implements Runnable {
+public class AutoscalerHealthStatEventReceiver {
 
     private static final Log log = 
LogFactory.getLog(AutoscalerHealthStatEventReceiver.class);
     private boolean terminated = false;
 
     private HealthStatEventReceiver healthStatEventReceiver;
+       private ExecutorService executorService;
 
     public AutoscalerHealthStatEventReceiver() {
                this.healthStatEventReceiver = new HealthStatEventReceiver();
         addEventListeners();
     }
 
-    @Override
-    public void run() {
+    public void execute() {
         //FIXME this activated before autoscaler deployer activated.
         try {
             Thread.sleep(15000);
@@ -92,11 +93,7 @@ public class AutoscalerHealthStatEventReceiver implements 
Runnable {
             log.info("Autoscaler health stat event receiver thread started");
         }
 
-        // Keep the thread live until terminated
 
-        if(log.isInfoEnabled()) {
-            log.info("Autoscaler health stat event receiver thread 
terminated");
-        }
     }
 
     private void addEventListeners() {
@@ -519,4 +516,12 @@ public class AutoscalerHealthStatEventReceiver implements 
Runnable {
     public void terminate() {
         this.terminated = true;
     }
+
+       public ExecutorService getExecutorService() {
+               return executorService;
+       }
+
+       public void setExecutorService(ExecutorService executorService) {
+               this.executorService = executorService;
+       }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
index 2e443de..bb5e167 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
@@ -66,13 +66,14 @@ public class AutoscalerServerComponent {
        private static final String THREAD_IDENTIFIER_KEY = 
"threadPool.autoscaler.identifier";
        private static final String DEFAULT_IDENTIFIER = "Auto-Scaler";
        private static final String THREAD_POOL_SIZE_KEY = 
"threadPool.autoscaler.threadPoolSize";
-       private static final String COMPONENTS_CONFIG = "components-config";
+       private static final String COMPONENTS_CONFIG = "stratos-config";
        private static final int THREAD_POOL_SIZE = 10;
        private static final Log log = 
LogFactory.getLog(AutoscalerServerComponent.class);
 
        private AutoscalerTopologyEventReceiver asTopologyReceiver;
        private AutoscalerHealthStatEventReceiver 
autoscalerHealthStatEventReceiver;
 
+
        protected void activate(ComponentContext componentContext) throws 
Exception {
 <<<<<<< HEAD
         try {
@@ -214,8 +215,8 @@ public class AutoscalerServerComponent {
 
                        // Start health stat receiver
                        autoscalerHealthStatEventReceiver = new 
AutoscalerHealthStatEventReceiver();
-                       Thread healthDelegatorThread = new 
Thread(autoscalerHealthStatEventReceiver);
-                       healthDelegatorThread.start();
+                       
autoscalerHealthStatEventReceiver.setExecutorService(executorService);
+                       autoscalerHealthStatEventReceiver.execute();
                        if (log.isDebugEnabled()) {
                                log.debug("Health statistics receiver thread 
started");
                        }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
index e275db5..53fd658 100644
--- 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
+++ 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
@@ -415,8 +415,8 @@ public class CartridgeAgent implements Runnable {
             }
         });
 
-//        Thread thread = new Thread(topologyEventReceiver);
-//        thread.start();
+           topologyEventReceiver.execute();
+
         if (log.isDebugEnabled()) {
             log.info("Cartridge Agent topology receiver thread started");
         }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
index 6773b4a..a413218 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
@@ -23,6 +23,7 @@ package org.apache.stratos.cloud.controller.internal;
 <<<<<<< HEAD
 <<<<<<< HEAD
 
+<<<<<<< HEAD
 import com.hazelcast.core.HazelcastInstance;
 
 =======
@@ -30,17 +31,30 @@ import com.hazelcast.core.HazelcastInstance;
 =======
 
 >>>>>>> ad3e45c... Remove unnessary threads in messaging model
+=======
+import org.apache.commons.configuration.XMLConfiguration;
+>>>>>>> 1b26a96... Adding executor service for threads and remove unnecessary 
threads
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.cloud.controller.context.CloudControllerContext;
 import 
org.apache.stratos.cloud.controller.messaging.receiver.application.ApplicationTopicReceiver;
 import 
org.apache.stratos.cloud.controller.messaging.receiver.cluster.status.ClusterStatusTopicReceiver;
 import org.apache.stratos.cloud.controller.exception.CloudControllerException;
+<<<<<<< HEAD
 import org.apache.stratos.cloud.controller.services.CloudControllerService;
 import 
org.apache.stratos.cloud.controller.services.impl.CloudControllerServiceImpl;
 import 
org.apache.stratos.cloud.controller.messaging.publisher.TopologySynchronizerTaskScheduler;
 import 
org.apache.stratos.cloud.controller.messaging.receiver.instance.status.InstanceStatusTopicReceiver;
 import org.apache.stratos.common.clustering.DistributedObjectProvider;
+=======
+import org.apache.stratos.cloud.controller.impl.CloudControllerServiceImpl;
+import org.apache.stratos.cloud.controller.interfaces.CloudControllerService;
+import 
org.apache.stratos.cloud.controller.publisher.TopologySynchronizerTaskScheduler;
+import 
org.apache.stratos.cloud.controller.receiver.instance.status.InstanceStatusTopicReceiver;
+import org.apache.stratos.cloud.controller.util.ServiceReferenceHolder;
+import org.apache.stratos.common.threading.StratosThreadPool;
+import org.apache.stratos.common.util.ConfUtil;
+>>>>>>> 1b26a96... Adding executor service for threads and remove unnecessary 
threads
 import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.util.Util;
 import org.osgi.framework.BundleContext;
@@ -52,6 +66,8 @@ import org.wso2.carbon.registry.core.service.RegistryService;
 import org.wso2.carbon.registry.core.session.UserRegistry;
 import org.wso2.carbon.utils.ConfigurationContextService;
 
+import java.util.concurrent.ExecutorService;
+
 /**
  * Registering Cloud Controller Service.
  *
@@ -90,10 +106,21 @@ public class CloudControllerServiceComponent {
        private ClusterStatusTopicReceiver clusterStatusTopicReceiver;
        private InstanceStatusTopicReceiver instanceStatusTopicReceiver;
        private ApplicationTopicReceiver applicationTopicReceiver;
+       private static final String THREAD_IDENTIFIER_KEY = 
"threadPool.autoscaler.identifier";
+       private static final String DEFAULT_IDENTIFIER = "Auto-Scaler";
+       private static final String THREAD_POOL_SIZE_KEY = 
"threadPool.autoscaler.threadPoolSize";
+       private static final String COMPONENTS_CONFIG = "stratos-config";
+       private static final int THREAD_POOL_SIZE = 10;
 
        protected void activate(ComponentContext context) {
                try {
+
+                       XMLConfiguration conf = 
ConfUtil.getInstance(COMPONENTS_CONFIG).getConfiguration();
+                       int threadPoolSize = conf.getInt(THREAD_POOL_SIZE_KEY, 
THREAD_POOL_SIZE);
+                       String threadIdentifier = 
conf.getString(THREAD_IDENTIFIER_KEY, DEFAULT_IDENTIFIER);
+                       ExecutorService executorService = 
StratosThreadPool.getExecutorService(threadIdentifier, threadPoolSize);
                        applicationTopicReceiver = new 
ApplicationTopicReceiver();
+                       
applicationTopicReceiver.setExecutorService(executorService);
                        applicationTopicReceiver.execute();
 
                        if (log.isInfoEnabled()) {
@@ -106,6 +133,7 @@ public class CloudControllerServiceComponent {
             }
 =======
                        clusterStatusTopicReceiver = new 
ClusterStatusTopicReceiver();
+                       
clusterStatusTopicReceiver.setExecutorService(executorService);
                        clusterStatusTopicReceiver.execute();
 >>>>>>> ddf277b... Remove unnessary threads in messaging model
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java
index d65b7f5..9df4f3a 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java
@@ -26,6 +26,8 @@ import 
org.apache.stratos.messaging.event.applications.ApplicationTerminatedEven
 import 
org.apache.stratos.messaging.listener.applications.ApplicationTerminatedEventListener;
 import 
org.apache.stratos.messaging.message.receiver.applications.ApplicationsEventReceiver;
 
+import java.util.concurrent.ExecutorService;
+
 /**
  * This is to receive the application topic messages.
  */
@@ -33,6 +35,7 @@ public class ApplicationTopicReceiver {
        private static final Log log = 
LogFactory.getLog(ApplicationTopicReceiver.class);
        private ApplicationsEventReceiver applicationsEventReceiver;
        private boolean terminated;
+       private ExecutorService executorService;
 
        public ApplicationTopicReceiver() {
                this.applicationsEventReceiver = new 
ApplicationsEventReceiver();
@@ -46,6 +49,7 @@ public class ApplicationTopicReceiver {
                        log.info("Cloud controller application status thread 
started");
                }
                applicationsEventReceiver.execute();
+               applicationsEventReceiver.setExecutorService(executorService);
 
                if (log.isInfoEnabled()) {
                        log.info("Cloud controller application status thread 
terminated");
@@ -69,4 +73,12 @@ public class ApplicationTopicReceiver {
        public void setTerminated(boolean terminated) {
                this.terminated = terminated;
        }
+
+       public ExecutorService getExecutorService() {
+               return executorService;
+       }
+
+       public void setExecutorService(ExecutorService executorService) {
+               this.executorService = executorService;
+       }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
index ca6d4ad..d54063c 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
@@ -26,14 +26,18 @@ import org.apache.stratos.messaging.event.cluster.status.*;
 import org.apache.stratos.messaging.listener.cluster.status.*;
 import 
org.apache.stratos.messaging.message.receiver.cluster.status.ClusterStatusEventReceiver;
 
+import java.util.concurrent.ExecutorService;
+
 public class ClusterStatusTopicReceiver {
        private static final Log log = 
LogFactory.getLog(ClusterStatusTopicReceiver.class);
 
        private ClusterStatusEventReceiver statusEventReceiver;
        private boolean terminated;
+       private ExecutorService executorService;
 
        public ClusterStatusTopicReceiver() {
                this.statusEventReceiver = new ClusterStatusEventReceiver();
+               this.statusEventReceiver.setExecutorService(executorService);
                addEventListeners();
        }
 
@@ -101,4 +105,12 @@ public class ClusterStatusTopicReceiver {
        public void setTerminated(boolean terminated) {
                this.terminated = terminated;
        }
+
+       public ExecutorService getExecutorService() {
+               return executorService;
+       }
+
+       public void setExecutorService(ExecutorService executorService) {
+               this.executorService = executorService;
+       }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
index 42aabed..dc21735 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
@@ -32,6 +32,8 @@ import 
org.apache.stratos.messaging.listener.instance.status.InstanceReadyToShut
 import 
org.apache.stratos.messaging.listener.instance.status.InstanceStartedEventListener;
 import 
org.apache.stratos.messaging.message.receiver.instance.status.InstanceStatusEventReceiver;
 
+import java.util.concurrent.ExecutorService;
+
 /**
  * This will handle the instance status events
  */
@@ -40,6 +42,7 @@ public class InstanceStatusTopicReceiver {
 
        private InstanceStatusEventReceiver statusEventReceiver;
        private boolean terminated;
+       private ExecutorService executorService;
 
        public InstanceStatusTopicReceiver() {
                this.statusEventReceiver = new InstanceStatusEventReceiver();
@@ -98,4 +101,11 @@ public class InstanceStatusTopicReceiver {
 
        }
 
+       public ExecutorService getExecutorService() {
+               return executorService;
+       }
+
+       public void setExecutorService(ExecutorService executorService) {
+               this.executorService = executorService;
+       }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.common/pom.xml
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/pom.xml 
b/components/org.apache.stratos.common/pom.xml
index 6c33f0d..6ed9228 100644
--- a/components/org.apache.stratos.common/pom.xml
+++ b/components/org.apache.stratos.common/pom.xml
@@ -97,6 +97,11 @@
             <version>3.1</version>
         </dependency>
         <dependency>
+            <groupId>commons-configuration</groupId>
+            <artifactId>commons-configuration</artifactId>
+            <version>1.9</version>
+        </dependency>
+        <dependency>
             <groupId>org.wso2.carbon</groupId>
             <artifactId>org.wso2.carbon.databridge.agent.thrift</artifactId>
             <version>4.2.0</version>

http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/util/ConfUtil.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/util/ConfUtil.java
 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/util/ConfUtil.java
new file mode 100644
index 0000000..7f9d665
--- /dev/null
+++ 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/util/ConfUtil.java
@@ -0,0 +1,73 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements. See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership. The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License. You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied. See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+package org.apache.stratos.common.util;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.utils.CarbonUtils;
+
+import java.io.File;
+
+/**
+ * This class contains utility methods for read Autoscaler configuration file.
+ */
+public class ConfUtil {
+
+       private static final String CONFIG_FILE_NAME ="stratos-config" ;
+       private static Log log = LogFactory.getLog(ConfUtil.class);
+
+    private XMLConfiguration config;
+
+    private static ConfUtil instance = null;
+
+    private ConfUtil(String configFilePath) {
+        log.info("Loading configuration.....");
+        try {
+
+            File confFile;
+            if (configFilePath != null && !configFilePath.isEmpty()) {
+                confFile = new File(configFilePath);
+
+            } else {
+                confFile = new 
File(CarbonUtils.getCarbonConfigDirPath(),CONFIG_FILE_NAME);
+            }
+
+            config = new XMLConfiguration(confFile);
+        } catch (ConfigurationException e) {
+            log.error("Unable to load autoscaler configuration file",e);
+            config = new XMLConfiguration();  // continue with default values
+        }
+    }
+
+    public static ConfUtil getInstance(String configFilePath) {
+        if (instance == null) {
+            instance = new ConfUtil (configFilePath);
+        }
+        return instance;
+    }
+
+    public XMLConfiguration getConfiguration(){
+        return config;
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
index efce585..e4ffccc 100644
--- 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
+++ 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
@@ -68,7 +68,7 @@ import java.util.concurrent.ExecutorService;
 public class ADCManagementServerComponent {
 
        private static final Log log = 
LogFactory.getLog(ADCManagementServerComponent.class);
-       private static final String STRATOS_MANAGER = "Stratos_manager";
+       private static final String IDENTIFIER = "Stratos_manager";
        private static final int THREAD_POOL_SIZE = 20;
        private StratosManagerTopologyEventReceiver 
stratosManagerTopologyEventReceiver;
        private ExecutorService executorService;
@@ -76,7 +76,7 @@ public class ADCManagementServerComponent {
     protected void activate(ComponentContext componentContext) throws 
Exception {
                try {
                        CartridgeConfigFileReader.readProperties();
-                       
executorService=StratosThreadPool.getExecutorService(STRATOS_MANAGER, 
THREAD_POOL_SIZE);
+                       
executorService=StratosThreadPool.getExecutorService(IDENTIFIER, 
THREAD_POOL_SIZE);
                        
             // Schedule complete tenant event synchronizer
             if(log.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventReceiver.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventReceiver.java
index cc86c29..82d8c83 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventReceiver.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventReceiver.java
@@ -24,6 +24,8 @@ import 
org.apache.stratos.messaging.broker.subscribe.Subscriber;
 import org.apache.stratos.messaging.listener.EventListener;
 import org.apache.stratos.messaging.util.Util;
 
+import java.util.concurrent.ExecutorService;
+
 public class ApplicationsEventReceiver {
     private static final Log log = 
LogFactory.getLog(ApplicationsEventReceiver.class);
 
@@ -31,6 +33,7 @@ public class ApplicationsEventReceiver {
     private ApplicationsEventMessageListener messageListener;
     private Subscriber subscriber;
     private boolean terminated;
+       private ExecutorService executorService;
 
     public ApplicationsEventReceiver() {
         ApplicationsEventMessageQueue messageQueue = new 
ApplicationsEventMessageQueue();
@@ -47,27 +50,20 @@ public class ApplicationsEventReceiver {
         try {
             // Start topic subscriber thread
             subscriber = new 
Subscriber(Util.Topics.APPLICATIONS_TOPIC.getTopicName(), messageListener);
+                       executorService.execute(subscriber);
 
-            Thread subscriberThread = new Thread(subscriber);
-            subscriberThread.start();
             if (log.isDebugEnabled()) {
                 log.debug("Application status event message receiver thread 
started");
             }
 
             // Start Application status event message delegator thread
-            Thread receiverThread = new Thread(messageDelegator);
-            receiverThread.start();
+               executorService.execute(messageDelegator);
+
             if (log.isDebugEnabled()) {
                 log.debug("Application status event message delegator thread 
started");
             }
 
-            // Keep the thread live until terminated
-            while (!terminated) {
-                try {
-                    Thread.sleep(1000);
-                } catch (InterruptedException ignore) {
-                }
-            }
+
         } catch (Exception e) {
             if (log.isErrorEnabled()) {
                 log.error("Application status failed", e);
@@ -80,4 +76,12 @@ public class ApplicationsEventReceiver {
         messageDelegator.terminate();
         terminated = true;
     }
+
+       public ExecutorService getExecutorService() {
+               return executorService;
+       }
+
+       public void setExecutorService(ExecutorService executorService) {
+               this.executorService = executorService;
+       }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
index 38184aa..a6de430 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
@@ -25,6 +25,9 @@ import 
org.apache.stratos.messaging.broker.subscribe.Subscriber;
 import org.apache.stratos.messaging.listener.EventListener;
 import org.apache.stratos.messaging.util.Util;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
 /**
  * A thread for receiving instance notifier information from message broker.
  */
@@ -34,6 +37,7 @@ public class ClusterStatusEventReceiver{
     private final ClusterStatusEventMessageListener messageListener;
     private Subscriber subscriber;
     private boolean terminated;
+       private ExecutorService executorService;
 
     public ClusterStatusEventReceiver() {
         ClusterStatusEventMessageQueue messageQueue = new 
ClusterStatusEventMessageQueue();
@@ -50,27 +54,19 @@ public class ClusterStatusEventReceiver{
         try {
             // Start topic subscriber thread
             subscriber = new Subscriber(Util.Topics.CLUSTER_STATUS_TOPIC.getTopicName(), messageListener);
-//            subscriber.setMessageListener(messageListener);
-            Thread subscriberThread = new Thread(subscriber);
-            subscriberThread.start();
+            executorService.execute(subscriber);
+
             if (log.isDebugEnabled()) {
                 log.debug("InstanceNotifier event message receiver thread started");
             }
 
             // Start instance notifier event message delegator thread
-            Thread receiverThread = new Thread(messageDelegator);
-            receiverThread.start();
+               executorService.execute(messageDelegator);
             if (log.isDebugEnabled()) {
                 log.debug("InstanceNotifier event message delegator thread started");
             }
 
-            // Keep the thread live until terminated
-            while (!terminated) {
-                try {
-                    Thread.sleep(1000);
-                } catch (InterruptedException ignore) {
-                }
-            }
+
         } catch (Exception e) {
             if (log.isErrorEnabled()) {
                 log.error("InstanceNotifier receiver failed", e);
@@ -87,4 +83,12 @@ public class ClusterStatusEventReceiver{
         messageDelegator.terminate();
         terminated = true;
     }
+
+       public ExecutorService getExecutorService() {
+               return executorService;
+       }
+
+       public void setExecutorService(ExecutorService executorService) {
+               this.executorService = executorService;
+       }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
index 14c7346..d324c7e 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
@@ -25,6 +25,8 @@ import org.apache.stratos.messaging.broker.subscribe.Subscriber;
 import org.apache.stratos.messaging.listener.EventListener;
 import org.apache.stratos.messaging.util.Util;
 
+import java.util.concurrent.ExecutorService;
+
 /**
  * A thread for receiving health stat information from message broker
  */
@@ -35,6 +37,7 @@ public class HealthStatEventReceiver {
        private final HealthStatEventMessageListener messageListener;
        private Subscriber subscriber;
        private boolean terminated;
+       private ExecutorService executorService;
 
        public HealthStatEventReceiver() {
                HealthStatEventMessageQueue messageQueue = new 
HealthStatEventMessageQueue();
@@ -84,4 +87,12 @@ public class HealthStatEventReceiver {
                messageDelegator.terminate();
                terminated = true;
        }
+
+       public ExecutorService getExecutorService() {
+               return executorService;
+       }
+
+       public void setExecutorService(ExecutorService executorService) {
+               this.executorService = executorService;
+       }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/4e733930/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
index a8f1d96..af9319f 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
@@ -25,6 +25,8 @@ import org.apache.stratos.messaging.broker.subscribe.Subscriber;
 import org.apache.stratos.messaging.listener.EventListener;
 import org.apache.stratos.messaging.util.Util;
 
+import java.util.concurrent.ExecutorService;
+
 /**
  * A thread for receiving instance notifier information from message broker.
  */
@@ -34,6 +36,7 @@ public class InstanceStatusEventReceiver {
     private final InstanceStatusEventMessageListener messageListener;
     private Subscriber subscriber;
     private boolean terminated;
+       private ExecutorService executorService;
 
     public InstanceStatusEventReceiver() {
         InstanceStatusEventMessageQueue messageQueue = new InstanceStatusEventMessageQueue();
@@ -51,15 +54,13 @@ public class InstanceStatusEventReceiver {
             // Start topic subscriber thread
             subscriber = new Subscriber(Util.Topics.INSTANCE_STATUS_TOPIC.getTopicName(), messageListener);
 //            subscriber.setMessageListener(messageListener);
-            Thread subscriberThread = new Thread(subscriber);
-            subscriberThread.start();
+            executorService.submit(subscriber);
             if (log.isDebugEnabled()) {
                 log.debug("InstanceNotifier event message receiver thread started");
             }
 
-            // Start instance notifier event message delegator thread
-            Thread receiverThread = new Thread(messageDelegator);
-            receiverThread.start();
+            // Start instance notifier event message delegate thread
+            executorService.submit(messageDelegator);
             if (log.isDebugEnabled()) {
                 log.debug("InstanceNotifier event message delegator thread started");
             }
@@ -81,4 +82,12 @@ public class InstanceStatusEventReceiver {
         messageDelegator.terminate();
         terminated = true;
     }
+
+       public ExecutorService getExecutorService() {
+               return executorService;
+       }
+
+       public void setExecutorService(ExecutorService executorService) {
+               this.executorService = executorService;
+       }
 }

Reply via email to