Repository: stratos
Updated Branches:
refs/heads/master 6c10e5243 -> 868e94caf
Remove unnessary threads in messaging model
Conflicts:
components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/bfc263aa
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/bfc263aa
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/bfc263aa
Branch: refs/heads/master
Commit: bfc263aaa71e4eacbdc3947b7d40003068021cc5
Parents: fe7ee84
Author: gayan <[email protected]>
Authored: Mon Dec 1 15:36:24 2014 +0530
Committer: gayan <[email protected]>
Committed: Tue Dec 2 16:33:36 2014 +0530
----------------------------------------------------------------------
.../AutoscalerHealthStatEventReceiver.java | 11 +-
.../AutoscalerTopologyEventReceiver.java | 277 +++++++++++--------
.../internal/AutoscalerServerComponent.java | 27 +-
.../stratos/cartridge/agent/CartridgeAgent.java | 9 +-
.../CloudControllerServiceComponent.java | 10 +-
.../application/ApplicationTopicReceiver.java | 16 +-
.../status/ClusterStatusTopicReceiver.java | 28 +-
.../status/InstanceStatusTopicReceiver.java | 18 +-
.../common/threading/StratosThreadPool.java | 57 ++++
.../apache/stratos/common/test/CommonTest.java | 1 +
.../extension/api/LoadBalancerExtension.java | 17 +-
.../LoadBalancerTopologyEventReceiver.java | 4 +-
.../internal/ADCManagementServerComponent.java | 24 +-
.../StratosManagerTopologyEventReceiver.java | 17 +-
.../applications/ApplicationsEventReceiver.java | 8 +-
.../status/ClusterStatusEventReceiver.java | 6 +-
.../health/stat/HealthStatEventReceiver.java | 6 +-
.../notifier/InstanceNotifierEventReceiver.java | 6 +-
.../status/InstanceStatusEventReceiver.java | 14 +-
.../topology/TopologyEventReceiver.java | 31 ++-
.../cep/extension/CEPTopologyEventReceiver.java | 16 +-
.../extension/FaultHandlingWindowProcessor.java | 8 +-
.../apache/stratos/haproxy/extension/Main.java | 8 +-
products/stratos/conf/stratos-config.xml | 30 ++
.../distribution/src/main/conf/autoscaler.xml | 4 +
25 files changed, 418 insertions(+), 235 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/bfc263aa/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
index 7cd2fa6..0e45ee1 100644
---
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
+++
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java
@@ -86,19 +86,14 @@ public class AutoscalerHealthStatEventReceiver implements
Runnable {
Thread.sleep(15000);
} catch (InterruptedException ignore) {
}
- Thread thread = new Thread(healthStatEventReceiver);
- thread.start();
+ healthStatEventReceiver.execute();
+
if(log.isInfoEnabled()) {
log.info("Autoscaler health stat event receiver thread started");
}
// Keep the thread live until terminated
- while (!terminated){
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ignore) {
- }
- }
+
if(log.isInfoEnabled()) {
log.info("Autoscaler health stat event receiver thread
terminated");
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/bfc263aa/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
index f54654b..d59fa7d 100644
---
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
+++
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -21,20 +21,19 @@ package
org.apache.stratos.autoscaler.event.receiver.topology;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.applications.ApplicationHolder;
import org.apache.stratos.autoscaler.context.AutoscalerContext;
import org.apache.stratos.autoscaler.context.cluster.ClusterContextFactory;
import org.apache.stratos.autoscaler.context.cluster.VMClusterContext;
import
org.apache.stratos.autoscaler.event.publisher.ClusterStatusEventPublisher;
import
org.apache.stratos.autoscaler.event.publisher.InstanceNotificationPublisher;
+import
org.apache.stratos.autoscaler.exception.application.DependencyBuilderException;
import
org.apache.stratos.autoscaler.exception.partition.PartitionValidationException;
import
org.apache.stratos.autoscaler.exception.policy.PolicyValidationException;
-import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor;
+import
org.apache.stratos.autoscaler.exception.application.TopologyInConsistentException;
import org.apache.stratos.autoscaler.monitor.component.ApplicationMonitor;
+import org.apache.stratos.autoscaler.monitor.MonitorFactory;
+import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor;
import org.apache.stratos.autoscaler.monitor.events.ClusterStatusEvent;
-import org.apache.stratos.autoscaler.pojo.policy.PolicyManager;
-import org.apache.stratos.autoscaler.pojo.policy.deployment.DeploymentPolicy;
-import org.apache.stratos.autoscaler.util.AutoscalerUtil;
import org.apache.stratos.autoscaler.util.ServiceReferenceHolder;
import org.apache.stratos.messaging.domain.applications.Application;
import org.apache.stratos.messaging.domain.applications.Applications;
@@ -49,32 +48,33 @@ import org.apache.stratos.messaging.listener.topology.*;
import
org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+import java.util.concurrent.ExecutorService;
+
/**
* Autoscaler topology receiver.
*/
-public class AutoscalerTopologyEventReceiver implements Runnable {
+public class AutoscalerTopologyEventReceiver{
private static final Log log =
LogFactory.getLog(AutoscalerTopologyEventReceiver.class);
private TopologyEventReceiver topologyEventReceiver;
private boolean terminated;
private boolean topologyInitialized;
+ private ExecutorService executorService;
public AutoscalerTopologyEventReceiver() {
this.topologyEventReceiver = new TopologyEventReceiver();
addEventListeners();
}
- @Override
- public void run() {
+
+ public void execute() {
//FIXME this activated before autoscaler deployer activated.
- /*try {
- Thread.sleep(15000);
- } catch (InterruptedException ignore) {
- }*/
- Thread thread = new Thread(topologyEventReceiver);
- thread.start();
- if (log.isInfoEnabled()) {
+
+ topologyEventReceiver.setExecutorService(executorService);
+ topologyEventReceiver.execute();
+
+ if (log.isInfoEnabled()) {
log.info("Autoscaler topology receiver thread started");
}
@@ -143,17 +143,7 @@ public class AutoscalerTopologyEventReceiver implements
Runnable {
if (applications != null) {
for (Application application :
applications.getApplications().values()) {
if (allClustersInitialized(application)) {
- DeploymentPolicy policy =
PolicyManager.getInstance().
- getDeploymentPolicyByApplication(
-
application.getUniqueIdentifier());
- if (policy != null) {
- AutoscalerUtil.getInstance().
-
startApplicationMonitor(application.getUniqueIdentifier());
- } else {
- log.info("The relevant application
policy is not yet " +
- "deployed for this
[application] " +
-
application.getUniqueIdentifier());
- }
+
startApplicationMonitor(application.getUniqueIdentifier());
} else {
log.error("Complete Topology is not
consistent with the applications " +
"which got persisted");
@@ -185,7 +175,7 @@ public class AutoscalerTopologyEventReceiver implements
Runnable {
//acquire read lock
ApplicationHolder.acquireReadLock();
//start the application monitor
- //startApplicationMonitor(appId);
+ startApplicationMonitor(appId);
} catch (Exception e) {
String msg = "Error processing event " +
e.getLocalizedMessage();
log.error(msg, e);
@@ -214,7 +204,7 @@ public class AutoscalerTopologyEventReceiver implements
Runnable {
if (null == monitor) {
if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not
found in autoscaler context "
- + "[cluster] %s", clusterId));
+ + "[cluster] %s", clusterId));
}
return;
}
@@ -235,7 +225,7 @@ public class AutoscalerTopologyEventReceiver implements
Runnable {
if (null == monitor) {
if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not
found in autoscaler context "
- + "[cluster] %s", clusterId));
+ + "[cluster] %s", clusterId));
}
return;
}
@@ -265,7 +255,7 @@ public class AutoscalerTopologyEventReceiver implements
Runnable {
if (null == monitor) {
if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not
found in autoscaler context "
- + "[cluster] %s", clusterId));
+ + "[cluster] %s", clusterId));
}
return;
}
@@ -287,7 +277,7 @@ public class AutoscalerTopologyEventReceiver implements
Runnable {
if (null == monitor) {
if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not
found in autoscaler context "
- + "[cluster] %s", clusterId));
+ + "[cluster] %s", clusterId));
}
// if monitor does not exist, send cluster terminated event
ClusterStatusEventPublisher.sendClusterTerminatedEvent(clusterTerminatingEvent.getAppId(),
@@ -296,12 +286,12 @@ public class AutoscalerTopologyEventReceiver implements
Runnable {
}
//changing the status in the monitor, will notify its parent
monitor
if (monitor.getStatus() == ClusterStatus.Active) {
- // terminated gracefully
- monitor.setStatus(ClusterStatus.Terminating);
-
InstanceNotificationPublisher.sendInstanceCleanupEventForCluster(clusterId);
+ // terminated gracefully
+ monitor.setStatus(ClusterStatus.Terminating);
+
InstanceNotificationPublisher.sendInstanceCleanupEventForCluster(clusterId);
} else {
- monitor.setStatus(ClusterStatus.Terminating);
- monitor.terminateAllMembers();
+ monitor.setStatus(ClusterStatus.Terminating);
+ monitor.terminateAllMembers();
}
ServiceReferenceHolder.getInstance().getClusterStatusProcessorChain().
process("", clusterId, instanceId);
@@ -320,11 +310,11 @@ public class AutoscalerTopologyEventReceiver implements
Runnable {
if (null == monitor) {
if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not
found in autoscaler context "
- + "[cluster] %s", clusterId));
+ + "[cluster] %s", clusterId));
}
// if the cluster monitor is null, assume that its
termianted
ApplicationMonitor appMonitor =
AutoscalerContext.getInstance().getAppMonitor(clusterTerminatedEvent.getAppId());
- if (appMonitor != null) {
+ if (appMonitor != null) {
appMonitor.onChildStatusEvent(new
ClusterStatusEvent(ClusterStatus.Terminated, clusterId, null));
}
return;
@@ -365,7 +355,7 @@ public class AutoscalerTopologyEventReceiver implements
Runnable {
topologyEventReceiver.addEventListener(new
MemberStartedEventListener() {
@Override
protected void onEvent(Event event) {
-
+
}
});
@@ -442,86 +432,153 @@ public class AutoscalerTopologyEventReceiver implements
Runnable {
});
topologyEventReceiver.addEventListener(new
ClusterInstanceCreatedEventListener() {
- @Override
- protected void onEvent(Event event) {
-
- ClusterInstanceCreatedEvent clusterInstanceCreatedEvent =
- (ClusterInstanceCreatedEvent) event;
- AbstractClusterMonitor clusterMonitor =
AutoscalerContext.getInstance().
-
getClusterMonitor(clusterInstanceCreatedEvent.getClusterId());
- String instanceId = ((ClusterInstanceCreatedEvent)
event).getInstanceId();
- //FIXME to take lock when clusterMonitor is running
- if (clusterMonitor != null) {
-
TopologyManager.acquireReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(),
- clusterInstanceCreatedEvent.getClusterId());
-
- try {
- Service service = TopologyManager.getTopology().
-
getService(clusterInstanceCreatedEvent.getServiceName());
-
- if (service != null) {
- Cluster cluster =
service.getCluster(clusterInstanceCreatedEvent.getClusterId());
- if (cluster != null) {
- try {
- if (cluster.isKubernetesCluster()) {
- clusterMonitor.setClusterContext(
-
ClusterContextFactory.getKubernetesClusterContext(
- instanceId,
- cluster));
- } else {
- VMClusterContext clusterContext =
- (VMClusterContext)
clusterMonitor.getClusterContext();
- if (clusterContext == null) {
- clusterMonitor.setClusterContext(
- ClusterContextFactory.
-
getVMClusterContext(instanceId,
- cluster));
- }
-
clusterContext.addInstanceContext(instanceId, cluster);
-
-
- }
- if
(clusterMonitor.hasMonitoringStarted().compareAndSet(false, true)) {
- clusterMonitor.startScheduler();
- log.info("Monitoring task for Cluster
Monitor with cluster id " +
-
clusterInstanceCreatedEvent.getClusterId() + " started successfully");
- }
- } catch (PolicyValidationException e) {
- log.error(e.getMessage(), e);
- } catch (PartitionValidationException e) {
- log.error(e.getMessage(), e);
- }
- }
-
- } else {
- log.error("Service " +
clusterInstanceCreatedEvent.getServiceName() +
- " not found, no cluster instance added to
ClusterMonitor " +
- clusterInstanceCreatedEvent.getClusterId());
- }
-
- } finally {
-
TopologyManager.releaseReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(),
- clusterInstanceCreatedEvent.getClusterId());
- }
-
- } else {
- log.error("No Cluster Monitor found for cluster id " +
- clusterInstanceCreatedEvent.getClusterId());
- }
- }
- }
-
- );
+ @Override
+ protected void onEvent(Event event) {
+
+ ClusterInstanceCreatedEvent clusterInstanceCreatedEvent =
+ (ClusterInstanceCreatedEvent) event;
+ AbstractClusterMonitor clusterMonitor =
AutoscalerContext.getInstance().
+
getClusterMonitor(clusterInstanceCreatedEvent.getClusterId());
+
+ if (clusterMonitor != null) {
+
TopologyManager.acquireReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(),
+ clusterInstanceCreatedEvent.getClusterId());
+
+ try {
+ Service service = TopologyManager.getTopology().
+
getService(clusterInstanceCreatedEvent.getServiceName());
+
+ if (service != null) {
+ Cluster cluster =
service.getCluster(clusterInstanceCreatedEvent.getClusterId());
+ if (cluster != null) {
+ // create and add Cluster Context
+ try {
+ if (cluster.isKubernetesCluster()) {
+
clusterMonitor.addClusterContextForInstance(clusterInstanceCreatedEvent.getInstanceId(),
+
ClusterContextFactory.getKubernetesClusterContext(cluster));
+ } else if (cluster.isLbCluster()) {
+
clusterMonitor.addClusterContextForInstance(clusterInstanceCreatedEvent.getInstanceId(),
+
ClusterContextFactory.getVMLBClusterContext(cluster));
+ } else {
+
clusterMonitor.addClusterContextForInstance(clusterInstanceCreatedEvent.getInstanceId(),
+
ClusterContextFactory.getVMServiceClusterContext(cluster));
+ }
+
+ if
(clusterMonitor.hasMonitoringStarted().compareAndSet(false, true)) {
+ clusterMonitor.startScheduler();
+ log.info("Monitoring task for Cluster
Monitor with cluster id " +
+
clusterInstanceCreatedEvent.getClusterId() + " started successfully");
+ }
+
+ } catch (PolicyValidationException e) {
+ log.error(e.getMessage(), e);
+ } catch (PartitionValidationException e) {
+ log.error(e.getMessage(), e);
+ }
+
+ } else {
+ log.error("Cluster not found for " +
clusterInstanceCreatedEvent.getClusterId() +
+ ", no cluster instance added to
ClusterMonitor " +
+
clusterInstanceCreatedEvent.getClusterId());
+ }
+ } else {
+ log.error("Service " +
clusterInstanceCreatedEvent.getServiceName() +
+ " not found, no cluster instance added to
ClusterMonitor " +
+ clusterInstanceCreatedEvent.getClusterId());
+ }
+
+ } finally {
+
TopologyManager.releaseReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(),
+ clusterInstanceCreatedEvent.getClusterId());
+ }
+
+ } else {
+ log.error("No Cluster Monitor found for cluster id " +
+ clusterInstanceCreatedEvent.getClusterId());
+ }
+ }
+ });
}
/**
* Terminate load balancer topology receiver thread.
*/
-
public void terminate() {
topologyEventReceiver.terminate();
terminated = true;
}
+ protected synchronized void startApplicationMonitor(String applicationId) {
+ Thread th = null;
+ if (AutoscalerContext.getInstance().getAppMonitor(applicationId) ==
null) {
+ th = new Thread(new ApplicationMonitorAdder(applicationId));
+ }
+ if (th != null) {
+ th.start();
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String
+ .format("Application monitor thread already exists: " +
+ "[application] %s ", applicationId));
+ }
+ }
+ }
+
+ private class ApplicationMonitorAdder implements Runnable {
+ private String appId;
+
+ public ApplicationMonitorAdder(String appId) {
+ this.appId = appId;
+ }
+
+ public void run() {
+ ApplicationMonitor applicationMonitor = null;
+ int retries = 5;
+ boolean success = false;
+ do {
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e1) {
+ }
+ try {
+ long start = System.currentTimeMillis();
+ if (log.isDebugEnabled()) {
+ log.debug("application monitor is going to be started
for [application] " +
+ appId);
+ }
+ try {
+ applicationMonitor =
MonitorFactory.getApplicationMonitor(appId);
+ } catch (PolicyValidationException e) {
+ String msg = "Application monitor creation failed for
Application: ";
+ log.warn(msg, e);
+ retries--;
+ }
+ long end = System.currentTimeMillis();
+ log.info("Time taken to start app monitor: " + (end -
start) / 1000);
+ success = true;
+ } catch (DependencyBuilderException e) {
+ String msg = "Application monitor creation failed for
Application: ";
+ log.warn(msg, e);
+ retries--;
+ } catch (TopologyInConsistentException e) {
+ String msg = "Application monitor creation failed for
Application: ";
+ log.warn(msg, e);
+ retries--;
+ }
+ } while (!success && retries != 0);
+
+ if (applicationMonitor == null) {
+ String msg = "Application monitor creation failed, even after
retrying for 5 times, "
+ + "for Application: " + appId;
+ log.error(msg);
+ throw new RuntimeException(msg);
+ }
+ AutoscalerContext.getInstance().addAppMonitor(applicationMonitor);
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Application monitor has been added
successfully: " +
+ "[application] %s", applicationMonitor.getId()));
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/bfc263aa/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
index 7db577c..3694066 100644
---
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
+++
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
@@ -18,6 +18,7 @@
*/
package org.apache.stratos.autoscaler.internal;
+import org.apache.commons.configuration.XMLConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
//import org.apache.stratos.autoscaler.NetworkPartitionLbHolder;
@@ -33,9 +34,11 @@ import
org.apache.stratos.autoscaler.pojo.policy.autoscale.AutoscalePolicy;
import org.apache.stratos.autoscaler.registry.RegistryManager;
import
org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusProcessorChain;
import
org.apache.stratos.autoscaler.status.processor.group.GroupStatusProcessorChain;
+import org.apache.stratos.autoscaler.util.ConfUtil;
import org.apache.stratos.autoscaler.util.ServiceReferenceHolder;
import org.apache.stratos.cloud.controller.stub.domain.Partition;
import org.apache.stratos.common.kubernetes.KubernetesGroup;
+import org.apache.stratos.common.threading.StratosThreadPool;
import org.osgi.service.component.ComponentContext;
import org.wso2.carbon.ntask.core.service.TaskService;
import org.wso2.carbon.registry.api.RegistryException;
@@ -43,6 +46,7 @@ import org.wso2.carbon.registry.core.service.RegistryService;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.ExecutorService;
/**
* @scr.component
name=org.apache.stratos.autoscaler.internal.AutoscalerServerComponent"
@@ -59,19 +63,30 @@ import java.util.List;
public class AutoscalerServerComponent {
- private static final Log log =
LogFactory.getLog(AutoscalerServerComponent.class);
+ private static final String THREAD_IDENTIFIER_KEY =
"threadPool.autoscaler.identifier";
+ private static final String DEFAULT_IDENTIFIER = "Auto-Scaler";
+ private static final String THREAD_POOL_SIZE_KEY=
"threadPool.autoscaler.threadPoolSize";
+ private static final String COMPONENTS_CONFIG = "components-config";
+ private static final int THREAD_POOL_SIZE = 10;
+ private static final Log log =
LogFactory.getLog(AutoscalerServerComponent.class);
- private AutoscalerTopologyEventReceiver asTopologyReceiver;
+
+ private AutoscalerTopologyEventReceiver asTopologyReceiver;
private AutoscalerHealthStatEventReceiver
autoscalerHealthStatEventReceiver;
- protected void activate(ComponentContext componentContext) throws
Exception {
+ protected void activate(ComponentContext componentContext) throws
Exception {
try {
// Start topology receiver
+ XMLConfiguration conf =
ConfUtil.getInstance(COMPONENTS_CONFIG).getConfiguration();
+ int threadPoolSize = conf.getInt(THREAD_POOL_SIZE_KEY,
THREAD_POOL_SIZE);
+ String threadIdentifier=conf.getString(THREAD_IDENTIFIER_KEY,
DEFAULT_IDENTIFIER);
+ ExecutorService executorService =
StratosThreadPool.getExecutorService(threadIdentifier, threadPoolSize);
asTopologyReceiver = new AutoscalerTopologyEventReceiver();
- Thread topologyTopicSubscriberThread = new
Thread(asTopologyReceiver);
- topologyTopicSubscriberThread.start();
+ asTopologyReceiver.setExecutorService(executorService);
+ asTopologyReceiver.execute();
+
if (log.isDebugEnabled()) {
- log.debug("Topology receiver thread started");
+ log.debug("Topology receiver executor service started");
}
// Start health stat receiver
http://git-wip-us.apache.org/repos/asf/stratos/blob/bfc263aa/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
index 16de655..379df28 100644
---
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
+++
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java
@@ -247,8 +247,9 @@ public class CartridgeAgent implements Runnable {
}
}
});
- Thread eventReceiverThread = new Thread(instanceNotifierEventReceiver);
- eventReceiverThread.start();
+
+ instanceNotifierEventReceiver.execute();
+
if(log.isInfoEnabled()) {
log.info("Instance notifier event message receiver thread
started");
}
@@ -414,8 +415,8 @@ public class CartridgeAgent implements Runnable {
}
});
- Thread thread = new Thread(topologyEventReceiver);
- thread.start();
+// Thread thread = new Thread(topologyEventReceiver);
+// thread.start();
if (log.isDebugEnabled()) {
log.info("Cartridge Agent topology receiver thread started");
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/bfc263aa/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
index 7f2c150..a47e924 100644
---
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
+++
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
@@ -68,24 +68,22 @@ public class CloudControllerServiceComponent {
protected void activate(ComponentContext context) {
try {
applicationTopicReceiver = new ApplicationTopicReceiver();
- Thread tApplicationTopicReceiver = new
Thread(applicationTopicReceiver);
- tApplicationTopicReceiver.start();
+ applicationTopicReceiver.execute();
if (log.isInfoEnabled()) {
log.info("Application event receiver thread started");
}
clusterStatusTopicReceiver = new ClusterStatusTopicReceiver();
- Thread tClusterStatusTopicReceiver = new
Thread(clusterStatusTopicReceiver);
- tClusterStatusTopicReceiver.start();
+ clusterStatusTopicReceiver.execute();
if (log.isInfoEnabled()) {
log.info("Cluster status receiver thread started");
}
instanceStatusTopicReceiver = new InstanceStatusTopicReceiver();
- Thread tInstanceStatusTopicReceiver = new
Thread(instanceStatusTopicReceiver);
- tInstanceStatusTopicReceiver.start();
+ instanceStatusTopicReceiver.execute();
+
if(log.isInfoEnabled()) {
log.info("Instance status message receiver thread started");
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/bfc263aa/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java
index 3e045a9..87dfe6e 100644
---
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java
+++
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java
@@ -29,7 +29,7 @@ import
org.apache.stratos.messaging.message.receiver.applications.ApplicationsEv
/**
* This is to receive the application topic messages.
*/
-public class ApplicationTopicReceiver implements Runnable{
+public class ApplicationTopicReceiver{
private static final Log log =
LogFactory.getLog(ApplicationTopicReceiver.class);
private ApplicationsEventReceiver applicationsEventReceiver;
private boolean terminated;
@@ -41,22 +41,14 @@ public class ApplicationTopicReceiver implements Runnable{
}
- @Override
- public void run() {
+
+ public void execute() {
if (log.isInfoEnabled()) {
log.info("Cloud controller application status thread started");
}
- Thread thread = new Thread(applicationsEventReceiver);
- thread.start();
+ applicationsEventReceiver.execute();
- // Keep the thread live until terminated
- while (!terminated) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ignore) {
- }
- }
if (log.isInfoEnabled()) {
log.info("Cloud controller application status thread terminated");
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/bfc263aa/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
index b4ff9f2..bd2fbf0 100644
---
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
+++
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
@@ -26,7 +26,7 @@ import org.apache.stratos.messaging.event.cluster.status.*;
import org.apache.stratos.messaging.listener.cluster.status.*;
import
org.apache.stratos.messaging.message.receiver.cluster.status.ClusterStatusEventReceiver;
-public class ClusterStatusTopicReceiver implements Runnable{
+public class ClusterStatusTopicReceiver{
private static final Log log =
LogFactory.getLog(ClusterStatusTopicReceiver.class);
private ClusterStatusEventReceiver statusEventReceiver;
@@ -37,26 +37,16 @@ public class ClusterStatusTopicReceiver implements Runnable{
addEventListeners();
}
- public void run() {
- Thread thread = new Thread(statusEventReceiver);
- thread.start();
- if (log.isInfoEnabled()) {
- log.info("Cloud controller Cluster status thread started");
- }
+ public void execute() {
- // Keep the thread live until terminated
- while (!terminated) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ignore) {
- }
- }
- if (log.isInfoEnabled()) {
- log.info("Cloud controller application status thread terminated");
- }
+ statusEventReceiver.execute();
+ if (log.isInfoEnabled()) {
+ log.info("Cloud controller Cluster status thread
started");
+ }
- }
- private void addEventListeners() {
+ }
+
+ private void addEventListeners() {
// Listen to topology events that affect clusters
statusEventReceiver.addEventListener(new
ClusterStatusClusterResetEventListener() {
@Override
http://git-wip-us.apache.org/repos/asf/stratos/blob/bfc263aa/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
index fd24c60..d5475f0 100644
---
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
+++
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
@@ -35,7 +35,7 @@ import
org.apache.stratos.messaging.message.receiver.instance.status.InstanceSta
/**
* This will handle the instance status events
*/
-public class InstanceStatusTopicReceiver implements Runnable {
+public class InstanceStatusTopicReceiver{
private static final Log log =
LogFactory.getLog(InstanceStatusTopicReceiver.class);
private InstanceStatusEventReceiver statusEventReceiver;
@@ -47,20 +47,14 @@ public class InstanceStatusTopicReceiver implements
Runnable {
}
- @Override
- public void run() {
- Thread thread = new Thread(statusEventReceiver);
- thread.start();
+
+ public void execute() {
+ statusEventReceiver.execute();
if (log.isInfoEnabled()) {
log.info("Cloud controller application status thread started");
}
- ///* Keep the thread live until terminated
- while (!terminated) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ignore) {
- }
- }
+
+
if (log.isInfoEnabled()) {
log.info("Cloud controller application status thread terminated");
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/bfc263aa/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
new file mode 100644
index 0000000..3e1ebbe
--- /dev/null
+++
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
@@ -0,0 +1,57 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+package org.apache.stratos.common.threading;
+
+import java.util.HashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Utility class for Stratos thread pool
+ */
+public class StratosThreadPool {
+
+ private static HashMap<String, ExecutorService> mapExecutorService =
new HashMap<String, ExecutorService>();
+ private static Object mutex = new Object();
+
+ /**
+ * Return the executor service based on the identifier and thread pool
size
+ *
+ * @param identifier Component identifier name
+ * @param threadPoolSize Thread pool size
+ * @return ExecutorService
+ */
+ public static ExecutorService getExecutorService(String identifier, int
threadPoolSize) {
+ ExecutorService executorService =
mapExecutorService.get(identifier);
+ if (executorService == null) {
+ synchronized (mutex) {
+ if (executorService == null) {
+ executorService =
Executors.newFixedThreadPool(threadPoolSize);
+ mapExecutorService.put(identifier,
executorService);
+ }
+ }
+
+ }
+ return executorService;
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/bfc263aa/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/CommonTest.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/CommonTest.java
b/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/CommonTest.java
index dc61fc4..eaaab7c 100644
---
a/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/CommonTest.java
+++
b/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/CommonTest.java
@@ -46,4 +46,5 @@ public class CommonTest extends TestCase {
assertEquals("Invalid email address is provided.", e.getMessage());
}
}
+
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/bfc263aa/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
index e58ce37..e74721f 100644
---
a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
+++
b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
@@ -28,6 +28,8 @@ import org.apache.stratos.messaging.listener.topology.*;
import
org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+import java.util.concurrent.ExecutorService;
+
/**
* Load balancer extension thread for executing load balancer life-cycle
according to the topology updates
* received from the message broker.
@@ -41,7 +43,7 @@ public class LoadBalancerExtension implements Runnable {
private TopologyEventReceiver topologyEventReceiver;
private LoadBalancerStatisticsNotifier statisticsNotifier;
private boolean terminated;
-
+ private ExecutorService executorService;
/**
* Load balancer extension constructor.
* @param loadBalancer Load balancer instance: Mandatory.
@@ -62,8 +64,9 @@ public class LoadBalancerExtension implements Runnable {
// Start topology receiver thread
topologyEventReceiver = new TopologyEventReceiver();
addEventListeners();
- Thread topologyReceiverThread = new Thread(topologyEventReceiver);
- topologyReceiverThread.start();
+ topologyEventReceiver.setExecutorService(executorService);
+ topologyEventReceiver.execute();
+
if(statsReader != null) {
// Start stats notifier thread
@@ -162,4 +165,12 @@ public class LoadBalancerExtension implements Runnable {
}
terminated = true;
}
+
+ public ExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ public void setExecutorService(ExecutorService executorService) {
+ this.executorService = executorService;
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/bfc263aa/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/messaging/LoadBalancerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/messaging/LoadBalancerTopologyEventReceiver.java
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/messaging/LoadBalancerTopologyEventReceiver.java
index c50a2d2..83e0c18 100644
---
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/messaging/LoadBalancerTopologyEventReceiver.java
+++
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/messaging/LoadBalancerTopologyEventReceiver.java
@@ -53,8 +53,8 @@ public class LoadBalancerTopologyEventReceiver implements
Runnable {
@Override
public void run() {
- Thread thread = new Thread(topologyEventReceiver);
- thread.start();
+ // Thread thread = new Thread(topologyEventReceiver);
+ // thread.start();
if (log.isInfoEnabled()) {
log.info("Load balancer topology receiver thread started");
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/bfc263aa/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
index 846b93a..797f1a1 100644
---
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
+++
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
@@ -18,8 +18,10 @@
*/
package org.apache.stratos.manager.internal;
+import org.apache.axis2.util.threadpool.ThreadPool;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.threading.StratosThreadPool;
import org.apache.stratos.manager.listener.InstanceStatusListener;
import org.apache.stratos.manager.listener.TenantUserRoleCreator;
import org.apache.stratos.manager.publisher.TenantEventPublisher;
@@ -39,6 +41,8 @@ import org.wso2.carbon.user.core.UserRealm;
import org.wso2.carbon.user.core.service.RealmService;
import org.wso2.carbon.utils.ConfigurationContextService;
+import java.util.concurrent.ExecutorService;
+
/**
* @scr.component
name="org.wso2.carbon.hosting.mgt.internal.ADCManagementServerComponent"
* immediate="true"
@@ -63,13 +67,17 @@ import org.wso2.carbon.utils.ConfigurationContextService;
public class ADCManagementServerComponent {
- private static final Log log =
LogFactory.getLog(ADCManagementServerComponent.class);
- private StratosManagerTopologyEventReceiver
stratosManagerTopologyEventReceiver;
+ private static final Log log =
LogFactory.getLog(ADCManagementServerComponent.class);
+ public static final String STRATOS_MANAGER = "Stratos_manager";
+ public static final int THREAD_POOL_SIZE = 20;
+ private StratosManagerTopologyEventReceiver
stratosManagerTopologyEventReceiver;
+ private ExecutorService executorService;
protected void activate(ComponentContext componentContext) throws
Exception {
try {
CartridgeConfigFileReader.readProperties();
-
+
executorService=StratosThreadPool.getExecutorService(STRATOS_MANAGER,
THREAD_POOL_SIZE);
+
// Schedule complete tenant event synchronizer
if(log.isDebugEnabled()) {
log.debug("Scheduling tenant synchronizer task...");
@@ -90,8 +98,8 @@ public class ADCManagementServerComponent {
log.debug("Starting instance status topic subscriber...");
}
Subscriber subscriber = new
Subscriber(Util.Topics.INSTANCE_STATUS_TOPIC.getTopicName(), new
InstanceStatusListener());
- Thread tsubscriber = new Thread(subscriber);
- tsubscriber.start();
+ executorService.execute(subscriber);
+
RealmService realmService = DataHolder.getRealmService();
UserRealm realm = realmService.getBootstrapRealm();
@@ -115,9 +123,11 @@ public class ADCManagementServerComponent {
Thread topologyReceiverThread = new Thread(topologyReceiver);
topologyReceiverThread.start();*/
+
stratosManagerTopologyEventReceiver = new
StratosManagerTopologyEventReceiver();
- Thread topologyReceiverThread = new
Thread(stratosManagerTopologyEventReceiver);
- topologyReceiverThread.start();
+
stratosManagerTopologyEventReceiver.setExecutorService(executorService);
+
executorService.execute(stratosManagerTopologyEventReceiver);
+
log.info("Topology receiver thread started");
// retrieve persisted CartridgeSubscriptions
http://git-wip-us.apache.org/repos/asf/stratos/blob/bfc263aa/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyEventReceiver.java
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyEventReceiver.java
index ff87b0c..9ebac57 100644
---
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyEventReceiver.java
+++
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/topology/receiver/StratosManagerTopologyEventReceiver.java
@@ -37,6 +37,7 @@ import
org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import java.util.List;
+import java.util.concurrent.ExecutorService;
public class StratosManagerTopologyEventReceiver implements Runnable {
@@ -44,6 +45,7 @@ public class StratosManagerTopologyEventReceiver implements
Runnable {
private TopologyEventReceiver topologyEventReceiver;
private boolean terminated;
+ private ExecutorService executorService;
public StratosManagerTopologyEventReceiver() {
this.terminated = false;
@@ -448,8 +450,11 @@ public class StratosManagerTopologyEventReceiver
implements Runnable {
@Override
public void run() {
- Thread thread = new Thread(topologyEventReceiver);
- thread.start();
+
+ topologyEventReceiver.setExecutorService(executorService);
+ topologyEventReceiver.execute();
+ // executorService.execute(topologyEventReceiver);
+
log.info("Stratos Manager topology receiver thread started");
//Keep running till terminate is set from deactivate method of the
component
@@ -468,4 +473,12 @@ public class StratosManagerTopologyEventReceiver
implements Runnable {
topologyEventReceiver.terminate();
terminated = true;
}
+
+ public ExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ public void setExecutorService(ExecutorService executorService) {
+ this.executorService = executorService;
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/bfc263aa/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventReceiver.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventReceiver.java
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventReceiver.java
index d034b8a..cc86c29 100644
---
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventReceiver.java
+++
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventReceiver.java
@@ -24,7 +24,7 @@ import
org.apache.stratos.messaging.broker.subscribe.Subscriber;
import org.apache.stratos.messaging.listener.EventListener;
import org.apache.stratos.messaging.util.Util;
-public class ApplicationsEventReceiver implements Runnable {
+public class ApplicationsEventReceiver {
private static final Log log =
LogFactory.getLog(ApplicationsEventReceiver.class);
private ApplicationsEventMessageDelegator messageDelegator;
@@ -42,12 +42,12 @@ public class ApplicationsEventReceiver implements Runnable {
messageDelegator.addEventListener(eventListener);
}
- @Override
- public void run() {
+
+ public void execute() {
try {
// Start topic subscriber thread
subscriber = new
Subscriber(Util.Topics.APPLICATIONS_TOPIC.getTopicName(), messageListener);
-// subscriber.setMessageListener(messageListener);
+
Thread subscriberThread = new Thread(subscriber);
subscriberThread.start();
if (log.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/bfc263aa/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
index 05bcf02..38184aa 100644
---
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
+++
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
@@ -28,7 +28,7 @@ import org.apache.stratos.messaging.util.Util;
/**
* A thread for receiving instance notifier information from message broker.
*/
-public class ClusterStatusEventReceiver implements Runnable {
+public class ClusterStatusEventReceiver{
private static final Log log =
LogFactory.getLog(ClusterStatusEventReceiver.class);
private final ClusterStatusEventMessageDelegator messageDelegator;
private final ClusterStatusEventMessageListener messageListener;
@@ -45,8 +45,8 @@ public class ClusterStatusEventReceiver implements Runnable {
messageDelegator.addEventListener(eventListener);
}
- @Override
- public void run() {
+
+ public void execute() {
try {
// Start topic subscriber thread
subscriber = new
Subscriber(Util.Topics.CLUSTER_STATUS_TOPIC.getTopicName(), messageListener);
http://git-wip-us.apache.org/repos/asf/stratos/blob/bfc263aa/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
index c5ddbb4..14c7346 100644
---
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
+++
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
@@ -28,7 +28,7 @@ import org.apache.stratos.messaging.util.Util;
/**
* A thread for receiving health stat information from message broker
*/
-public class HealthStatEventReceiver implements Runnable {
+public class HealthStatEventReceiver {
private static final Log log =
LogFactory.getLog(HealthStatEventReceiver.class);
private final HealthStatEventMessageDelegator messageDelegator;
@@ -46,8 +46,8 @@ public class HealthStatEventReceiver implements Runnable {
messageDelegator.addEventListener(eventListener);
}
- @Override
- public void run() {
+
+ public void execute() {
try {
// Start topic subscriber thread
subscriber = new
Subscriber(Util.Topics.HEALTH_STAT_TOPIC.getTopicName(), messageListener);
http://git-wip-us.apache.org/repos/asf/stratos/blob/bfc263aa/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
index 8bfcbeb..5e09672 100644
---
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
+++
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
@@ -29,7 +29,7 @@ import org.apache.stratos.messaging.util.Util;
/**
* A thread for receiving instance notifier information from message broker.
*/
-public class InstanceNotifierEventReceiver implements Runnable {
+public class InstanceNotifierEventReceiver {
private static final Log log =
LogFactory.getLog(InstanceNotifierEventReceiver.class);
private final InstanceNotifierEventMessageDelegator messageDelegator;
private final InstanceNotifierEventMessageListener messageListener;
@@ -46,8 +46,8 @@ public class InstanceNotifierEventReceiver implements
Runnable {
messageDelegator.addEventListener(eventListener);
}
- @Override
- public void run() {
+
+ public void execute() {
try {
// Start topic subscriber thread
subscriber = new
Subscriber(Util.Topics.INSTANCE_NOTIFIER_TOPIC.getTopicName(), messageListener);
http://git-wip-us.apache.org/repos/asf/stratos/blob/bfc263aa/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
index 72742cc..a8f1d96 100644
---
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
+++
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
@@ -28,7 +28,7 @@ import org.apache.stratos.messaging.util.Util;
/**
* A thread for receiving instance notifier information from message broker.
*/
-public class InstanceStatusEventReceiver implements Runnable {
+public class InstanceStatusEventReceiver {
private static final Log log =
LogFactory.getLog(InstanceStatusEventReceiver.class);
private final InstanceStatusEventMessageDelegator messageDelegator;
private final InstanceStatusEventMessageListener messageListener;
@@ -45,8 +45,8 @@ public class InstanceStatusEventReceiver implements Runnable {
messageDelegator.addEventListener(eventListener);
}
- @Override
- public void run() {
+
+ public void execute() {
try {
// Start topic subscriber thread
subscriber = new
Subscriber(Util.Topics.INSTANCE_STATUS_TOPIC.getTopicName(), messageListener);
@@ -64,13 +64,7 @@ public class InstanceStatusEventReceiver implements Runnable
{
log.debug("InstanceNotifier event message delegator thread
started");
}
- // Keep the thread live until terminated
- while (!terminated) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ignore) {
- }
- }
+
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("InstanceNotifier receiver failed", e);
http://git-wip-us.apache.org/repos/asf/stratos/blob/bfc263aa/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
index 5b411d3..b4beea5 100644
---
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
+++
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
@@ -25,17 +25,18 @@ import
org.apache.stratos.messaging.broker.subscribe.Subscriber;
import org.apache.stratos.messaging.listener.EventListener;
import org.apache.stratos.messaging.util.Util;
-
+import java.util.concurrent.ExecutorService;
/**
* A thread for receiving topology information from message broker and
* build topology in topology manager.
*/
-public class TopologyEventReceiver implements Runnable {
+public class TopologyEventReceiver {
private static final Log log =
LogFactory.getLog(TopologyEventReceiver.class);
private TopologyEventMessageDelegator messageDelegator;
private TopologyEventMessageListener messageListener;
private Subscriber subscriber;
+ private ExecutorService executorService;
private boolean terminated;
public TopologyEventReceiver() {
@@ -48,34 +49,26 @@ public class TopologyEventReceiver implements Runnable {
messageDelegator.addEventListener(eventListener);
}
- @Override
- public void run() {
+
+ public void execute() {
try {
// Start topic subscriber thread
subscriber = new
Subscriber(Util.Topics.TOPOLOGY_TOPIC.getTopicName(), messageListener);
// subscriber.setMessageListener(messageListener);
+ executorService.execute(subscriber);
- Thread subscriberThread = new Thread(subscriber);
- subscriberThread.start();
if (log.isDebugEnabled()) {
log.debug("Topology event message receiver thread started");
}
// Start topology event message delegator thread
- Thread receiverThread = new Thread(messageDelegator);
- receiverThread.start();
+ executorService.execute(messageDelegator);
if (log.isDebugEnabled()) {
log.debug("Topology event message delegator thread started");
}
- // Keep the thread live until terminated
- while (!terminated) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ignore) {
- }
- }
+
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Topology receiver failed", e);
@@ -88,4 +81,12 @@ public class TopologyEventReceiver implements Runnable {
messageDelegator.terminate();
terminated = true;
}
+
+ public ExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ public void setExecutorService(ExecutorService executorService) {
+ this.executorService = executorService;
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/bfc263aa/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git
a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
index 3218708..e384d95 100644
---
a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
+++
b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
@@ -31,6 +31,8 @@ import
org.apache.stratos.messaging.listener.topology.MemberTerminatedEventListe
import
org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+import java.util.concurrent.ExecutorService;
+
/**
* CEP Topology Receiver for Fault Handling Window Processor.
*/
@@ -41,6 +43,7 @@ public class CEPTopologyEventReceiver implements Runnable {
private TopologyEventReceiver topologyEventReceiver;
private boolean terminated;
private FaultHandlingWindowProcessor faultHandler;
+ private ExecutorService executorService;
public CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler)
{
this.topologyEventReceiver = new TopologyEventReceiver();
@@ -101,8 +104,9 @@ public class CEPTopologyEventReceiver implements Runnable {
Thread.sleep(15000);
} catch (InterruptedException ignore) {
}
- Thread thread = new Thread(topologyEventReceiver);
- thread.start();
+ topologyEventReceiver.setExecutorService(executorService);
+ topologyEventReceiver.execute();
+ // executorService.execute(topologyEventReceiver);
log.info("CEP topology receiver thread started");
// Keep the thread live until terminated
@@ -122,4 +126,12 @@ public class CEPTopologyEventReceiver implements Runnable {
topologyEventReceiver.terminate();
terminated = true;
}
+
+ public ExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ public void setExecutorService(ExecutorService executorService) {
+ this.executorService = executorService;
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/bfc263aa/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
----------------------------------------------------------------------
diff --git
a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
index 70d8b32..56a3fcf 100644
---
a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
+++
b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
@@ -20,6 +20,7 @@ package org.apache.stratos.cep.extension;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
+import org.apache.stratos.common.threading.StratosThreadPool;
import org.apache.stratos.messaging.broker.publish.EventPublisher;
import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.domain.topology.*;
@@ -46,6 +47,7 @@ import
org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -255,6 +257,7 @@ public class FaultHandlingWindowProcessor extends
WindowProcessor implements Run
@Override
protected void init(Expression[] parameters, QueryPostProcessingElement
nextProcessor,
AbstractDefinition streamDefinition, String elementId,
boolean async, SiddhiContext siddhiContext) {
+
if (parameters[0] instanceof IntConstant) {
timeToKeep = ((IntConstant) parameters[0]).getValue();
} else {
@@ -271,8 +274,9 @@ public class FaultHandlingWindowProcessor extends
WindowProcessor implements Run
}
MemberFaultEventMap.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent",
memberFaultEventMessageMap);
- Thread topologyTopicSubscriberThread = new
Thread(cepTopologyEventReceiver);
- topologyTopicSubscriberThread.start();
+ ExecutorService executorService=
StratosThreadPool.getExecutorService("AutoScaler",10);
+ cepTopologyEventReceiver.setExecutorService(executorService);
+ executorService.execute(cepTopologyEventReceiver);
//Ordinary scheduling
window.schedule();
http://git-wip-us.apache.org/repos/asf/stratos/blob/bfc263aa/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
----------------------------------------------------------------------
diff --git
a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
index 270fe89..013aee9 100644
---
a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
+++
b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
@@ -22,15 +22,19 @@ package org.apache.stratos.haproxy.extension;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.PropertyConfigurator;
+import org.apache.stratos.common.threading.StratosThreadPool;
import org.apache.stratos.load.balancer.extension.api.LoadBalancerExtension;
+import java.util.concurrent.ExecutorService;
+
/**
* HAProxy extension main class.
*/
public class Main {
private static final Log log = LogFactory.getLog(Main.class);
+ private static ExecutorService executorService;
- public static void main(String[] args) {
+ public static void main(String[] args) {
LoadBalancerExtension extension = null;
try {
@@ -40,7 +44,7 @@ public class Main {
if (log.isInfoEnabled()) {
log.info("HAProxy extension started");
}
-
+ executorService =
StratosThreadPool.getExecutorService("Load_Balance_Extension", 10);
// Validate runtime parameters
HAProxyContext.getInstance().validate();
extension = new LoadBalancerExtension(new HAProxy(),
(HAProxyContext.getInstance().isCEPStatsPublisherEnabled() ? new
HAProxyStatisticsReader() : null));
http://git-wip-us.apache.org/repos/asf/stratos/blob/bfc263aa/products/stratos/conf/stratos-config.xml
----------------------------------------------------------------------
diff --git a/products/stratos/conf/stratos-config.xml
b/products/stratos/conf/stratos-config.xml
new file mode 100644
index 0000000..47b84ce
--- /dev/null
+++ b/products/stratos/conf/stratos-config.xml
@@ -0,0 +1,30 @@
+<!--
+ ~ /*
+ ~ * Licensed to the Apache Software Foundation (ASF) under one
+ ~ * or more contributor license agreements. See the NOTICE file
+ ~ * distributed with this work for additional information
+ ~ * regarding copyright ownership. The ASF licenses this file
+ ~ * to you under the Apache License, Version 2.0 (the
+ ~ * "License"); you may not use this file except in compliance
+ ~ * with the License. You may obtain a copy of the License at
+ ~ *
+ ~ * http://www.apache.org/licenses/LICENSE-2.0
+ ~ *
+ ~ * Unless required by applicable law or agreed to in writing,
+ ~ * software distributed under the License is distributed on an
+ ~ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ * KIND, either express or implied. See the License for the
+ ~ * specific language governing permissions and limitations
+ ~ * under the License.
+ ~ */
+ -->
+<configuration>
+ <threadPool>
+ <autoscaler>
+ <identifier>Autoscaler</identifier>
+ <threadPoolSize>10</threadPoolSize>
+ </autoscaler>
+ </threadPool>
+</configuration>
+
+
http://git-wip-us.apache.org/repos/asf/stratos/blob/bfc263aa/products/stratos/modules/distribution/src/main/conf/autoscaler.xml
----------------------------------------------------------------------
diff --git a/products/stratos/modules/distribution/src/main/conf/autoscaler.xml
b/products/stratos/modules/distribution/src/main/conf/autoscaler.xml
index 4d8056e..d298ead 100644
--- a/products/stratos/modules/distribution/src/main/conf/autoscaler.xml
+++ b/products/stratos/modules/distribution/src/main/conf/autoscaler.xml
@@ -64,5 +64,9 @@
<service>60000</service>
</kubernetes>
</monitorInterval>
+ <threadpool>
+ <identifier>Autoscaler</identifier>
+ <threadPoolSize>10</threadPoolSize>
+ </threadpool>
</autoscaler>
</configuration>