Remove unnecessary threads in messaging model
Conflicts:
components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/8012f8c8
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/8012f8c8
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/8012f8c8
Branch: refs/heads/master
Commit: 8012f8c8d1b7ef808fae55d0f18826e651c04ed9
Parents: 9e6e91d
Author: gayan <[email protected]>
Authored: Mon Dec 1 16:25:22 2014 +0530
Committer: gayan <[email protected]>
Committed: Tue Dec 2 16:36:37 2014 +0530
----------------------------------------------------------------------
.../AutoscalerTopologyEventReceiver.java | 527 ++++++++++++++++++-
.../internal/AutoscalerServerComponent.java | 131 ++++-
.../CloudControllerServiceComponent.java | 142 ++++-
.../application/ApplicationTopicReceiver.java | 65 ++-
.../status/ClusterStatusTopicReceiver.java | 110 ++--
.../status/InstanceStatusTopicReceiver.java | 128 +++--
.../extension/api/LoadBalancerExtension.java | 256 +++++----
.../extension/FaultHandlingWindowProcessor.java | 307 +++++++++--
.../apache/stratos/haproxy/extension/Main.java | 48 +-
9 files changed, 1332 insertions(+), 382 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/8012f8c8/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
index bfdf30b..1f14542 100644
---
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
+++
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -54,15 +54,16 @@ import java.util.concurrent.ExecutorService;
/**
* Autoscaler topology receiver.
*/
-public class AutoscalerTopologyEventReceiver{
+public class AutoscalerTopologyEventReceiver {
- private static final Log log =
LogFactory.getLog(AutoscalerTopologyEventReceiver.class);
+ private static final Log log =
LogFactory.getLog(AutoscalerTopologyEventReceiver.class);
- private TopologyEventReceiver topologyEventReceiver;
- private boolean terminated;
- private boolean topologyInitialized;
+ private TopologyEventReceiver topologyEventReceiver;
+ private boolean terminated;
+ private boolean topologyInitialized;
private ExecutorService executorService;
+<<<<<<< HEAD
public AutoscalerTopologyEventReceiver() {
this.topologyEventReceiver = new TopologyEventReceiver();
addEventListeners();
@@ -523,6 +524,461 @@ public class AutoscalerTopologyEventReceiver{
topologyEventReceiver.terminate();
terminated = true;
}
+=======
+ public AutoscalerTopologyEventReceiver() {
+ this.topologyEventReceiver = new TopologyEventReceiver();
+ addEventListeners();
+ }
+
+ public void execute() {
+ //FIXME this activated before autoscaler deployer activated.
+
+ topologyEventReceiver.setExecutorService(executorService);
+ topologyEventReceiver.execute();
+
+ if (log.isInfoEnabled()) {
+ log.info("Autoscaler topology receiver thread started");
+ }
+
+ }
+
+ private boolean allClustersInitialized(Application application) {
+ boolean allClustersInitialized = false;
+ for (ClusterDataHolder holder :
application.getClusterDataRecursively()) {
+
TopologyManager.acquireReadLockForCluster(holder.getServiceType(),
+
holder.getClusterId());
+
+ try {
+ Topology topology =
TopologyManager.getTopology();
+ if (topology != null) {
+ Service service =
topology.getService(holder.getServiceType());
+ if (service != null) {
+ if
(service.clusterExists(holder.getClusterId())) {
+ allClustersInitialized
= true;
+ return
allClustersInitialized;
+ } else {
+ if
(log.isDebugEnabled()) {
+
log.debug("[Cluster] " + holder.getClusterId() + " is not found in " +
+ "the
Topology");
+ }
+ allClustersInitialized
= false;
+ }
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("Service is
null in the CompleteTopologyEvent");
+ }
+ }
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("Topology is null in
the CompleteTopologyEvent");
+ }
+ }
+ } finally {
+
TopologyManager.releaseReadLockForCluster(holder.getServiceType(),
+
holder.getClusterId());
+ }
+ }
+ return allClustersInitialized;
+ }
+
+ private void addEventListeners() {
+ // Listen to topology events that affect clusters
+ topologyEventReceiver.addEventListener(new
CompleteTopologyEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ if (!topologyInitialized) {
+ log.info("[CompleteTopologyEvent]
Received: " + event.getClass());
+ ApplicationHolder.acquireReadLock();
+ try {
+ Applications applications =
ApplicationHolder.getApplications();
+ if (applications != null) {
+ for (Application
application : applications.getApplications().values()) {
+ if
(allClustersInitialized(application)) {
+
startApplicationMonitor(application.getUniqueIdentifier());
+ } else {
+
log.error("Complete Topology is not consistent with the applications " +
+
"which got persisted");
+ }
+ }
+ topologyInitialized =
true;
+ } else {
+ log.info("No
applications found in the complete topology");
+ }
+ } catch (Exception e) {
+ log.error("Error processing
event", e);
+ } finally {
+
ApplicationHolder.releaseReadLock();
+ }
+ }
+ }
+ });
+
+ topologyEventReceiver.addEventListener(new
ApplicationClustersCreatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ try {
+
log.info("[ApplicationClustersCreatedEvent] Received: " + event.getClass());
+ ApplicationClustersCreatedEvent
applicationClustersCreatedEvent =
+
(ApplicationClustersCreatedEvent) event;
+ String appId =
applicationClustersCreatedEvent.getAppId();
+ try {
+ //acquire read lock
+
ApplicationHolder.acquireReadLock();
+ //start the application monitor
+ startApplicationMonitor(appId);
+ } catch (Exception e) {
+ String msg = "Error processing
event " + e.getLocalizedMessage();
+ log.error(msg, e);
+ } finally {
+ //release read lock
+
ApplicationHolder.releaseReadLock();
+
+ }
+ } catch (ClassCastException e) {
+ String msg = "Error while casting the
event " + e.getLocalizedMessage();
+ log.error(msg, e);
+ }
+
+ }
+ });
+
+ topologyEventReceiver.addEventListener(new
ClusterActivatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ log.info("[ClusterActivatedEvent] Received: " +
event.getClass());
+ ClusterActivatedEvent clusterActivatedEvent =
(ClusterActivatedEvent) event;
+ String clusterId =
clusterActivatedEvent.getClusterId();
+ AutoscalerContext asCtx =
AutoscalerContext.getInstance();
+ AbstractClusterMonitor monitor;
+ monitor = asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("A
cluster monitor is not found in autoscaler context "
+ +
"[cluster] %s", clusterId));
+ }
+ return;
+ }
+ //changing the status in the monitor, will
notify its parent monitor
+
+ }
+ });
+
+ topologyEventReceiver.addEventListener(new
ClusterResetEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ log.info("[ClusterCreatedEvent] Received: " +
event.getClass());
+ ClusterResetEvent clusterResetEvent =
(ClusterResetEvent) event;
+ String clusterId =
clusterResetEvent.getClusterId();
+ AutoscalerContext asCtx =
AutoscalerContext.getInstance();
+ AbstractClusterMonitor monitor;
+ monitor = asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("A
cluster monitor is not found in autoscaler context "
+ +
"[cluster] %s", clusterId));
+ }
+ return;
+ }
+ //changing the status in the monitor, will
notify its parent monitor
+ monitor.destroy();
+ monitor.setStatus(ClusterStatus.Created);
+
+ }
+ });
+
+ topologyEventReceiver.addEventListener(new
ClusterCreatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ log.info("[ClusterCreatedEvent] Received: " +
event.getClass());
+ }
+ });
+
+ topologyEventReceiver.addEventListener(new
ClusterInActivateEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ log.info("[ClusterInActivateEvent] Received: "
+ event.getClass());
+ ClusterInactivateEvent clusterInactivateEvent =
(ClusterInactivateEvent) event;
+ String clusterId =
clusterInactivateEvent.getClusterId();
+ AutoscalerContext asCtx =
AutoscalerContext.getInstance();
+ AbstractClusterMonitor monitor;
+ monitor = asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("A
cluster monitor is not found in autoscaler context "
+ +
"[cluster] %s", clusterId));
+ }
+ return;
+ }
+ //changing the status in the monitor, will
notify its parent monitor
+ monitor.setStatus(ClusterStatus.Inactive);
+ }
+ });
+
+ topologyEventReceiver.addEventListener(new
ClusterTerminatingEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ log.info("[ClusterTerminatingEvent] Received: "
+ event.getClass());
+ ClusterTerminatingEvent clusterTerminatingEvent
= (ClusterTerminatingEvent) event;
+ String clusterId =
clusterTerminatingEvent.getClusterId();
+ String instanceId =
clusterTerminatingEvent.getInstanceId();
+ AutoscalerContext asCtx =
AutoscalerContext.getInstance();
+ AbstractClusterMonitor monitor;
+ monitor = asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("A
cluster monitor is not found in autoscaler context "
+ +
"[cluster] %s", clusterId));
+ }
+ // if monitor does not exist, send
cluster terminated event
+
ClusterStatusEventPublisher.sendClusterTerminatedEvent(clusterTerminatingEvent.getAppId(),
+
clusterTerminatingEvent.getServiceName(),
+
clusterId, instanceId);
+ return;
+ }
+ //changing the status in the monitor, will
notify its parent monitor
+ if (monitor.getStatus() ==
ClusterStatus.Active) {
+ // terminated gracefully
+
monitor.setStatus(ClusterStatus.Terminating);
+
InstanceNotificationPublisher.sendInstanceCleanupEventForCluster(clusterId);
+ } else {
+
monitor.setStatus(ClusterStatus.Terminating);
+ monitor.terminateAllMembers();
+ }
+
ServiceReferenceHolder.getInstance().getClusterStatusProcessorChain().
+ process("", clusterId,
instanceId);
+ }
+ });
+
+ topologyEventReceiver.addEventListener(new
ClusterTerminatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ log.info("[ClusterTerminatedEvent] Received: "
+ event.getClass());
+ ClusterTerminatedEvent clusterTerminatedEvent =
(ClusterTerminatedEvent) event;
+ String clusterId =
clusterTerminatedEvent.getClusterId();
+ AutoscalerContext asCtx =
AutoscalerContext.getInstance();
+ AbstractClusterMonitor monitor;
+ monitor = asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("A
cluster monitor is not found in autoscaler context "
+ +
"[cluster] %s", clusterId));
+ }
+ // if the cluster monitor is null,
assume that its termianted
+ ApplicationMonitor appMonitor =
+
AutoscalerContext.getInstance().getAppMonitor(clusterTerminatedEvent.getAppId());
+ if (appMonitor != null) {
+ appMonitor
+
.onChildStatusEvent(new ClusterStatusEvent(ClusterStatus.Terminated, clusterId,
null));
+ }
+ return;
+ }
+ //changing the status in the monitor, will
notify its parent monitor
+ monitor.setStatus(ClusterStatus.Terminated);
+ //Destroying and Removing the Cluster monitor
+ monitor.destroy();
+
AutoscalerContext.getInstance().removeClusterMonitor(clusterId);
+ }
+ });
+
+ topologyEventReceiver.addEventListener(new
MemberReadyToShutdownEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ try {
+ MemberReadyToShutdownEvent
memberReadyToShutdownEvent = (MemberReadyToShutdownEvent) event;
+ String clusterId =
memberReadyToShutdownEvent.getClusterId();
+ AutoscalerContext asCtx =
AutoscalerContext.getInstance();
+ AbstractClusterMonitor monitor;
+ monitor =
asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
+
log.debug(String.format("A cluster monitor is not found in autoscaler context "
+
+ "[cluster] %s", clusterId));
+ }
+ return;
+ }
+
monitor.handleMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
+ } catch (Exception e) {
+ String msg = "Error processing event "
+ e.getLocalizedMessage();
+ log.error(msg, e);
+ }
+ }
+ });
+
+ topologyEventReceiver.addEventListener(new
MemberStartedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+
+ }
+ });
+
+ topologyEventReceiver.addEventListener(new
MemberTerminatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ try {
+ MemberTerminatedEvent
memberTerminatedEvent = (MemberTerminatedEvent) event;
+ String clusterId =
memberTerminatedEvent.getClusterId();
+ AbstractClusterMonitor monitor;
+ AutoscalerContext asCtx =
AutoscalerContext.getInstance();
+ monitor =
asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
+
log.debug(String.format("A cluster monitor is not found in autoscaler context "
+
+ "[cluster] %s", clusterId));
+ }
+ return;
+ }
+
monitor.handleMemberTerminatedEvent(memberTerminatedEvent);
+ } catch (Exception e) {
+ String msg = "Error processing event "
+ e.getLocalizedMessage();
+ log.error(msg, e);
+ }
+ }
+ });
+
+ topologyEventReceiver.addEventListener(new
MemberActivatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ try {
+ MemberActivatedEvent
memberActivatedEvent = (MemberActivatedEvent) event;
+ String clusterId =
memberActivatedEvent.getClusterId();
+ AbstractClusterMonitor monitor;
+ AutoscalerContext asCtx =
AutoscalerContext.getInstance();
+ monitor =
asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
+
log.debug(String.format("A cluster monitor is not found in autoscaler context "
+
+ "[cluster] %s", clusterId));
+ }
+ return;
+ }
+
monitor.handleMemberActivatedEvent(memberActivatedEvent);
+ } catch (Exception e) {
+ String msg = "Error processing event "
+ e.getLocalizedMessage();
+ log.error(msg, e);
+ }
+ }
+ });
+
+ topologyEventReceiver.addEventListener(new
MemberMaintenanceListener() {
+ @Override
+ protected void onEvent(Event event) {
+ try {
+ MemberMaintenanceModeEvent
maintenanceModeEvent = (MemberMaintenanceModeEvent) event;
+ String clusterId =
maintenanceModeEvent.getClusterId();
+ AbstractClusterMonitor monitor;
+ AutoscalerContext asCtx =
AutoscalerContext.getInstance();
+ monitor =
asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
+
log.debug(String.format("A cluster monitor is not found in autoscaler context "
+
+ "[cluster] %s", clusterId));
+ }
+ return;
+ }
+
monitor.handleMemberMaintenanceModeEvent(maintenanceModeEvent);
+ } catch (Exception e) {
+ String msg = "Error processing event "
+ e.getLocalizedMessage();
+ log.error(msg, e);
+ }
+ }
+ });
+
+ topologyEventReceiver.addEventListener(new
ClusterInstanceCreatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+
+ ClusterInstanceCreatedEvent
clusterInstanceCreatedEvent =
+ (ClusterInstanceCreatedEvent)
event;
+ AbstractClusterMonitor clusterMonitor =
AutoscalerContext.getInstance().
+
getClusterMonitor(clusterInstanceCreatedEvent.getClusterId());
+
+ if (clusterMonitor != null) {
+
TopologyManager.acquireReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(),
+
clusterInstanceCreatedEvent.getClusterId());
+
+ try {
+ Service service =
TopologyManager.getTopology().
+
getService(clusterInstanceCreatedEvent.getServiceName());
+
+ if (service != null) {
+ Cluster cluster =
service.getCluster(clusterInstanceCreatedEvent.getClusterId());
+ if (cluster != null) {
+ // create and
add Cluster Context
+ try {
+ if
(cluster.isKubernetesCluster()) {
+
clusterMonitor.addClusterContextForInstance(
+
clusterInstanceCreatedEvent.getInstanceId(),
+
ClusterContextFactory.getKubernetesClusterContext(cluster));
+ } else
if (cluster.isLbCluster()) {
+
clusterMonitor.addClusterContextForInstance(
+
clusterInstanceCreatedEvent.getInstanceId(),
+
ClusterContextFactory.getVMLBClusterContext(cluster));
+ } else {
+
clusterMonitor.addClusterContextForInstance(
+
clusterInstanceCreatedEvent.getInstanceId(),
+
ClusterContextFactory.getVMServiceClusterContext(cluster));
+ }
+
+ if
(clusterMonitor.hasMonitoringStarted().compareAndSet(false, true)) {
+
clusterMonitor.startScheduler();
+
log.info("Monitoring task for Cluster Monitor with cluster id " +
+
clusterInstanceCreatedEvent.getClusterId() + " started successfully");
+ }
+
+ } catch
(PolicyValidationException e) {
+
log.error(e.getMessage(), e);
+ } catch
(PartitionValidationException e) {
+
log.error(e.getMessage(), e);
+ }
+
+ } else {
+
log.error("Cluster not found for " + clusterInstanceCreatedEvent.getClusterId()
+
+ ", no
cluster instance added to ClusterMonitor " +
+
clusterInstanceCreatedEvent.getClusterId());
+ }
+ } else {
+ log.error("Service " +
clusterInstanceCreatedEvent.getServiceName() +
+ " not found,
no cluster instance added to ClusterMonitor " +
+
clusterInstanceCreatedEvent.getClusterId());
+ }
+
+ } finally {
+
TopologyManager.releaseReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(),
+
clusterInstanceCreatedEvent.getClusterId());
+ }
+
+ } else {
+ log.error("No Cluster Monitor found for
cluster id " +
+
clusterInstanceCreatedEvent.getClusterId());
+ }
+ }
+ });
+ }
+
+ /**
+ * Terminate load balancer topology receiver thread.
+ */
+ public void terminate() {
+ topologyEventReceiver.terminate();
+ terminated = true;
+ }
+
+ protected synchronized void startApplicationMonitor(String
applicationId) {
+ Thread th = null;
+ if
(AutoscalerContext.getInstance().getAppMonitor(applicationId) == null) {
+ th = new Thread(new
ApplicationMonitorAdder(applicationId));
+ }
+ if (th != null) {
+ th.start();
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String
+ .format("Application
monitor thread already exists: " +
+
"[application] %s ", applicationId));
+ }
+ }
+ }
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
public ExecutorService getExecutorService() {
return executorService;
@@ -531,4 +987,65 @@ public class AutoscalerTopologyEventReceiver{
public void setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
}
+<<<<<<< HEAD
+=======
+
+ private class ApplicationMonitorAdder implements Runnable {
+ private String appId;
+
+ public ApplicationMonitorAdder(String appId) {
+ this.appId = appId;
+ }
+
+ public void run() {
+ ApplicationMonitor applicationMonitor = null;
+ int retries = 5;
+ boolean success = false;
+ do {
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e1) {
+ }
+ try {
+ long start = System.currentTimeMillis();
+ if (log.isDebugEnabled()) {
+ log.debug("application monitor
is going to be started for [application] " +
+ appId);
+ }
+ try {
+ applicationMonitor =
MonitorFactory.getApplicationMonitor(appId);
+ } catch (PolicyValidationException e) {
+ String msg = "Application
monitor creation failed for Application: ";
+ log.warn(msg, e);
+ retries--;
+ }
+ long end = System.currentTimeMillis();
+ log.info("Time taken to start app
monitor: " + (end - start) / 1000);
+ success = true;
+ } catch (DependencyBuilderException e) {
+ String msg = "Application monitor
creation failed for Application: ";
+ log.warn(msg, e);
+ retries--;
+ } catch (TopologyInConsistentException e) {
+ String msg = "Application monitor
creation failed for Application: ";
+ log.warn(msg, e);
+ retries--;
+ }
+ } while (!success && retries != 0);
+
+ if (applicationMonitor == null) {
+ String msg = "Application monitor creation
failed, even after retrying for 5 times, "
+ + "for Application: " + appId;
+ log.error(msg);
+ throw new RuntimeException(msg);
+ }
+
+
AutoscalerContext.getInstance().addAppMonitor(applicationMonitor);
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Application monitor has
been added successfully: " +
+ "[application] %s",
applicationMonitor.getId()));
+ }
+ }
+ }
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/8012f8c8/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
index 3694066..2e443de 100644
---
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
+++
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
@@ -65,16 +65,16 @@ public class AutoscalerServerComponent {
private static final String THREAD_IDENTIFIER_KEY =
"threadPool.autoscaler.identifier";
private static final String DEFAULT_IDENTIFIER = "Auto-Scaler";
- private static final String THREAD_POOL_SIZE_KEY=
"threadPool.autoscaler.threadPoolSize";
+ private static final String THREAD_POOL_SIZE_KEY =
"threadPool.autoscaler.threadPoolSize";
private static final String COMPONENTS_CONFIG = "components-config";
private static final int THREAD_POOL_SIZE = 10;
private static final Log log =
LogFactory.getLog(AutoscalerServerComponent.class);
-
private AutoscalerTopologyEventReceiver asTopologyReceiver;
- private AutoscalerHealthStatEventReceiver
autoscalerHealthStatEventReceiver;
+ private AutoscalerHealthStatEventReceiver
autoscalerHealthStatEventReceiver;
protected void activate(ComponentContext componentContext) throws
Exception {
+<<<<<<< HEAD
try {
// Start topology receiver
XMLConfiguration conf =
ConfUtil.getInstance(COMPONENTS_CONFIG).getConfiguration();
@@ -197,3 +197,128 @@ public class AutoscalerServerComponent {
ServiceReferenceHolder.getInstance().setTaskService(null);
}
}
+=======
+ try {
+ // Start topology receiver
+ XMLConfiguration conf =
ConfUtil.getInstance(COMPONENTS_CONFIG).getConfiguration();
+ int threadPoolSize = conf.getInt(THREAD_POOL_SIZE_KEY,
THREAD_POOL_SIZE);
+ String threadIdentifier =
conf.getString(THREAD_IDENTIFIER_KEY, DEFAULT_IDENTIFIER);
+ ExecutorService executorService =
StratosThreadPool.getExecutorService(threadIdentifier, threadPoolSize);
+ asTopologyReceiver = new
AutoscalerTopologyEventReceiver();
+ asTopologyReceiver.setExecutorService(executorService);
+ asTopologyReceiver.execute();
+
+ if (log.isDebugEnabled()) {
+ log.debug("Topology receiver executor service
started");
+ }
+
+ // Start health stat receiver
+ autoscalerHealthStatEventReceiver = new
AutoscalerHealthStatEventReceiver();
+ Thread healthDelegatorThread = new
Thread(autoscalerHealthStatEventReceiver);
+ healthDelegatorThread.start();
+ if (log.isDebugEnabled()) {
+ log.debug("Health statistics receiver thread
started");
+ }
+
+ // Adding the registry stored partitions to the
information model
+ List<Partition> partitions =
RegistryManager.getInstance().retrievePartitions();
+ Iterator<Partition> partitionIterator =
partitions.iterator();
+ while (partitionIterator.hasNext()) {
+ Partition partition = partitionIterator.next();
+
PartitionManager.getInstance().addPartitionToInformationModel(partition);
+ }
+
+ // Adding the network partitions stored in registry to
the information model
+ List<NetworkPartitionLbHolder> nwPartitionHolders =
+
RegistryManager.getInstance().retrieveNetworkPartitionLbHolders();
+ Iterator<NetworkPartitionLbHolder> nwPartitionIterator
= nwPartitionHolders.iterator();
+ while (nwPartitionIterator.hasNext()) {
+ NetworkPartitionLbHolder nwPartition =
nwPartitionIterator.next();
+
PartitionManager.getInstance().addNetworkPartitionLbHolder(nwPartition);
+ }
+
+ List<AutoscalePolicy> asPolicies =
RegistryManager.getInstance().retrieveASPolicies();
+ Iterator<AutoscalePolicy> asPolicyIterator =
asPolicies.iterator();
+ while (asPolicyIterator.hasNext()) {
+ AutoscalePolicy asPolicy =
asPolicyIterator.next();
+
PolicyManager.getInstance().addASPolicyToInformationModel(asPolicy);
+ }
+
+ List<DeploymentPolicy> depPolicies =
RegistryManager.getInstance().retrieveDeploymentPolicies();
+ Iterator<DeploymentPolicy> depPolicyIterator =
depPolicies.iterator();
+ while (depPolicyIterator.hasNext()) {
+ DeploymentPolicy depPolicy =
depPolicyIterator.next();
+
PolicyManager.getInstance().addDeploymentPolicyToInformationModel(depPolicy);
+ }
+
+ // Adding KubernetesGroups stored in registry to the
information model
+ List<KubernetesGroup> kubernetesGroupList =
RegistryManager.getInstance().retrieveKubernetesGroups();
+ Iterator<KubernetesGroup> kubernetesGroupIterator =
kubernetesGroupList.iterator();
+ while (kubernetesGroupIterator.hasNext()) {
+ KubernetesGroup kubernetesGroup =
kubernetesGroupIterator.next();
+
KubernetesManager.getInstance().addNewKubernetesGroup(kubernetesGroup);
+ }
+
+ //starting the processor chain
+ ClusterStatusProcessorChain clusterStatusProcessorChain
= new ClusterStatusProcessorChain();
+
ServiceReferenceHolder.getInstance().setClusterStatusProcessorChain(clusterStatusProcessorChain);
+
+ GroupStatusProcessorChain groupStatusProcessorChain =
new GroupStatusProcessorChain();
+
ServiceReferenceHolder.getInstance().setGroupStatusProcessorChain(groupStatusProcessorChain);
+
+ if (log.isInfoEnabled()) {
+ log.info("Scheduling tasks to publish
applications");
+ }
+
+ ApplicationSynchronizerTaskScheduler
+
.schedule(ServiceReferenceHolder.getInstance()
+
.getTaskService());
+
+ if (log.isInfoEnabled()) {
+ log.info("Autoscaler server Component
activated");
+ }
+ } catch (Throwable e) {
+ log.error("Error in activating the autoscaler component
", e);
+ }
+ }
+
+ protected void deactivate(ComponentContext context) {
+ asTopologyReceiver.terminate();
+ autoscalerHealthStatEventReceiver.terminate();
+ }
+
+ protected void setRegistryService(RegistryService registryService) {
+ if (log.isDebugEnabled()) {
+ log.debug("Setting the Registry Service");
+ }
+ try {
+
ServiceReferenceHolder.getInstance().setRegistry(registryService.getGovernanceSystemRegistry());
+ } catch (RegistryException e) {
+ String msg = "Failed when retrieving Governance System
Registry.";
+ log.error(msg, e);
+ throw new AutoScalerException(msg, e);
+ }
+ }
+
+ protected void unsetRegistryService(RegistryService registryService) {
+ if (log.isDebugEnabled()) {
+ log.debug("Un-setting the Registry Service");
+ }
+ ServiceReferenceHolder.getInstance().setRegistry(null);
+ }
+
+ protected void setTaskService(TaskService taskService) {
+ if (log.isDebugEnabled()) {
+ log.debug("Setting the Task Service");
+ }
+
ServiceReferenceHolder.getInstance().setTaskService(taskService);
+ }
+
+ protected void unsetTaskService(TaskService taskService) {
+ if (log.isDebugEnabled()) {
+ log.debug("Un-setting the Task Service");
+ }
+ ServiceReferenceHolder.getInstance().setTaskService(null);
+ }
+}
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
http://git-wip-us.apache.org/repos/asf/stratos/blob/8012f8c8/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
index a47e924..dfbf1ec 100644
---
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
+++
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
@@ -20,9 +20,12 @@ package org.apache.stratos.cloud.controller.internal;
*
*/
+<<<<<<< HEAD
import com.hazelcast.core.HazelcastInstance;
+=======
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.cloud.controller.context.CloudControllerContext;
@@ -49,6 +52,7 @@ import org.wso2.carbon.utils.ConfigurationContextService;
* Registering Cloud Controller Service.
*
* @scr.component name="org.apache.stratos.cloud.controller" immediate="true"
+<<<<<<< HEAD
* @scr.reference name="distributedObjectProvider"
interface="org.apache.stratos.common.clustering.DistributedObjectProvider"
* cardinality="1..1" policy="dynamic"
bind="setDistributedObjectProvider" unbind="unsetDistributedObjectProvider"
* @scr.reference name="ntask.component"
interface="org.wso2.carbon.ntask.core.service.TaskService"
@@ -57,42 +61,72 @@ import org.wso2.carbon.utils.ConfigurationContextService;
* cardinality="1..1" policy="dynamic"
bind="setRegistryService" unbind="unsetRegistryService"
* @scr.reference name="config.context.service"
interface="org.wso2.carbon.utils.ConfigurationContextService"
* cardinality="1..1" policy="dynamic"
bind="setConfigurationContextService" unbind="unsetConfigurationContextService"
+=======
+ * @scr.reference name="distributedMapProvider"
interface="org.wso2.carbon.caching.impl.DistributedMapProvider"
+ * cardinality="1..1" policy="dynamic" bind="setDistributedMapProvider"
unbind="unsetDistributedMapProvider"
+ * @scr.reference name="ntask.component"
+ * interface="org.wso2.carbon.ntask.core.service.TaskService"
+ * cardinality="1..1" policy="dynamic" bind="setTaskService"
unbind="unsetTaskService"
+ * @scr.reference name="registry.service"
+ * interface="org.wso2.carbon.registry.core.service.RegistryService"
+ * cardinality="1..1" policy="dynamic" bind="setRegistryService"
unbind="unsetRegistryService"
+ * @scr.reference name="config.context.service"
+ * interface="org.wso2.carbon.utils.ConfigurationContextService"
+ * cardinality="1..1" policy="dynamic" bind="setConfigurationContextService"
unbind="unsetConfigurationContextService"
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
*/
public class CloudControllerServiceComponent {
- private static final Log log =
LogFactory.getLog(CloudControllerServiceComponent.class);
- private ClusterStatusTopicReceiver clusterStatusTopicReceiver;
- private InstanceStatusTopicReceiver instanceStatusTopicReceiver;
- private ApplicationTopicReceiver applicationTopicReceiver;
+ private static final Log log =
LogFactory.getLog(CloudControllerServiceComponent.class);
+ private ClusterStatusTopicReceiver clusterStatusTopicReceiver;
+ private InstanceStatusTopicReceiver instanceStatusTopicReceiver;
+ private ApplicationTopicReceiver applicationTopicReceiver;
- protected void activate(ComponentContext context) {
- try {
- applicationTopicReceiver = new ApplicationTopicReceiver();
- applicationTopicReceiver.execute();
+ protected void activate(ComponentContext context) {
+ try {
+ applicationTopicReceiver = new
ApplicationTopicReceiver();
+ applicationTopicReceiver.execute();
+ if (log.isInfoEnabled()) {
+ log.info("Application Receiver thread started");
+ }
+
+<<<<<<< HEAD
if (log.isInfoEnabled()) {
log.info("Application event receiver thread started");
}
+=======
+ clusterStatusTopicReceiver = new
ClusterStatusTopicReceiver();
+ clusterStatusTopicReceiver.execute();
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
- clusterStatusTopicReceiver = new ClusterStatusTopicReceiver();
- clusterStatusTopicReceiver.execute();
+ if (log.isInfoEnabled()) {
+ log.info("Cluster status Receiver thread
started");
+ }
+<<<<<<< HEAD
if (log.isInfoEnabled()) {
log.info("Cluster status receiver thread started");
}
+=======
+ instanceStatusTopicReceiver = new
InstanceStatusTopicReceiver();
+ instanceStatusTopicReceiver.execute();
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
- instanceStatusTopicReceiver = new InstanceStatusTopicReceiver();
- instanceStatusTopicReceiver.execute();
+ if (log.isInfoEnabled()) {
+ log.info("Instance status message receiver
thread started");
+ }
- if(log.isInfoEnabled()) {
- log.info("Instance status message receiver thread started");
- }
+ // Register cloud controller service
+ BundleContext bundleContext =
context.getBundleContext();
+
bundleContext.registerService(CloudControllerService.class.getName(),
+ new
CloudControllerServiceImpl(), null);
- // Register cloud controller service
- BundleContext bundleContext = context.getBundleContext();
-
bundleContext.registerService(CloudControllerService.class.getName(),
- new CloudControllerServiceImpl(), null);
+ if (log.isInfoEnabled()) {
+ log.info("Scheduling tasks");
+ }
+<<<<<<< HEAD
if(log.isInfoEnabled()) {
log.info("Scheduling tasks");
}
@@ -123,29 +157,71 @@ public class CloudControllerServiceComponent {
ServiceReferenceHolder.getInstance().setTaskService(null);
}
+=======
+ TopologySynchronizerTaskScheduler
+
.schedule(ServiceReferenceHolder.getInstance()
+
.getTaskService());
+
+ } catch (Throwable e) {
+ log.error("******* Cloud Controller Service bundle is
failed to activate ****", e);
+ }
+ }
+
+ protected void setTaskService(TaskService taskService) {
+ if (log.isDebugEnabled()) {
+ log.debug("Setting the Task Service");
+ }
+
ServiceReferenceHolder.getInstance().setTaskService(taskService);
+ }
+
+ protected void unsetTaskService(TaskService taskService) {
+ if (log.isDebugEnabled()) {
+ log.debug("Unsetting the Task Service");
+ }
+ ServiceReferenceHolder.getInstance().setTaskService(null);
+ }
+
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
protected void setRegistryService(RegistryService registryService) {
if (log.isDebugEnabled()) {
log.debug("Setting the Registry Service");
}
-
- try {
+
+ try {
UserRegistry registry =
registryService.getGovernanceSystemRegistry();
+<<<<<<< HEAD
ServiceReferenceHolder.getInstance().setRegistry(registry);
} catch (RegistryException e) {
String msg = "Failed when retrieving Governance System
Registry.";
log.error(msg, e);
throw new CloudControllerException(msg, e);
}
+=======
+ ServiceReferenceHolder.getInstance()
+ .setRegistry(registry);
+ } catch (RegistryException e) {
+ String msg = "Failed when retrieving Governance System
Registry.";
+ log.error(msg, e);
+ throw new CloudControllerException(msg, e);
+ }
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
}
protected void unsetRegistryService(RegistryService registryService) {
if (log.isDebugEnabled()) {
+<<<<<<< HEAD
log.debug("Un-setting the Registry Service");
}
ServiceReferenceHolder.getInstance().setRegistry(null);
+=======
+ log.debug("Unsetting the Registry Service");
+ }
+ ServiceReferenceHolder.getInstance().setRegistry(null);
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
}
-
+
protected void
setConfigurationContextService(ConfigurationContextService cfgCtxService) {
+<<<<<<< HEAD
ServiceReferenceHolder.getInstance().setAxisConfiguration(
cfgCtxService.getServerConfigContext().getAxisConfiguration());
}
@@ -162,8 +238,26 @@ public class CloudControllerServiceComponent {
ServiceReferenceHolder.getInstance().setDistributedObjectProvider(null);
}
+=======
+ ServiceReferenceHolder.getInstance().setAxisConfiguration(
+
cfgCtxService.getServerConfigContext().getAxisConfiguration());
+ }
+
+ protected void
unsetConfigurationContextService(ConfigurationContextService cfgCtxService) {
+ ServiceReferenceHolder.getInstance().setAxisConfiguration(null);
+ }
+
+ protected void setDistributedMapProvider(DistributedMapProvider
mapProvider) {
+
ServiceReferenceHolder.getInstance().setDistributedMapProvider(mapProvider);
+ }
+
+ protected void unsetDistributedMapProvider(DistributedMapProvider
mapProvider) {
+
ServiceReferenceHolder.getInstance().setDistributedMapProvider(null);
+ }
+
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
protected void deactivate(ComponentContext ctx) {
- // Close event publisher connections to message broker
- EventPublisherPool.close(Util.Topics.TOPOLOGY_TOPIC.getTopicName());
+ // Close event publisher connections to message broker
+
EventPublisherPool.close(Util.Topics.TOPOLOGY_TOPIC.getTopicName());
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/8012f8c8/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java
index 87dfe6e..d65b7f5 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java
@@ -29,45 +29,44 @@ import org.apache.stratos.messaging.message.receiver.applications.ApplicationsEv
/**
* This is to receive the application topic messages.
*/
-public class ApplicationTopicReceiver{
- private static final Log log =
LogFactory.getLog(ApplicationTopicReceiver.class);
- private ApplicationsEventReceiver applicationsEventReceiver;
- private boolean terminated;
+public class ApplicationTopicReceiver {
+ private static final Log log =
LogFactory.getLog(ApplicationTopicReceiver.class);
+ private ApplicationsEventReceiver applicationsEventReceiver;
+ private boolean terminated;
- public ApplicationTopicReceiver() {
- this.applicationsEventReceiver = new ApplicationsEventReceiver();
- addEventListeners();
+ public ApplicationTopicReceiver() {
+ this.applicationsEventReceiver = new
ApplicationsEventReceiver();
+ addEventListeners();
- }
+ }
-
+ public void execute() {
- public void execute() {
+ if (log.isInfoEnabled()) {
+ log.info("Cloud controller application status thread
started");
+ }
+ applicationsEventReceiver.execute();
- if (log.isInfoEnabled()) {
- log.info("Cloud controller application status thread started");
- }
- applicationsEventReceiver.execute();
+ if (log.isInfoEnabled()) {
+ log.info("Cloud controller application status thread
terminated");
+ }
- if (log.isInfoEnabled()) {
- log.info("Cloud controller application status thread terminated");
- }
+ }
- }
- private void addEventListeners() {
- applicationsEventReceiver.addEventListener(new
ApplicationTerminatedEventListener() {
- @Override
- protected void onEvent(Event event) {
- //Remove the application related data
- ApplicationTerminatedEvent terminatedEvent =
(ApplicationTerminatedEvent)event;
- log.info("ApplicationTerminatedEvent received for
[application] " + terminatedEvent.getAppId());
- String appId = terminatedEvent.getAppId();
- TopologyBuilder.handleApplicationClustersRemoved(appId,
terminatedEvent.getClusterData());
- }
- });
- }
+ private void addEventListeners() {
+ applicationsEventReceiver.addEventListener(new
ApplicationTerminatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ //Remove the application related data
+ ApplicationTerminatedEvent terminatedEvent =
(ApplicationTerminatedEvent) event;
+ log.info("ApplicationTerminatedEvent received
for [application] " + terminatedEvent.getAppId());
+ String appId = terminatedEvent.getAppId();
+
TopologyBuilder.handleApplicationClustersRemoved(appId,
terminatedEvent.getClusterData());
+ }
+ });
+ }
- public void setTerminated(boolean terminated) {
- this.terminated = terminated;
- }
+ public void setTerminated(boolean terminated) {
+ this.terminated = terminated;
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/8012f8c8/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
index bd2fbf0..ca6d4ad 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
@@ -26,16 +26,16 @@ import org.apache.stratos.messaging.event.cluster.status.*;
import org.apache.stratos.messaging.listener.cluster.status.*;
import
org.apache.stratos.messaging.message.receiver.cluster.status.ClusterStatusEventReceiver;
-public class ClusterStatusTopicReceiver{
- private static final Log log =
LogFactory.getLog(ClusterStatusTopicReceiver.class);
+public class ClusterStatusTopicReceiver {
+ private static final Log log =
LogFactory.getLog(ClusterStatusTopicReceiver.class);
- private ClusterStatusEventReceiver statusEventReceiver;
- private boolean terminated;
+ private ClusterStatusEventReceiver statusEventReceiver;
+ private boolean terminated;
- public ClusterStatusTopicReceiver() {
- this.statusEventReceiver = new ClusterStatusEventReceiver();
- addEventListeners();
- }
+ public ClusterStatusTopicReceiver() {
+ this.statusEventReceiver = new ClusterStatusEventReceiver();
+ addEventListeners();
+ }
public void execute() {
@@ -47,58 +47,58 @@ public class ClusterStatusTopicReceiver{
}
private void addEventListeners() {
- // Listen to topology events that affect clusters
- statusEventReceiver.addEventListener(new
ClusterStatusClusterResetEventListener() {
- @Override
- protected void onEvent(Event event) {
-
TopologyBuilder.handleClusterReset((ClusterStatusClusterResetEvent) event);
- }
- });
+ // Listen to topology events that affect clusters
+ statusEventReceiver.addEventListener(new
ClusterStatusClusterResetEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+
TopologyBuilder.handleClusterReset((ClusterStatusClusterResetEvent) event);
+ }
+ });
- statusEventReceiver.addEventListener(new
ClusterStatusClusterInstanceCreatedEventListener() {
- @Override
- protected void onEvent(Event event) {
-
//TopologyBuilder.handleClusterInstanceCreated((ClusterStatusClusterInstanceCreatedEvent)
event);
- }
- });
+ statusEventReceiver.addEventListener(new
ClusterStatusClusterInstanceCreatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+
//TopologyBuilder.handleClusterInstanceCreated((ClusterStatusClusterInstanceCreatedEvent)
event);
+ }
+ });
- statusEventReceiver.addEventListener(new
ClusterStatusClusterCreatedEventListener() {
- @Override
- protected void onEvent(Event event) {
-
TopologyBuilder.handleClusterCreated((ClusterStatusClusterCreatedEvent) event);
- }
- });
+ statusEventReceiver.addEventListener(new
ClusterStatusClusterCreatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+
TopologyBuilder.handleClusterCreated((ClusterStatusClusterCreatedEvent) event);
+ }
+ });
- statusEventReceiver.addEventListener(new
ClusterStatusClusterActivatedEventListener() {
- @Override
- protected void onEvent(Event event) {
-
TopologyBuilder.handleClusterActivatedEvent((ClusterStatusClusterActivatedEvent)
event);
- }
- });
+ statusEventReceiver.addEventListener(new
ClusterStatusClusterActivatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+
TopologyBuilder.handleClusterActivatedEvent((ClusterStatusClusterActivatedEvent)
event);
+ }
+ });
- statusEventReceiver.addEventListener(new
ClusterStatusClusterTerminatedEventListener() {
- @Override
- protected void onEvent(Event event) {
-
TopologyBuilder.handleClusterTerminatedEvent((ClusterStatusClusterTerminatedEvent)
event);
- }
- });
+ statusEventReceiver.addEventListener(new
ClusterStatusClusterTerminatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+
TopologyBuilder.handleClusterTerminatedEvent((ClusterStatusClusterTerminatedEvent)
event);
+ }
+ });
- statusEventReceiver.addEventListener(new
ClusterStatusClusterTerminatingEventListener() {
- @Override
- protected void onEvent(Event event) {
-
TopologyBuilder.handleClusterTerminatingEvent((ClusterStatusClusterTerminatingEvent)
event);
- }
- });
+ statusEventReceiver.addEventListener(new
ClusterStatusClusterTerminatingEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+
TopologyBuilder.handleClusterTerminatingEvent((ClusterStatusClusterTerminatingEvent)
event);
+ }
+ });
- statusEventReceiver.addEventListener(new
ClusterStatusClusterInactivateEventListener() {
- @Override
- protected void onEvent(Event event) {
-
TopologyBuilder.handleClusterInActivateEvent((ClusterStatusClusterInactivateEvent)
event);
- }
- });
- }
+ statusEventReceiver.addEventListener(new
ClusterStatusClusterInactivateEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+
TopologyBuilder.handleClusterInActivateEvent((ClusterStatusClusterInactivateEvent)
event);
+ }
+ });
+ }
- public void setTerminated(boolean terminated) {
- this.terminated = terminated;
- }
+ public void setTerminated(boolean terminated) {
+ this.terminated = terminated;
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/8012f8c8/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
index d5475f0..42aabed 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
@@ -35,71 +35,67 @@ import org.apache.stratos.messaging.message.receiver.instance.status.InstanceSta
/**
* This will handle the instance status events
*/
-public class InstanceStatusTopicReceiver{
- private static final Log log =
LogFactory.getLog(InstanceStatusTopicReceiver.class);
-
- private InstanceStatusEventReceiver statusEventReceiver;
- private boolean terminated;
-
- public InstanceStatusTopicReceiver() {
- this.statusEventReceiver = new InstanceStatusEventReceiver();
- addEventListeners();
- }
-
-
-
- public void execute() {
- statusEventReceiver.execute();
- if (log.isInfoEnabled()) {
- log.info("Cloud controller application status thread started");
- }
-
-
- if (log.isInfoEnabled()) {
- log.info("Cloud controller application status thread terminated");
- }
- }
-
- private void addEventListeners() {
- statusEventReceiver.addEventListener(new
InstanceActivatedEventListener() {
- @Override
- protected void onEvent(Event event) {
- TopologyBuilder.handleMemberActivated((InstanceActivatedEvent)
event);
- }
- });
-
- statusEventReceiver.addEventListener(new
InstanceStartedEventListener() {
- @Override
- protected void onEvent(Event event) {
- TopologyBuilder.handleMemberStarted((InstanceStartedEvent)
event);
- }
- });
-
- statusEventReceiver.addEventListener(new
InstanceReadyToShutdownEventListener() {
- @Override
- protected void onEvent(Event event) {
- try {
-
TopologyBuilder.handleMemberReadyToShutdown((InstanceReadyToShutdownEvent)
event);
- } catch (Exception e) {
- String error = "Failed to retrieve the instance status
event message";
- log.error(error, e);
- }
- }
- });
-
- statusEventReceiver.addEventListener(new InstanceMaintenanceListener()
{
- @Override
- protected void onEvent(Event event) {
- try {
-
TopologyBuilder.handleMemberMaintenance((InstanceMaintenanceModeEvent) event);
- } catch (Exception e) {
- String error = "Failed to retrieve the instance status event
message";
- log.error(error, e);
- }
- }
- });
-
-
- }
+public class InstanceStatusTopicReceiver {
+ private static final Log log =
LogFactory.getLog(InstanceStatusTopicReceiver.class);
+
+ private InstanceStatusEventReceiver statusEventReceiver;
+ private boolean terminated;
+
+ public InstanceStatusTopicReceiver() {
+ this.statusEventReceiver = new InstanceStatusEventReceiver();
+ addEventListeners();
+ }
+
+ public void execute() {
+ statusEventReceiver.execute();
+ if (log.isInfoEnabled()) {
+ log.info("Cloud controller application status thread
started");
+ }
+
+ if (log.isInfoEnabled()) {
+ log.info("Cloud controller application status thread
terminated");
+ }
+ }
+
+ private void addEventListeners() {
+ statusEventReceiver.addEventListener(new
InstanceActivatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+
TopologyBuilder.handleMemberActivated((InstanceActivatedEvent) event);
+ }
+ });
+
+ statusEventReceiver.addEventListener(new
InstanceStartedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+
TopologyBuilder.handleMemberStarted((InstanceStartedEvent) event);
+ }
+ });
+
+ statusEventReceiver.addEventListener(new
InstanceReadyToShutdownEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ try {
+
TopologyBuilder.handleMemberReadyToShutdown((InstanceReadyToShutdownEvent)
event);
+ } catch (Exception e) {
+ String error = "Failed to retrieve the
instance status event message";
+ log.error(error, e);
+ }
+ }
+ });
+
+ statusEventReceiver.addEventListener(new
InstanceMaintenanceListener() {
+ @Override
+ protected void onEvent(Event event) {
+ try {
+
TopologyBuilder.handleMemberMaintenance((InstanceMaintenanceModeEvent) event);
+ } catch (Exception e) {
+ String error = "Failed to retrieve the
instance status event message";
+ log.error(error, e);
+ }
+ }
+ });
+
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/8012f8c8/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
index e74721f..188b2ac 100644
--- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
+++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
@@ -35,136 +35,134 @@ import java.util.concurrent.ExecutorService;
* received from the message broker.
*/
public class LoadBalancerExtension implements Runnable {
- private static final Log log =
LogFactory.getLog(LoadBalancerExtension.class);
-
- private LoadBalancer loadBalancer;
- private LoadBalancerStatisticsReader statsReader;
- private boolean loadBalancerStarted;
- private TopologyEventReceiver topologyEventReceiver;
- private LoadBalancerStatisticsNotifier statisticsNotifier;
- private boolean terminated;
+ private static final Log log =
LogFactory.getLog(LoadBalancerExtension.class);
+
+ private LoadBalancer loadBalancer;
+ private LoadBalancerStatisticsReader statsReader;
+ private boolean loadBalancerStarted;
+ private TopologyEventReceiver topologyEventReceiver;
+ private LoadBalancerStatisticsNotifier statisticsNotifier;
+ private boolean terminated;
private ExecutorService executorService;
- /**
- * Load balancer extension constructor.
- * @param loadBalancer Load balancer instance: Mandatory.
- * @param statsReader Statistics reader: If null statistics notifier
thread will not be started.
- */
- public LoadBalancerExtension(LoadBalancer loadBalancer,
LoadBalancerStatisticsReader statsReader) {
- this.loadBalancer = loadBalancer;
- this.statsReader = statsReader;
- }
-
- @Override
- public void run() {
- try {
- if(log.isInfoEnabled()) {
- log.info("Load balancer extension started");
- }
-
- // Start topology receiver thread
- topologyEventReceiver = new TopologyEventReceiver();
- addEventListeners();
- topologyEventReceiver.setExecutorService(executorService);
- topologyEventReceiver.execute();
-
-
- if(statsReader != null) {
- // Start stats notifier thread
- statisticsNotifier = new
LoadBalancerStatisticsNotifier(statsReader);
- Thread statsNotifierThread = new Thread(statisticsNotifier);
- statsNotifierThread.start();
- }
- else {
- if(log.isWarnEnabled()) {
- log.warn("Load balancer statistics reader not found");
- }
- }
-
- // Keep the thread live until terminated
- while (!terminated);
- } catch (Exception e) {
- if (log.isErrorEnabled()) {
- log.error("Could not start load balancer extension", e);
- }
- }
- }
-
- private void addEventListeners() {
- topologyEventReceiver.addEventListener(new
CompleteTopologyEventListener() {
-
- @Override
- protected void onEvent(Event event) {
- try {
-
- if (!loadBalancerStarted) {
- // Configure load balancer
- loadBalancer.configure(TopologyManager.getTopology());
-
- // Start load balancer
- loadBalancer.start();
- loadBalancerStarted = true;
- }
- } catch (Exception e) {
- if (log.isErrorEnabled()) {
- log.error("Could not start load balancer", e);
- }
- terminate();
- }
- }
- });
- topologyEventReceiver.addEventListener(new
MemberActivatedEventListener() {
- @Override
- protected void onEvent(Event event) {
- reloadConfiguration();
- }
- });
- topologyEventReceiver.addEventListener(new
MemberSuspendedEventListener() {
- @Override
- protected void onEvent(Event event) {
- reloadConfiguration();
- }
- });
- topologyEventReceiver.addEventListener(new
MemberTerminatedEventListener() {
- @Override
- protected void onEvent(Event event) {
- reloadConfiguration();
- }
- });
- topologyEventReceiver.addEventListener(new
ClusterRemovedEventListener() {
- @Override
- protected void onEvent(Event event) {
- reloadConfiguration();
- }
- });
- topologyEventReceiver.addEventListener(new
ServiceRemovedEventListener() {
- @Override
- protected void onEvent(Event event) {
- reloadConfiguration();
- }
- });
- }
-
- private void reloadConfiguration() {
- try {
- if (loadBalancerStarted) {
- loadBalancer.reload(TopologyManager.getTopology());
- }
- } catch (Exception e) {
- if (log.isErrorEnabled()) {
- log.error("Could not reload load balancer configuration", e);
- }
- }
- }
-
- public void terminate() {
- if(topologyEventReceiver != null) {
- topologyEventReceiver.terminate();
- }
- if(statisticsNotifier != null) {
- statisticsNotifier.terminate();
- }
- terminated = true;
- }
+
+ /**
+ * Load balancer extension constructor.
+ *
+ * @param loadBalancer Load balancer instance: Mandatory.
+ * @param statsReader Statistics reader: If null statistics notifier
thread will not be started.
+ */
+ public LoadBalancerExtension(LoadBalancer loadBalancer,
LoadBalancerStatisticsReader statsReader) {
+ this.loadBalancer = loadBalancer;
+ this.statsReader = statsReader;
+ }
+
+ @Override
+ public void run() {
+ try {
+ if (log.isInfoEnabled()) {
+ log.info("Load balancer extension started");
+ }
+
+ // Start topology receiver thread
+ topologyEventReceiver = new TopologyEventReceiver();
+ addEventListeners();
+
topologyEventReceiver.setExecutorService(executorService);
+ topologyEventReceiver.execute();
+
+ if (statsReader != null) {
+ // Start stats notifier thread
+ statisticsNotifier = new
LoadBalancerStatisticsNotifier(statsReader);
+ Thread statsNotifierThread = new
Thread(statisticsNotifier);
+ statsNotifierThread.start();
+ } else {
+ if (log.isWarnEnabled()) {
+ log.warn("Load balancer statistics
reader not found");
+ }
+ }
+
+ } catch (Exception e) {
+ if (log.isErrorEnabled()) {
+ log.error("Could not start load balancer
extension", e);
+ }
+ }
+ }
+
+ private void addEventListeners() {
+ topologyEventReceiver.addEventListener(new
CompleteTopologyEventListener() {
+
+ @Override
+ protected void onEvent(Event event) {
+ try {
+
+ if (!loadBalancerStarted) {
+ // Configure load balancer
+
loadBalancer.configure(TopologyManager.getTopology());
+
+ // Start load balancer
+ loadBalancer.start();
+ loadBalancerStarted = true;
+ }
+ } catch (Exception e) {
+ if (log.isErrorEnabled()) {
+ log.error("Could not start load
balancer", e);
+ }
+ terminate();
+ }
+ }
+ });
+ topologyEventReceiver.addEventListener(new
MemberActivatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ reloadConfiguration();
+ }
+ });
+ topologyEventReceiver.addEventListener(new
MemberSuspendedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ reloadConfiguration();
+ }
+ });
+ topologyEventReceiver.addEventListener(new
MemberTerminatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ reloadConfiguration();
+ }
+ });
+ topologyEventReceiver.addEventListener(new
ClusterRemovedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ reloadConfiguration();
+ }
+ });
+ topologyEventReceiver.addEventListener(new
ServiceRemovedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ reloadConfiguration();
+ }
+ });
+ }
+
+ private void reloadConfiguration() {
+ try {
+ if (loadBalancerStarted) {
+
loadBalancer.reload(TopologyManager.getTopology());
+ }
+ } catch (Exception e) {
+ if (log.isErrorEnabled()) {
+ log.error("Could not reload load balancer
configuration", e);
+ }
+ }
+ }
+
+ public void terminate() {
+ if (topologyEventReceiver != null) {
+ topologyEventReceiver.terminate();
+ }
+ if (statisticsNotifier != null) {
+ statisticsNotifier.terminate();
+ }
+ terminated = true;
+ }
public ExecutorService getExecutorService() {
return executorService;
http://git-wip-us.apache.org/repos/asf/stratos/blob/8012f8c8/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
----------------------------------------------------------------------
diff --git a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
index 56a3fcf..5ee28d1 100644
--- a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
+++ b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
@@ -58,53 +58,56 @@ import java.util.concurrent.TimeUnit;
@SiddhiExtension(namespace = "stratos", function = "faultHandling")
public class FaultHandlingWindowProcessor extends WindowProcessor implements
RunnableWindowProcessor {
- private static final int TIME_OUT = 60 * 1000;
- static final Logger log =
Logger.getLogger(FaultHandlingWindowProcessor.class);
- private ScheduledExecutorService faultHandleScheduler;
- private ThreadBarrier threadBarrier;
- private long timeToKeep;
- private ISchedulerSiddhiQueue<StreamEvent> window;
- private EventPublisher healthStatPublisher =
EventPublisherPool.getPublisher(Util.Topics.HEALTH_STAT_TOPIC.getTopicName());
- private Map<String, Object> MemberFaultEventMap = new HashMap<String,
Object>();
- private Map<String, Object> memberFaultEventMessageMap = new
HashMap<String, Object>();
-
- // Map of member id's to their last received health event time stamp
- private ConcurrentHashMap<String, Long> memberTimeStampMap = new
ConcurrentHashMap<String, Long>();
-
- // Event receiver to receive topology events published by cloud-controller
- private CEPTopologyEventReceiver cepTopologyEventReceiver = new
CEPTopologyEventReceiver(this);
-
- // Stratos member id attribute index in stream execution plan
- private int memberIdAttrIndex;
-
- @Override
- protected void processEvent(InEvent event) {
- addDataToMap(event);
- }
-
- @Override
- protected void processEvent(InListEvent listEvent) {
- for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) {
- addDataToMap((InEvent) listEvent.getEvent(i));
- }
- }
-
- /**
- * Add new entry to time stamp map from the received event.
- *
- * @param event Event received by Siddhi.
- */
- protected void addDataToMap(InEvent event) {
- String id = (String) event.getData()[memberIdAttrIndex];
- //checking whether this member is the topology.
- //sometimes there can be a delay between publishing member terminated
events
- //and actually terminating instances. Hence CEP might get events for
already terminated members
- //so we are checking the topology for the member existence
- Member member = getMemberFromId(id);
- if (null == member) {
+ private static final int TIME_OUT = 60 * 1000;
+ static final Logger log =
Logger.getLogger(FaultHandlingWindowProcessor.class);
+ public static final String IDENTIFIER = "AutoScaler";
+ private ScheduledExecutorService faultHandleScheduler;
+ private ThreadBarrier threadBarrier;
+ private long timeToKeep;
+ private ISchedulerSiddhiQueue<StreamEvent> window;
+ private EventPublisher healthStatPublisher =
+
EventPublisherPool.getPublisher(Util.Topics.HEALTH_STAT_TOPIC.getTopicName());
+ private Map<String, Object> MemberFaultEventMap = new HashMap<String,
Object>();
+ private Map<String, Object> memberFaultEventMessageMap = new
HashMap<String, Object>();
+
+ // Map of member id's to their last received health event time stamp
+ private ConcurrentHashMap<String, Long> memberTimeStampMap = new
ConcurrentHashMap<String, Long>();
+
+ // Event receiver to receive topology events published by
cloud-controller
+ private CEPTopologyEventReceiver cepTopologyEventReceiver = new
CEPTopologyEventReceiver(this);
+
+ // Stratos member id attribute index in stream execution plan
+ private int memberIdAttrIndex;
+
+ @Override
+ protected void processEvent(InEvent event) {
+ addDataToMap(event);
+ }
+
+ @Override
+ protected void processEvent(InListEvent listEvent) {
+ for (int i = 0, size = listEvent.getActiveEvents(); i < size;
i++) {
+ addDataToMap((InEvent) listEvent.getEvent(i));
+ }
+ }
+
+ /**
+ * Add new entry to time stamp map from the received event.
+ *
+ * @param event Event received by Siddhi.
+ */
+ protected void addDataToMap(InEvent event) {
+ String id = (String) event.getData()[memberIdAttrIndex];
+ //checking whether this member is the topology.
+ //sometimes there can be a delay between publishing member
terminated events
+ //and actually terminating instances. Hence CEP might get
events for already terminated members
+ //so we are checking the topology for the member existence
+ Member member = getMemberFromId(id);
+ if (null == member) {
log.debug("Member not found in the toplogy. Event
rejected");
return;
}
+<<<<<<< HEAD
if (StringUtils.isNotEmpty(id)) {
memberTimeStampMap.put(id, event.getTimeStamp());
} else {
@@ -317,4 +320,220 @@ public class FaultHandlingWindowProcessor extends
WindowProcessor implements Run
public ConcurrentHashMap<String, Long> getMemberTimeStampMap() {
return memberTimeStampMap;
}
+=======
+ if (StringUtils.isNotEmpty(id)) {
+ memberTimeStampMap.put(id, event.getTimeStamp());
+ } else {
+ log.warn("NULL member id found in the event received.
Event rejected.");
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("Event received from [member-id] " + id + "
[time-stamp] " + event.getTimeStamp());
+ }
+ }
+
+    /**
+     * Delegates to the iterator of the backing window queue.
+     *
+     * @return iterator supplied by the window queue
+     */
+    @Override
+    public Iterator<StreamEvent> iterator() {
+        Iterator<StreamEvent> windowEvents = window.iterator();
+        return windowEvents;
+    }
+
+    /**
+     * Returns an iterator over the window queue. The predicate is only forwarded when
+     * distributed processing is enabled; otherwise it is ignored.
+     *
+     * @param predicate predicate passed to the grid-backed queue in distributed mode
+     * @return iterator supplied by the window queue
+     */
+    @Override
+    public Iterator<StreamEvent> iterator(String predicate) {
+        if (!siddhiContext.isDistributedProcessingEnabled()) {
+            // local queue: no predicate support, fall back to the plain iterator
+            return window.iterator();
+        }
+        SchedulerSiddhiQueueGrid<StreamEvent> gridWindow = (SchedulerSiddhiQueueGrid<StreamEvent>) window;
+        return gridWindow.iterator(predicate);
+    }
+
+ /**
+ * Retrieve the current activated members from the topology and
initialize the time stamp map.
+ * This will allow the system to recover from a restart
+ *
+ * @param topology Topology model object
+ */
+ boolean loadTimeStampMapFromTopology(Topology topology) {
+
+ long currentTimeStamp = System.currentTimeMillis();
+ if (topology == null || topology.getServices() == null) {
+ return false;
+ }
+ // TODO make this efficient by adding APIs to messaging
component
+ for (Service service : topology.getServices()) {
+ if (service.getClusters() != null) {
+ for (Cluster cluster : service.getClusters()) {
+ if (cluster.getMembers() != null) {
+ for (Member member :
cluster.getMembers()) {
+ // we are checking
faulty status only in previously activated members
+ if (member != null &&
MemberStatus.Activated.equals(member.getStatus())) {
+ // Initialize
the member time stamp map from the topology at the beginning
+
memberTimeStampMap.putIfAbsent(member.getMemberId(), currentTimeStamp);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ log.info("Member time stamp map was successfully loaded from
the topology.");
+ if (log.isDebugEnabled()) {
+ log.debug("Member TimeStamp Map: " +
memberTimeStampMap);
+ }
+ return true;
+ }
+
+    /**
+     * Looks up a member in the current topology by its member id. The topology is scanned
+     * under the topology read lock.
+     *
+     * @param memberId id of the member to find
+     * @return the matching member, or null when the id is empty, the topology is not
+     *         initialized, no member matches, or reading the topology fails
+     */
+    private Member getMemberFromId(String memberId) {
+        if (StringUtils.isEmpty(memberId)) {
+            return null;
+        }
+        if (!TopologyManager.getTopology().isInitialized()) {
+            return null;
+        }
+        // Acquire the lock before entering the try block so that the finally clause only
+        // ever releases a lock that is actually held.
+        TopologyManager.acquireReadLock();
+        try {
+            if (TopologyManager.getTopology().getServices() == null) {
+                return null;
+            }
+            // TODO make this efficient by adding APIs to messaging component
+            for (Service service : TopologyManager.getTopology().getServices()) {
+                if (service.getClusters() == null) {
+                    continue;
+                }
+                for (Cluster cluster : service.getClusters()) {
+                    if (cluster.getMembers() == null) {
+                        continue;
+                    }
+                    for (Member member : cluster.getMembers()) {
+                        // null members are skipped, as in loadTimeStampMapFromTopology
+                        if (member != null && memberId.equals(member.getMemberId())) {
+                            return member;
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            // pass the exception as the throwable argument so that its stack trace is logged
+            log.error("Error while reading topology", e);
+        } finally {
+            TopologyManager.releaseReadLock();
+        }
+        return null;
+    }
+
+ private void publishMemberFault(String memberId) {
+ Member member = getMemberFromId(memberId);
+ if (member == null) {
+ log.error("Failed to publish member fault event. Member
having [member-id] " + memberId +
+ " does not exist in topology");
+ return;
+ }
+ log.info("Publishing member fault event for [member-id] " +
memberId);
+
+ MemberFaultEvent memberFaultEvent =
+ new MemberFaultEvent(member.getClusterId(),
member.getInstanceId(), member.getMemberId(),
+ member.getPartitionId(),
0);
+
+ memberFaultEventMessageMap.put("message", memberFaultEvent);
+ healthStatPublisher.publish(MemberFaultEventMap, true);
+ }
+
+    /**
+     * Performs one fault detection pass. After passing the thread barrier, every entry of the
+     * member time stamp map is inspected and a member fault event is published for each member
+     * whose last time stamp is older than TIME_OUT milliseconds. The processor re-schedules
+     * itself after timeToKeep milliseconds whether or not the pass succeeded.
+     */
+    @Override
+    public void run() {
+        try {
+            threadBarrier.pass();
+
+            // typed entries replace the raw Map.Entry iteration and its unchecked casts
+            for (Map.Entry<String, Long> entry : memberTimeStampMap.entrySet()) {
+                long currentTime = System.currentTimeMillis();
+                String memberId = entry.getKey();
+                Long eventTimeStamp = entry.getValue();
+
+                if ((currentTime - eventTimeStamp) > TIME_OUT) {
+                    log.info("Faulty member detected [member-id] " + memberId + " with [last time-stamp] " +
+                            eventTimeStamp + " [time-out] " + TIME_OUT + " milliseconds");
+                    publishMemberFault(memberId);
+                }
+            }
+            if (log.isDebugEnabled()) {
+                log.debug("Fault handling processor iteration completed with [time-stamp map length] " +
+                        memberTimeStampMap.size() + " [time-stamp map] " + memberTimeStampMap);
+            }
+        } catch (Throwable t) {
+            // any failure is logged here so that the finally clause can still re-schedule the next pass
+            log.error(t.getMessage(), t);
+        } finally {
+            faultHandleScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /**
+     * Captures the state of this processor as a one-element array that holds the state
+     * reported by the window queue.
+     *
+     * @return array whose only element is the window queue state
+     */
+    @Override
+    protected Object[] currentState() {
+        Object windowState = window.currentState();
+        return new Object[] { windowState };
+    }
+
+    /**
+     * Restores the state captured by {@link #currentState()} and re-schedules the window.
+     *
+     * @param data array produced by currentState(); its first element is the window queue state
+     */
+    @Override
+    protected void restoreState(Object[] data) {
+        // currentState() wraps the window state in a one-element array, so only data[0] is a
+        // window state; the wrapper array itself must not be handed to the window
+        window.restoreState((Object[]) data[0]);
+        window.reSchedule();
+    }
+
+ @Override
+ protected void init(Expression[] parameters, QueryPostProcessingElement
nextProcessor,
+ AbstractDefinition streamDefinition, String
elementId, boolean async,
+ SiddhiContext siddhiContext) {
+
+ if (parameters[0] instanceof IntConstant) {
+ timeToKeep = ((IntConstant) parameters[0]).getValue();
+ } else {
+ timeToKeep = ((LongConstant) parameters[0]).getValue();
+ }
+
+ String memberIdAttrName = ((Variable)
parameters[1]).getAttributeName();
+ memberIdAttrIndex =
streamDefinition.getAttributePosition(memberIdAttrName);
+
+ if (this.siddhiContext.isDistributedProcessingEnabled()) {
+ window = new
SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext,
this.async);
+ } else {
+ window = new SchedulerSiddhiQueue<StreamEvent>(this);
+ }
+ MemberFaultEventMap
+
.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent",
memberFaultEventMessageMap);
+
+ ExecutorService executorService =
StratosThreadPool.getExecutorService(IDENTIFIER, 10);
+ cepTopologyEventReceiver.setExecutorService(executorService);
+ executorService.execute(cepTopologyEventReceiver);
+
+ //Ordinary scheduling
+ window.schedule();
+ if (log.isDebugEnabled()) {
+ log.debug("Fault handling window processor initialized
with [timeToKeep] " + timeToKeep +
+ ", [memberIdAttrName] " + memberIdAttrName +
", [memberIdAttrIndex] " + memberIdAttrIndex +
+ ", [distributed-enabled] " +
this.siddhiContext.isDistributedProcessingEnabled());
+ }
+ }
+
+    /**
+     * Schedules this processor to run once after timeToKeep milliseconds.
+     */
+    @Override
+    public void schedule() {
+        final long delayMillis = timeToKeep;
+        faultHandleScheduler.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Schedules this processor to run with a delay of zero milliseconds.
+     */
+    @Override
+    public void scheduleNow() {
+        final long noDelay = 0;
+        faultHandleScheduler.schedule(this, noDelay, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Injects the scheduler that this processor uses to schedule its runs.
+     *
+     * @param scheduledExecutorService scheduler to use from now on
+     */
+    @Override
+    public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
+        faultHandleScheduler = scheduledExecutorService;
+    }
+
+ @Override
+ public void setThreadBarrier(ThreadBarrier threadBarrier) {
+ this.threadBarrier = threadBarrier;
+ }
+
+    /**
+     * Releases the resources of this processor: terminates the topology event receiver and
+     * drops the reference to the window queue.
+     */
+    @Override
+    public void destroy() {
+        // stop the topology listener first, then let go of the window
+        this.cepTopologyEventReceiver.terminate();
+        this.window = null;
+    }
+
+    /**
+     * Exposes the map of member ids to their last received health event time stamp.
+     * The returned object is the map used by this processor itself, not a copy.
+     *
+     * @return the member time stamp map
+     */
+    public ConcurrentHashMap<String, Long> getMemberTimeStampMap() {
+        return this.memberTimeStampMap;
+    }
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/8012f8c8/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
----------------------------------------------------------------------
diff --git
a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
index 013aee9..7996672 100644
---
a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
+++
b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
@@ -31,32 +31,34 @@ import java.util.concurrent.ExecutorService;
* HAProxy extension main class.
*/
public class Main {
- private static final Log log = LogFactory.getLog(Main.class);
+ private static final Log log = LogFactory.getLog(Main.class);
private static ExecutorService executorService;
public static void main(String[] args) {
- LoadBalancerExtension extension = null;
- try {
- // Configure log4j properties
-
PropertyConfigurator.configure(System.getProperty("log4j.properties.file.path"));
+ LoadBalancerExtension extension = null;
+ try {
+ // Configure log4j properties
+
PropertyConfigurator.configure(System.getProperty("log4j.properties.file.path"));
- if (log.isInfoEnabled()) {
- log.info("HAProxy extension started");
- }
- executorService =
StratosThreadPool.getExecutorService("Load_Balance_Extension", 10);
- // Validate runtime parameters
- HAProxyContext.getInstance().validate();
- extension = new LoadBalancerExtension(new HAProxy(),
(HAProxyContext.getInstance().isCEPStatsPublisherEnabled() ? new
HAProxyStatisticsReader() : null));
- Thread thread = new Thread(extension);
- thread.start();
- } catch (Exception e) {
- if (log.isErrorEnabled()) {
- log.error(e);
- }
- if (extension != null) {
- extension.terminate();
- }
- }
- }
+ if (log.isInfoEnabled()) {
+ log.info("HAProxy extension started");
+ }
+ executorService =
StratosThreadPool.getExecutorService("Load_Balance_Extension", 10);
+ // Validate runtime parameters
+ HAProxyContext.getInstance().validate();
+ extension = new LoadBalancerExtension(new HAProxy(),
+
(HAProxyContext.getInstance().isCEPStatsPublisherEnabled() ?
+ new
HAProxyStatisticsReader() : null));
+ Thread thread = new Thread(extension);
+ thread.start();
+ } catch (Exception e) {
+ if (log.isErrorEnabled()) {
+ log.error(e);
+ }
+ if (extension != null) {
+ extension.terminate();
+ }
+ }
+ }
}