Repository: stratos
Updated Branches:
  refs/heads/4.0.0-grouping 572cbe329 -> 14f5fe030


support application monitor creation based on complete topology event


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/14f5fe03
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/14f5fe03
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/14f5fe03

Branch: refs/heads/4.0.0-grouping
Commit: 14f5fe030ae1b5423af5e178d66be4b27ca01ac4
Parents: 572cbe3
Author: reka <[email protected]>
Authored: Mon Nov 3 11:26:21 2014 +0530
Committer: reka <[email protected]>
Committed: Mon Nov 3 11:26:21 2014 +0530

----------------------------------------------------------------------
 .../ApplicationSynchronizeTask.java             |   4 +-
 .../applications/topic/ApplicationBuilder.java  | 149 +++++++++++++++++++
 .../internal/AutoscalerServerComponent.java     |   6 +-
 .../AutoscalerTopologyEventReceiver.java        |  62 +++++++-
 4 files changed, 210 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/14f5fe03/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizeTask.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizeTask.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizeTask.java
index 9f0ca8c..22a2805 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizeTask.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizeTask.java
@@ -21,6 +21,7 @@ package org.apache.stratos.autoscaler.applications;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.applications.topic.ApplicationBuilder;
 import 
org.apache.stratos.autoscaler.applications.topic.ApplicationsEventPublisher;
 import org.wso2.carbon.ntask.core.Task;
 
@@ -36,7 +37,8 @@ public class ApplicationSynchronizeTask implements Task {
         }
         // publish to the topic
         if (ApplicationHolder.getApplications() != null) {
-            
ApplicationsEventPublisher.sendCompleteApplicationsEvent(ApplicationHolder.getApplications());
+            
//ApplicationsEventPublisher.sendCompleteApplicationsEvent(ApplicationHolder.getApplications());
+            
ApplicationBuilder.handleCompleteApplication(ApplicationHolder.getApplications());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/14f5fe03/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java
index 6adf3b1..bf0ad01 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationBuilder.java
@@ -20,9 +20,16 @@ package org.apache.stratos.autoscaler.applications.topic;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.AutoscalerContext;
 import org.apache.stratos.autoscaler.applications.ApplicationHolder;
 import 
org.apache.stratos.autoscaler.applications.pojo.ApplicationClusterContext;
+import org.apache.stratos.autoscaler.exception.DependencyBuilderException;
+import org.apache.stratos.autoscaler.exception.TopologyInConsistentException;
+import org.apache.stratos.autoscaler.monitor.ApplicationMonitorFactory;
+import org.apache.stratos.autoscaler.monitor.application.ApplicationMonitor;
 import org.apache.stratos.messaging.domain.applications.*;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 
 import java.util.Collection;
 import java.util.Set;
@@ -143,6 +150,17 @@ public class ApplicationBuilder {
 
         
ApplicationsEventPublisher.sendApplicationUndeployedEvent(applicationId, 
clusterData);
     }*/
+
+    public static synchronized void handleCompleteApplication (Applications 
applications) {
+        log.info("Handling complete application");
+        ApplicationHolder.acquireReadLock();
+        try {
+            
ApplicationsEventPublisher.sendCompleteApplicationsEvent(applications);
+        } finally {
+            ApplicationHolder.releaseReadLock();
+        }
+    }
+
     public static synchronized void handleApplicationCreated (Application 
application,
                                                               
Set<ApplicationClusterContext> appClusterContexts) {
 
@@ -153,6 +171,7 @@ public class ApplicationBuilder {
         try {
             if (applications.getApplication(application.getUniqueIdentifier()) 
!= null) {
                 ApplicationHolder.persistApplication(application);
+               // startApplicationMonitor(application.getUniqueIdentifier());
             } else {
                 log.warn("Application [ " + application.getUniqueIdentifier() 
+ " ] already exists in Applications");
             }
@@ -512,4 +531,134 @@ public class ApplicationBuilder {
             ApplicationHolder.releaseWriteLock();
         }
     }
+
+
+    protected static synchronized void startApplicationMonitor(String appId) {
+
+        ApplicationMonitor applicationMonitor = null;
+        int retries = 5;
+        boolean success = false;
+        do {
+            try {
+                Thread.sleep(5000);
+            } catch (InterruptedException e1) {
+            }
+            try {
+                long start = System.currentTimeMillis();
+                if (log.isDebugEnabled()) {
+                    log.debug("application monitor is going to be started for 
[application] " +
+                            appId);
+                }
+                applicationMonitor = 
ApplicationMonitorFactory.getApplicationMonitor(appId);
+
+                long end = System.currentTimeMillis();
+                log.info("Time taken to start app monitor: " + (end - start) / 
1000);
+                success = true;
+            } catch (DependencyBuilderException e) {
+                String msg = "Application monitor creation failed for 
Application: ";
+                log.warn(msg, e);
+                retries--;
+            } catch (TopologyInConsistentException e) {
+                String msg = "Application monitor creation failed for 
Application: ";
+                log.warn(msg, e);
+                retries--;
+            }
+        } while (!success && retries != 0);
+
+        if (applicationMonitor == null) {
+            String msg = "Application monitor creation failed, even after 
retrying for 5 times, "
+                    + "for Application: " + appId;
+            log.error(msg);
+            throw new RuntimeException(msg);
+        }
+
+        AutoscalerContext.getInstance().addAppMonitor(applicationMonitor);
+
+        if (log.isInfoEnabled()) {
+            log.info(String.format("Application monitor has been added 
successfully: " +
+                    "[application] %s", applicationMonitor.getId()));
+        }
+    }
+
+
+        /*Thread th = null;
+        if (!AutoscalerContext.getInstance()
+                .appMonitorExist(applicationId)) {
+            th = new Thread(
+                    new ApplicationMonitorAdder(applicationId));
+        }
+
+        if (th != null) {
+            th.start();
+            //    try {
+            //        th.join();
+            //    } catch (InterruptedException ignore) {
+
+            if (log.isDebugEnabled()) {
+                log.debug(String
+                        .format("Application monitor thread has been started 
successfully: " +
+                                "[application] %s ", applicationId));
+            }
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug(String
+                        .format("Application monitor thread already exists: " +
+                                "[application] %s ", applicationId));
+            }
+        }*/
+
+
+    private class ApplicationMonitorAdder implements Runnable {
+        private String appId;
+
+        public ApplicationMonitorAdder(String appId) {
+            this.appId = appId;
+        }
+
+        public void run() {
+            ApplicationMonitor applicationMonitor = null;
+            int retries = 5;
+            boolean success = false;
+            do {
+                try {
+                    Thread.sleep(5000);
+                } catch (InterruptedException e1) {
+                }
+                try {
+                    long start = System.currentTimeMillis();
+                    if (log.isDebugEnabled()) {
+                        log.debug("application monitor is going to be started 
for [application] " +
+                                appId);
+                    }
+                    applicationMonitor = 
ApplicationMonitorFactory.getApplicationMonitor(appId);
+
+                    long end = System.currentTimeMillis();
+                    log.info("Time taken to start app monitor: " + (end - 
start) / 1000);
+                    success = true;
+                } catch (DependencyBuilderException e) {
+                    String msg = "Application monitor creation failed for 
Application: ";
+                    log.warn(msg, e);
+                    retries--;
+                } catch (TopologyInConsistentException e) {
+                    String msg = "Application monitor creation failed for 
Application: ";
+                    log.warn(msg, e);
+                    retries--;
+                }
+            } while (!success && retries != 0);
+
+            if (applicationMonitor == null) {
+                String msg = "Application monitor creation failed, even after 
retrying for 5 times, "
+                        + "for Application: " + appId;
+                log.error(msg);
+                throw new RuntimeException(msg);
+            }
+
+            AutoscalerContext.getInstance().addAppMonitor(applicationMonitor);
+
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Application monitor has been added 
successfully: " +
+                        "[application] %s", applicationMonitor.getId()));
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/14f5fe03/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
index 5ada0b7..a946977 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
@@ -43,14 +43,14 @@ import java.util.List;
 /**
  * @scr.component 
name=org.apache.stratos.autoscaler.internal.AutoscalerServerComponent"
  * immediate="true"
- * @scr.reference name="ntask.component" 
interface="org.wso2.carbon.ntask.core.service.TaskService"
- * cardinality="1..1" policy="dynamic" bind="setTaskService"
- * unbind="unsetTaskService"
  * @scr.reference name="registry.service"
  * interface=
  * "org.wso2.carbon.registry.core.service.RegistryService"
  * cardinality="1..1" policy="dynamic" bind="setRegistryService"
  * unbind="unsetRegistryService"
+ * @scr.reference name="ntask.component" 
interface="org.wso2.carbon.ntask.core.service.TaskService"
+ * cardinality="1..1" policy="dynamic" bind="setTaskService"
+ * unbind="unsetTaskService"
  */
 
 public class AutoscalerServerComponent {

http://git-wip-us.apache.org/repos/asf/stratos/blob/14f5fe03/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
index dcf3a82..72dc6e5 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -22,6 +22,7 @@ package 
org.apache.stratos.autoscaler.message.receiver.topology;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.autoscaler.*;
+import org.apache.stratos.autoscaler.applications.ApplicationHolder;
 import 
org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
 import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
 import org.apache.stratos.autoscaler.exception.DependencyBuilderException;
@@ -39,6 +40,7 @@ import org.apache.stratos.autoscaler.policy.PolicyManager;
 import org.apache.stratos.autoscaler.status.checker.StatusChecker;
 import org.apache.stratos.messaging.domain.applications.Application;
 import org.apache.stratos.messaging.domain.applications.ApplicationStatus;
+import org.apache.stratos.messaging.domain.applications.Applications;
 import org.apache.stratos.messaging.domain.applications.ClusterDataHolder;
 import org.apache.stratos.messaging.domain.topology.*;
 import org.apache.stratos.messaging.event.Event;
@@ -93,6 +95,46 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
         }
     }
 
+    private boolean allClustersInitialized(Application application) {
+        boolean allClustersInitialized = false;
+        for(ClusterDataHolder holder : 
application.getClusterDataMap().values()) {
+            TopologyManager.acquireReadLockForCluster(holder.getServiceType(),
+                    holder.getClusterId());
+
+            try {
+                Topology topology = TopologyManager.getTopology();
+                if(topology != null) {
+                    Service service = 
topology.getService(holder.getServiceType());
+                    if(service != null) {
+                        if(service.clusterExists(holder.getClusterId())) {
+                            allClustersInitialized = true;
+                        } else {
+                            if(log.isDebugEnabled()) {
+                                log.debug("[Cluster] " + holder.getClusterId() 
+ " is not found in " +
+                                        "the Topology");
+                            }
+                            allClustersInitialized = false;
+                            return allClustersInitialized;
+                        }
+                    } else {
+                        if(log.isDebugEnabled()) {
+                            log.debug("Service is null in the 
CompleteTopologyEvent");
+                        }
+                    }
+                } else {
+                    if(log.isDebugEnabled()) {
+                        log.debug("Topology is null in the 
CompleteTopologyEvent");
+                    }
+                }
+            } finally {
+                
TopologyManager.releaseReadLockForCluster(holder.getServiceType(),
+                        holder.getClusterId());
+            }
+        }
+        return allClustersInitialized;
+    }
+
+
     private void addEventListeners() {
         // Listen to topology events that affect clusters
         topologyEventReceiver.addEventListener(new 
CompleteTopologyEventListener() {
@@ -101,18 +143,24 @@ public class AutoscalerTopologyEventReceiver implements 
Runnable {
 
                 if (!topologyInitialized) {
                     log.info("[CompleteTopologyEvent] Received: " + 
event.getClass());
-
-                    TopologyManager.acquireReadLock();
+                    ApplicationHolder.acquireReadLock();
                     try {
-                        for (Application application : 
ApplicationManager.getApplications().getApplications().values()) {
-                            
startApplicationMonitor(application.getUniqueIdentifier());
+                        Applications applications = 
ApplicationHolder.getApplications();
+                        if(applications != null) {
+                            for (Application application : 
applications.getApplications().values()) {
+                                if(allClustersInitialized(application)) {
+                                    
startApplicationMonitor(application.getUniqueIdentifier());
+                                } else {
+                                    log.error("Complete Topology is not 
consistent with the applications " +
+                                            "which got persisted");
+                                }
+                            }
+                            topologyInitialized = true;
                         }
-
-                        topologyInitialized = true;
                     } catch (Exception e) {
                         log.error("Error processing event", e);
                     } finally {
-                        TopologyManager.releaseReadLock();
+                        ApplicationHolder.releaseReadLock();
                     }
                 }
             }

Reply via email to