Updated Branches:
  refs/heads/master b4de7ba76 -> 97a2b8ee0

Refactored task scheduler in cloud controller


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/8d1c338b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/8d1c338b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/8d1c338b

Branch: refs/heads/master
Commit: 8d1c338b547ce032ff538c1d027e36df74a9af3e
Parents: f252644
Author: Imesh Gunaratne <[email protected]>
Authored: Wed Dec 25 16:34:43 2013 +0530
Committer: Imesh Gunaratne <[email protected]>
Committed: Wed Dec 25 16:34:43 2013 +0530

----------------------------------------------------------------------
 .../impl/CloudControllerServiceImpl.java        | 122 ----------------
 .../internal/CloudControllerDSComponent.java    |   9 +-
 .../controller/internal/TaskScheduler.java      | 145 +++++++++++++++++++
 3 files changed, 151 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/8d1c338b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java
index 7fd2573..6e91035 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java
@@ -64,130 +64,8 @@ public class CloudControllerServiceImpl implements CloudControllerService {
                        .getInstance();
 
        public CloudControllerServiceImpl() {
-
                // acquire serialized data from registry
                acquireData();
-               
-               // gets the task service
-               TaskService taskService = ServiceReferenceHolder
-                               .getInstance().getTaskService();
-
-               if (dataHolder.getEnableBAMDataPublisher()) {
-
-                       // register and schedule, BAM data publisher task
-                       registerAndScheduleDataPublisherTask(taskService);
-               }
-
-               if (dataHolder.getEnableTopologySync()) {
-
-                       // start the topology builder thread
-                       startTopologyBuilder();
-
-                       // register and schedule, topology synchronizer task
-                       registerAndScheduleTopologySyncerTask(taskService);
-               }
-       }
-
-       private void registerAndScheduleTopologySyncerTask(TaskService taskService) {
-        TaskInfo taskInfo;
-               TaskManager tm = null;
-
-        try {
-            if(log.isDebugEnabled()) {
-                log.debug("Scheduling topology synchronization task");
-            }
-
-                       if (!taskService.getRegisteredTaskTypes().contains(CloudControllerConstants.TOPOLOGY_SYNC_TASK_TYPE)) {
-                               taskService.registerTaskType(CloudControllerConstants.TOPOLOGY_SYNC_TASK_TYPE);
-                               tm = taskService.getTaskManager(CloudControllerConstants.TOPOLOGY_SYNC_TASK_TYPE);
-                               String cron = dataHolder.getTopologyConfig().getProperty(CloudControllerConstants.CRON_ELEMENT);
-                               cron = ( cron == null ? CloudControllerConstants.PUB_CRON_EXPRESSION : cron ); 
-                               if(log.isDebugEnabled()) {
-                    log.debug(String.format("Topology synchronization task cron: %s", cron));
-                }
-                               TriggerInfo triggerInfo = new TriggerInfo(cron);
-                               taskInfo = new TaskInfo(
-                                               CloudControllerConstants.TOPOLOGY_SYNC_TASK_NAME,
-                                               TopologySynchronizerTask.class.getName(),
-                                               new HashMap<String, String>(), triggerInfo);
-                               tm.registerTask(taskInfo);
-                if(log.isDebugEnabled()) {
-                    log.debug("Topology synchronization task registered");
-                }
-                       }
-            else {
-                if(log.isWarnEnabled()) {
-                    log.warn("Topology synchronization task already exists");
-                }
-            }
-
-               } catch (Exception e) {
-                       String msg = "Error scheduling task: " + CloudControllerConstants.TOPOLOGY_SYNC_TASK_NAME;
-                       log.error(msg);
-                       if (tm != null) {
-                               try {
-                                       tm.deleteTask(CloudControllerConstants.TOPOLOGY_SYNC_TASK_NAME);
-                               } catch (TaskException e1) {
-                                       log.error(e1);
-                               }
-                       }
-                       throw new CloudControllerException(msg, e);
-               }
-       }
-
-       private void startTopologyBuilder() {
-               // initialize TopologyEventMessageProcessor Consumer
-               Thread delegatorThread = new Thread(new InstanceStatusEventMessageDelegator());
-               // start consumer
-               delegatorThread.start();
-       }
-
-       private TaskManager registerAndScheduleDataPublisherTask(
-                       TaskService taskService) {
-               TaskInfo taskInfo;
-               TaskManager tm = null;
-               // initialize and schedule the data publisher task
-               try {
-
-                       if (!taskService.getRegisteredTaskTypes().contains(
-                                       CloudControllerConstants.DATA_PUB_TASK_TYPE)) {
-
-                               taskService
-                                               .registerTaskType(CloudControllerConstants.DATA_PUB_TASK_TYPE);
-
-                               tm = taskService
-                                               .getTaskManager(CloudControllerConstants.DATA_PUB_TASK_TYPE);
-
-                               if (!tm.isTaskScheduled(CloudControllerConstants.DATA_PUB_TASK_NAME)) {
-
-                                       TriggerInfo triggerInfo = new TriggerInfo(
-                                                       dataHolder.getDataPubConfig().getDataPublisherCron());
-                                       taskInfo = new TaskInfo(
-                                                       CloudControllerConstants.DATA_PUB_TASK_NAME,
-                                                       CartridgeInstanceDataPublisherTask.class.getName(),
-                                                       new HashMap<String, String>(), triggerInfo);
-                                       tm.registerTask(taskInfo);
-
-                                       // Following code is currently not required, due to an issue
-                                       // in TS API.
-                                       // tm.scheduleTask(taskInfo.getName());
-                               }
-                       }
-
-               } catch (Exception e) {
-                       String msg = "Error scheduling task: "
-                                       + CloudControllerConstants.DATA_PUB_TASK_NAME;
-                       log.error(msg, e);
-                       if (tm != null) {
-                               try {
-                                       tm.deleteTask(CloudControllerConstants.DATA_PUB_TASK_NAME);
-                               } catch (TaskException e1) {
-                                       log.error(e1);
-                               }
-                       }
-                       throw new CloudControllerException(msg, e);
-               }
-               return tm;
        }
 
        private void acquireData() {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/8d1c338b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java
index c3fe066..466e569 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java
@@ -108,9 +108,12 @@ public class CloudControllerDSComponent {
                
                // initialize the topic publishers
             BundleContext bundleContext = context.getBundleContext();
-            bundleContext.registerService(CloudControllerService.class.getName(),
-                                          new CloudControllerServiceImpl(), null);
-            
+            bundleContext.registerService(CloudControllerService.class.getName(), new CloudControllerServiceImpl(), null);
+
+            if(log.isInfoEnabled()) {
+                log.info("Scheduling tasks");
+            }
+            TaskScheduler.schedule();
 
             log.debug("******* Cloud Controller Service bundle is activated ******* ");
         } catch (Throwable e) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/8d1c338b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/TaskScheduler.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/TaskScheduler.java
new file mode 100644
index 0000000..f7c7be2
--- /dev/null
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/TaskScheduler.java
@@ -0,0 +1,145 @@
+package org.apache.stratos.cloud.controller.internal;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cloud.controller.exception.CloudControllerException;
+import org.apache.stratos.cloud.controller.publisher.CartridgeInstanceDataPublisherTask;
+import org.apache.stratos.cloud.controller.runtime.FasterLookUpDataHolder;
+import org.apache.stratos.cloud.controller.topic.instance.status.InstanceStatusEventMessageDelegator;
+import org.apache.stratos.cloud.controller.topology.TopologySynchronizerTask;
+import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
+import org.apache.stratos.cloud.controller.util.ServiceReferenceHolder;
+import org.wso2.carbon.ntask.common.TaskException;
+import org.wso2.carbon.ntask.core.TaskInfo;
+import org.wso2.carbon.ntask.core.TaskManager;
+import org.wso2.carbon.ntask.core.service.TaskService;
+
+import java.util.HashMap;
+
+/**
+ * Topology synchronizer task scheduler
+ */
+public class TaskScheduler {
+    private static final Log log = LogFactory.getLog(TaskScheduler.class);
+
+    public static void schedule() {
+        FasterLookUpDataHolder dataHolder = FasterLookUpDataHolder.getInstance();
+        TaskService taskService = ServiceReferenceHolder.getInstance().getTaskService();
+
+        if (dataHolder.getEnableBAMDataPublisher()) {
+            // register and schedule, BAM data publisher task
+            registerAndScheduleDataPublisherTask(taskService);
+        }
+
+        if (dataHolder.getEnableTopologySync()) {
+            // start the topology builder thread
+            startTopologyBuilder();
+
+            // register and schedule, topology synchronizer task
+            registerAndScheduleTopologySyncTask(taskService);
+        }
+    }
+
+    private static void registerAndScheduleTopologySyncTask(TaskService taskService) {
+        TaskInfo taskInfo;
+        TaskManager tm = null;
+
+        try {
+            if(log.isDebugEnabled()) {
+                log.debug("Scheduling topology synchronization task");
+            }
+
+            if (!taskService.getRegisteredTaskTypes().contains(CloudControllerConstants.TOPOLOGY_SYNC_TASK_TYPE)) {
+                taskService.registerTaskType(CloudControllerConstants.TOPOLOGY_SYNC_TASK_TYPE);
+                tm = taskService.getTaskManager(CloudControllerConstants.TOPOLOGY_SYNC_TASK_TYPE);
+                FasterLookUpDataHolder dataHolder = FasterLookUpDataHolder.getInstance();
+                String cron = dataHolder.getTopologyConfig().getProperty(CloudControllerConstants.CRON_ELEMENT);
+                cron = ( cron == null ? CloudControllerConstants.PUB_CRON_EXPRESSION : cron );
+                if(log.isDebugEnabled()) {
+                    log.debug(String.format("Topology synchronization task cron: %s", cron));
+                }
+                TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo(cron);
+                taskInfo = new TaskInfo(
+                        CloudControllerConstants.TOPOLOGY_SYNC_TASK_NAME,
+                        TopologySynchronizerTask.class.getName(),
+                        new HashMap<String, String>(), triggerInfo);
+                tm.registerTask(taskInfo);
+                if(log.isDebugEnabled()) {
+                    log.debug("Topology synchronization task registered");
+                }
+            }
+            else {
+                if(log.isWarnEnabled()) {
+                    log.warn("Topology synchronization task already exists");
+                }
+            }
+
+        } catch (Exception e) {
+            String msg = "Error scheduling task: " + CloudControllerConstants.TOPOLOGY_SYNC_TASK_NAME;
+            log.error(msg);
+            if (tm != null) {
+                try {
+                    tm.deleteTask(CloudControllerConstants.TOPOLOGY_SYNC_TASK_NAME);
+                } catch (TaskException e1) {
+                    log.error(e1);
+                }
+            }
+            throw new CloudControllerException(msg, e);
+        }
+    }
+
+    private static void startTopologyBuilder() {
+        // initialize TopologyEventMessageProcessor Consumer
+        Thread delegatorThread = new Thread(new InstanceStatusEventMessageDelegator());
+        // start consumer
+        delegatorThread.start();
+    }
+
+    private static TaskManager registerAndScheduleDataPublisherTask(
+            TaskService taskService) {
+        TaskInfo taskInfo;
+        TaskManager tm = null;
+        // initialize and schedule the data publisher task
+        try {
+
+            if (!taskService.getRegisteredTaskTypes().contains(
+                    CloudControllerConstants.DATA_PUB_TASK_TYPE)) {
+
+                taskService
+                        .registerTaskType(CloudControllerConstants.DATA_PUB_TASK_TYPE);
+
+                tm = taskService
+                        .getTaskManager(CloudControllerConstants.DATA_PUB_TASK_TYPE);
+
+                if (!tm.isTaskScheduled(CloudControllerConstants.DATA_PUB_TASK_NAME)) {
+                    FasterLookUpDataHolder dataHolder = FasterLookUpDataHolder.getInstance();
+                    TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo(
+                            dataHolder.getDataPubConfig().getDataPublisherCron());
+                    taskInfo = new TaskInfo(
+                            CloudControllerConstants.DATA_PUB_TASK_NAME,
+                            CartridgeInstanceDataPublisherTask.class.getName(),
+                            new HashMap<String, String>(), triggerInfo);
+                    tm.registerTask(taskInfo);
+
+                    // Following code is currently not required, due to an issue
+                    // in TS API.
+                    // tm.scheduleTask(taskInfo.getName());
+                }
+            }
+
+        } catch (Exception e) {
+            String msg = "Error scheduling task: "
+                    + CloudControllerConstants.DATA_PUB_TASK_NAME;
+            log.error(msg, e);
+            if (tm != null) {
+                try {
+                    tm.deleteTask(CloudControllerConstants.DATA_PUB_TASK_NAME);
+                } catch (TaskException e1) {
+                    log.error(e1);
+                }
+            }
+            throw new CloudControllerException(msg, e);
+        }
+        return tm;
+    }
+}

Reply via email to