Repository: stratos Updated Branches: refs/heads/4.0.0-grouping 572cbe329 -> 14f5fe030
support application monitor creation based on complete topology event Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/14f5fe03 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/14f5fe03 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/14f5fe03 Branch: refs/heads/4.0.0-grouping Commit: 14f5fe030ae1b5423af5e178d66be4b27ca01ac4 Parents: 572cbe3 Author: reka <[email protected]> Authored: Mon Nov 3 11:26:21 2014 +0530 Committer: reka <[email protected]> Committed: Mon Nov 3 11:26:21 2014 +0530 ---------------------------------------------------------------------- .../ApplicationSynchronizeTask.java | 4 +- .../applications/topic/ApplicationBuilder.java | 149 +++++++++++++++++++ .../internal/AutoscalerServerComponent.java | 6 +- .../AutoscalerTopologyEventReceiver.java | 62 +++++++- 4 files changed, 210 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/14f5fe03/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizeTask.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizeTask.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizeTask.java index 9f0ca8c..22a2805 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizeTask.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizeTask.java @@ -21,6 +21,7 @@ package org.apache.stratos.autoscaler.applications; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.autoscaler.applications.topic.ApplicationBuilder; import org.apache.stratos.autoscaler.applications.topic.ApplicationsEventPublisher; import org.wso2.carbon.ntask.core.Task; @@ -36,7 +37,8 @@ public class ApplicationSynchronizeTask implements Task { } // publish to the topic if (ApplicationHolder.getApplications() != null) { - ApplicationsEventPublisher.sendCompleteApplicationsEvent(ApplicationHolder.getApplications()); + //ApplicationsEventPublisher.sendCompleteApplicationsEvent(ApplicationHolder.getApplications()); + ApplicationBuilder.handleCompleteApplication(ApplicationHolder.getApplications()); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/14f5fe03/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java index 6adf3b1..bf0ad01 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java @@ -20,9 +20,16 @@ package org.apache.stratos.autoscaler.applications.topic; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.autoscaler.AutoscalerContext; import org.apache.stratos.autoscaler.applications.ApplicationHolder; import org.apache.stratos.autoscaler.applications.pojo.ApplicationClusterContext; +import org.apache.stratos.autoscaler.exception.DependencyBuilderException; +import org.apache.stratos.autoscaler.exception.TopologyInConsistentException; +import org.apache.stratos.autoscaler.monitor.ApplicationMonitorFactory; +import org.apache.stratos.autoscaler.monitor.application.ApplicationMonitor; import org.apache.stratos.messaging.domain.applications.*; +import org.apache.stratos.messaging.domain.topology.Topology; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; import java.util.Collection; import java.util.Set; @@ -143,6 +150,17 @@ public class ApplicationBuilder { ApplicationsEventPublisher.sendApplicationUndeployedEvent(applicationId, clusterData); }*/ + + public static synchronized void handleCompleteApplication (Applications applications) { + log.info("Handling complete application"); + ApplicationHolder.acquireReadLock(); + try { + ApplicationsEventPublisher.sendCompleteApplicationsEvent(applications); + } finally { + ApplicationHolder.releaseReadLock(); + } + } + public static synchronized void handleApplicationCreated (Application application, Set<ApplicationClusterContext> appClusterContexts) { @@ -153,6 +171,7 @@ public class ApplicationBuilder { try { if (applications.getApplication(application.getUniqueIdentifier()) != null) { ApplicationHolder.persistApplication(application); + // startApplicationMonitor(application.getUniqueIdentifier()); } else { log.warn("Application [ " + application.getUniqueIdentifier() + " ] already exists in Applications"); } @@ -512,4 +531,134 @@ public class ApplicationBuilder { ApplicationHolder.releaseWriteLock(); } } + + + protected static synchronized void startApplicationMonitor(String appId) { + + ApplicationMonitor applicationMonitor = null; + int retries = 5; + boolean success = false; + do { + try { + Thread.sleep(5000); + } catch (InterruptedException e1) { + } + try { + long start = System.currentTimeMillis(); + if (log.isDebugEnabled()) { + log.debug("application monitor is going to be started for [application] " + + appId); + } + applicationMonitor = ApplicationMonitorFactory.getApplicationMonitor(appId); + + long end = System.currentTimeMillis(); + log.info("Time taken to start app monitor: " + (end - start) / 1000); + success = true; + } catch (DependencyBuilderException e) { + String msg = "Application monitor creation failed for Application: "; + log.warn(msg, e); + retries--; + } catch (TopologyInConsistentException e) { + String msg = "Application monitor creation failed for Application: "; + log.warn(msg, e); + retries--; + } + } while (!success && retries != 0); + + if (applicationMonitor == null) { + String msg = "Application monitor creation failed, even after retrying for 5 times, " + + "for Application: " + appId; + log.error(msg); + throw new RuntimeException(msg); + } + + AutoscalerContext.getInstance().addAppMonitor(applicationMonitor); + + if (log.isInfoEnabled()) { + log.info(String.format("Application monitor has been added successfully: " + + "[application] %s", applicationMonitor.getId())); + } + } + + + /*Thread th = null; + if (!AutoscalerContext.getInstance() + .appMonitorExist(applicationId)) { + th = new Thread( + new ApplicationMonitorAdder(applicationId)); + } + + if (th != null) { + th.start(); + // try { + // th.join(); + // } catch (InterruptedException ignore) { + + if (log.isDebugEnabled()) { + log.debug(String + .format("Application monitor thread has been started successfully: " + + "[application] %s ", applicationId)); + } + } else { + if (log.isDebugEnabled()) { + log.debug(String + .format("Application monitor thread already exists: " + + "[application] %s ", applicationId)); + } + }*/ + + + private class ApplicationMonitorAdder implements Runnable { + private String appId; + + public ApplicationMonitorAdder(String appId) { + this.appId = appId; + } + + public void run() { + ApplicationMonitor applicationMonitor = null; + int retries = 5; + boolean success = false; + do { + try { + Thread.sleep(5000); + } catch (InterruptedException e1) { + } + try { + long start = System.currentTimeMillis(); + if (log.isDebugEnabled()) { + log.debug("application monitor is going to be started for [application] " + + appId); + } + applicationMonitor = ApplicationMonitorFactory.getApplicationMonitor(appId); + + long end = System.currentTimeMillis(); + log.info("Time taken to start app monitor: " + (end - start) / 1000); + success = true; + } catch (DependencyBuilderException e) { + String msg = "Application monitor creation failed for Application: "; + log.warn(msg, e); + retries--; + } catch (TopologyInConsistentException e) { + String msg = "Application monitor creation failed for Application: "; + log.warn(msg, e); + retries--; + } + } while (!success && retries != 0); + + if (applicationMonitor == null) { + String msg = "Application monitor creation failed, even after retrying for 5 times, " + + "for Application: " + appId; + log.error(msg); + throw new RuntimeException(msg); + } + + AutoscalerContext.getInstance().addAppMonitor(applicationMonitor); + + if (log.isInfoEnabled()) { + log.info(String.format("Application monitor has been added successfully: " + + "[application] %s", applicationMonitor.getId())); + } + } + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/14f5fe03/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java index 5ada0b7..a946977 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java @@ -43,14 +43,14 @@ import java.util.List; /** * @scr.component name=org.apache.stratos.autoscaler.internal.AutoscalerServerComponent" * immediate="true" - * @scr.reference name="ntask.component" interface="org.wso2.carbon.ntask.core.service.TaskService" - * cardinality="1..1" policy="dynamic" bind="setTaskService" - * unbind="unsetTaskService" * @scr.reference name="registry.service" * interface= * "org.wso2.carbon.registry.core.service.RegistryService" * cardinality="1..1" policy="dynamic" bind="setRegistryService" * unbind="unsetRegistryService" + * @scr.reference name="ntask.component" interface="org.wso2.carbon.ntask.core.service.TaskService" + * cardinality="1..1" policy="dynamic" bind="setTaskService" + * unbind="unsetTaskService" */ public class AutoscalerServerComponent { http://git-wip-us.apache.org/repos/asf/stratos/blob/14f5fe03/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java index dcf3a82..72dc6e5 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java @@ -22,6 +22,7 @@ package org.apache.stratos.autoscaler.message.receiver.topology; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.autoscaler.*; +import org.apache.stratos.autoscaler.applications.ApplicationHolder; import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient; import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy; import org.apache.stratos.autoscaler.exception.DependencyBuilderException; @@ -39,6 +40,7 @@ import org.apache.stratos.autoscaler.policy.PolicyManager; import org.apache.stratos.autoscaler.status.checker.StatusChecker; import org.apache.stratos.messaging.domain.applications.Application; import org.apache.stratos.messaging.domain.applications.ApplicationStatus; +import org.apache.stratos.messaging.domain.applications.Applications; import org.apache.stratos.messaging.domain.applications.ClusterDataHolder; import org.apache.stratos.messaging.domain.topology.*; import org.apache.stratos.messaging.event.Event; @@ -93,6 +95,46 @@ public class AutoscalerTopologyEventReceiver implements Runnable { } } + private boolean allClustersInitialized(Application application) { + boolean allClustersInitialized = false; + for(ClusterDataHolder holder : application.getClusterDataMap().values()) { + TopologyManager.acquireReadLockForCluster(holder.getServiceType(), + holder.getClusterId()); + + try { + Topology topology = TopologyManager.getTopology(); + if(topology != null) { + Service service = topology.getService(holder.getServiceType()); + if(service != null) { + if(service.clusterExists(holder.getClusterId())) { + allClustersInitialized = true; + } else { + if(log.isDebugEnabled()) { + log.debug("[Cluster] " + holder.getClusterId() + " is not found in " + + "the Topology"); + } + allClustersInitialized = false; + return allClustersInitialized; + } + } else { + if(log.isDebugEnabled()) { + log.debug("Service is null in the CompleteTopologyEvent"); + } + } + } else { + if(log.isDebugEnabled()) { + log.debug("Topology is null in the CompleteTopologyEvent"); + } + } + } finally { + TopologyManager.releaseReadLockForCluster(holder.getServiceType(), + holder.getClusterId()); + } + } + return allClustersInitialized; + } + + private void addEventListeners() { // Listen to topology events that affect clusters topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() { @@ -101,18 +143,24 @@ public class AutoscalerTopologyEventReceiver implements Runnable { if (!topologyInitialized) { log.info("[CompleteTopologyEvent] Received: " + event.getClass()); - - TopologyManager.acquireReadLock(); + ApplicationHolder.acquireReadLock(); try { - for (Application application : ApplicationManager.getApplications().getApplications().values()) { - startApplicationMonitor(application.getUniqueIdentifier()); + Applications applications = ApplicationHolder.getApplications(); + if(applications != null) { + for (Application application : applications.getApplications().values()) { + if(allClustersInitialized(application)) { + startApplicationMonitor(application.getUniqueIdentifier()); + } else { + log.error("Complete Topology is not consistent with the applications " + + "which got persisted"); + } + } + topologyInitialized = true; } - - topologyInitialized = true; } catch (Exception e) { log.error("Error processing event", e); } finally { - TopologyManager.releaseReadLock(); + ApplicationHolder.releaseReadLock(); } } }
