Updated Branches: refs/heads/master b4de7ba76 -> 97a2b8ee0
Refactored task scheduler in cloud controller Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/8d1c338b Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/8d1c338b Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/8d1c338b Branch: refs/heads/master Commit: 8d1c338b547ce032ff538c1d027e36df74a9af3e Parents: f252644 Author: Imesh Gunaratne <[email protected]> Authored: Wed Dec 25 16:34:43 2013 +0530 Committer: Imesh Gunaratne <[email protected]> Committed: Wed Dec 25 16:34:43 2013 +0530 ---------------------------------------------------------------------- .../impl/CloudControllerServiceImpl.java | 122 ---------------- .../internal/CloudControllerDSComponent.java | 9 +- .../controller/internal/TaskScheduler.java | 145 +++++++++++++++++++ 3 files changed, 151 insertions(+), 125 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/8d1c338b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java index 7fd2573..6e91035 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java @@ -64,130 +64,8 @@ public class CloudControllerServiceImpl implements CloudControllerService { .getInstance(); public CloudControllerServiceImpl() { - // acquire serialized data from registry acquireData(); - - // gets the task service - TaskService taskService = ServiceReferenceHolder - .getInstance().getTaskService(); - - if (dataHolder.getEnableBAMDataPublisher()) { - - // register and schedule, BAM data publisher task - registerAndScheduleDataPublisherTask(taskService); - } - - if (dataHolder.getEnableTopologySync()) { - - // start the topology builder thread - startTopologyBuilder(); - - // register and schedule, topology synchronizer task - registerAndScheduleTopologySyncerTask(taskService); - } - } - - private void registerAndScheduleTopologySyncerTask(TaskService taskService) { - TaskInfo taskInfo; - TaskManager tm = null; - - try { - if(log.isDebugEnabled()) { - log.debug("Scheduling topology synchronization task"); - } - - if (!taskService.getRegisteredTaskTypes().contains(CloudControllerConstants.TOPOLOGY_SYNC_TASK_TYPE)) { - taskService.registerTaskType(CloudControllerConstants.TOPOLOGY_SYNC_TASK_TYPE); - tm = taskService.getTaskManager(CloudControllerConstants.TOPOLOGY_SYNC_TASK_TYPE); - String cron = dataHolder.getTopologyConfig().getProperty(CloudControllerConstants.CRON_ELEMENT); - cron = ( cron == null ? CloudControllerConstants.PUB_CRON_EXPRESSION : cron ); - if(log.isDebugEnabled()) { - log.debug(String.format("Topology synchronization task cron: %s", cron)); - } - TriggerInfo triggerInfo = new TriggerInfo(cron); - taskInfo = new TaskInfo( - CloudControllerConstants.TOPOLOGY_SYNC_TASK_NAME, - TopologySynchronizerTask.class.getName(), - new HashMap<String, String>(), triggerInfo); - tm.registerTask(taskInfo); - if(log.isDebugEnabled()) { - log.debug("Topology synchronization task registered"); - } - } - else { - if(log.isWarnEnabled()) { - log.warn("Topology synchronization task already exists"); - } - } - - } catch (Exception e) { - String msg = "Error scheduling task: " + CloudControllerConstants.TOPOLOGY_SYNC_TASK_NAME; - log.error(msg); - if (tm != null) { - try { - tm.deleteTask(CloudControllerConstants.TOPOLOGY_SYNC_TASK_NAME); - } catch (TaskException e1) { - log.error(e1); - } - } - throw new CloudControllerException(msg, e); - } - } - - private void startTopologyBuilder() { - // initialize TopologyEventMessageProcessor Consumer - Thread delegatorThread = new Thread(new InstanceStatusEventMessageDelegator()); - // start consumer - delegatorThread.start(); - } - - private TaskManager registerAndScheduleDataPublisherTask( - TaskService taskService) { - TaskInfo taskInfo; - TaskManager tm = null; - // initialize and schedule the data publisher task - try { - - if (!taskService.getRegisteredTaskTypes().contains( - CloudControllerConstants.DATA_PUB_TASK_TYPE)) { - - taskService - .registerTaskType(CloudControllerConstants.DATA_PUB_TASK_TYPE); - - tm = taskService - .getTaskManager(CloudControllerConstants.DATA_PUB_TASK_TYPE); - - if (!tm.isTaskScheduled(CloudControllerConstants.DATA_PUB_TASK_NAME)) { - - TriggerInfo triggerInfo = new TriggerInfo( - dataHolder.getDataPubConfig().getDataPublisherCron()); - taskInfo = new TaskInfo( - CloudControllerConstants.DATA_PUB_TASK_NAME, - CartridgeInstanceDataPublisherTask.class.getName(), - new HashMap<String, String>(), triggerInfo); - tm.registerTask(taskInfo); - - // Following code is currently not required, due to an issue - // in TS API. - // tm.scheduleTask(taskInfo.getName()); - } - } - - } catch (Exception e) { - String msg = "Error scheduling task: " - + CloudControllerConstants.DATA_PUB_TASK_NAME; - log.error(msg, e); - if (tm != null) { - try { - tm.deleteTask(CloudControllerConstants.DATA_PUB_TASK_NAME); - } catch (TaskException e1) { - log.error(e1); - } - } - throw new CloudControllerException(msg, e); - } - return tm; } private void acquireData() { http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/8d1c338b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java index c3fe066..466e569 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java @@ -108,9 +108,12 @@ public class CloudControllerDSComponent { // initialize the topic publishers BundleContext bundleContext = context.getBundleContext(); - bundleContext.registerService(CloudControllerService.class.getName(), - new CloudControllerServiceImpl(), null); - + bundleContext.registerService(CloudControllerService.class.getName(), new CloudControllerServiceImpl(), null); + + if(log.isInfoEnabled()) { + log.info("Scheduling tasks"); + } + TaskScheduler.schedule(); log.debug("******* Cloud Controller Service bundle is activated ******* "); } catch (Throwable e) { http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/8d1c338b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/TaskScheduler.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/TaskScheduler.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/TaskScheduler.java new file mode 100644 index 0000000..f7c7be2 --- /dev/null +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/TaskScheduler.java @@ -0,0 +1,145 @@ +package org.apache.stratos.cloud.controller.internal; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.cloud.controller.exception.CloudControllerException; +import org.apache.stratos.cloud.controller.publisher.CartridgeInstanceDataPublisherTask; +import org.apache.stratos.cloud.controller.runtime.FasterLookUpDataHolder; +import org.apache.stratos.cloud.controller.topic.instance.status.InstanceStatusEventMessageDelegator; +import org.apache.stratos.cloud.controller.topology.TopologySynchronizerTask; +import org.apache.stratos.cloud.controller.util.CloudControllerConstants; +import org.apache.stratos.cloud.controller.util.ServiceReferenceHolder; +import org.wso2.carbon.ntask.common.TaskException; +import org.wso2.carbon.ntask.core.TaskInfo; +import org.wso2.carbon.ntask.core.TaskManager; +import org.wso2.carbon.ntask.core.service.TaskService; + +import java.util.HashMap; + +/** + * Topology synchronizer task scheduler + */ +public class TaskScheduler { + private static final Log log = LogFactory.getLog(TaskScheduler.class); + + public static void schedule() { + FasterLookUpDataHolder dataHolder = FasterLookUpDataHolder.getInstance(); + TaskService taskService = ServiceReferenceHolder.getInstance().getTaskService(); + + if (dataHolder.getEnableBAMDataPublisher()) { + // register and schedule, BAM data publisher task + registerAndScheduleDataPublisherTask(taskService); + } + + if (dataHolder.getEnableTopologySync()) { + // start the topology builder thread + startTopologyBuilder(); + + // register and schedule, topology synchronizer task + registerAndScheduleTopologySyncTask(taskService); + } + } + + private static void registerAndScheduleTopologySyncTask(TaskService taskService) { + TaskInfo taskInfo; + TaskManager tm = null; + + try { + if(log.isDebugEnabled()) { + log.debug("Scheduling topology synchronization task"); + } + + if (!taskService.getRegisteredTaskTypes().contains(CloudControllerConstants.TOPOLOGY_SYNC_TASK_TYPE)) { + taskService.registerTaskType(CloudControllerConstants.TOPOLOGY_SYNC_TASK_TYPE); + tm = taskService.getTaskManager(CloudControllerConstants.TOPOLOGY_SYNC_TASK_TYPE); + FasterLookUpDataHolder dataHolder = FasterLookUpDataHolder.getInstance(); + String cron = dataHolder.getTopologyConfig().getProperty(CloudControllerConstants.CRON_ELEMENT); + cron = ( cron == null ? CloudControllerConstants.PUB_CRON_EXPRESSION : cron ); + if(log.isDebugEnabled()) { + log.debug(String.format("Topology synchronization task cron: %s", cron)); + } + TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo(cron); + taskInfo = new TaskInfo( + CloudControllerConstants.TOPOLOGY_SYNC_TASK_NAME, + TopologySynchronizerTask.class.getName(), + new HashMap<String, String>(), triggerInfo); + tm.registerTask(taskInfo); + if(log.isDebugEnabled()) { + log.debug("Topology synchronization task registered"); + } + } + else { + if(log.isWarnEnabled()) { + log.warn("Topology synchronization task already exists"); + } + } + + } catch (Exception e) { + String msg = "Error scheduling task: " + CloudControllerConstants.TOPOLOGY_SYNC_TASK_NAME; + log.error(msg); + if (tm != null) { + try { + tm.deleteTask(CloudControllerConstants.TOPOLOGY_SYNC_TASK_NAME); + } catch (TaskException e1) { + log.error(e1); + } + } + throw new CloudControllerException(msg, e); + } + } + + private static void startTopologyBuilder() { + // initialize TopologyEventMessageProcessor Consumer + Thread delegatorThread = new Thread(new InstanceStatusEventMessageDelegator()); + // start consumer + delegatorThread.start(); + } + + private static TaskManager registerAndScheduleDataPublisherTask( + TaskService taskService) { + TaskInfo taskInfo; + TaskManager tm = null; + // initialize and schedule the data publisher task + try { + + if (!taskService.getRegisteredTaskTypes().contains( + CloudControllerConstants.DATA_PUB_TASK_TYPE)) { + + taskService + .registerTaskType(CloudControllerConstants.DATA_PUB_TASK_TYPE); + + tm = taskService + .getTaskManager(CloudControllerConstants.DATA_PUB_TASK_TYPE); + + if (!tm.isTaskScheduled(CloudControllerConstants.DATA_PUB_TASK_NAME)) { + FasterLookUpDataHolder dataHolder = FasterLookUpDataHolder.getInstance(); + TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo( + dataHolder.getDataPubConfig().getDataPublisherCron()); + taskInfo = new TaskInfo( + CloudControllerConstants.DATA_PUB_TASK_NAME, + CartridgeInstanceDataPublisherTask.class.getName(), + new HashMap<String, String>(), triggerInfo); + tm.registerTask(taskInfo); + + // Following code is currently not required, due to an issue + // in TS API. + // tm.scheduleTask(taskInfo.getName()); + } + } + + } catch (Exception e) { + String msg = "Error scheduling task: " + + CloudControllerConstants.DATA_PUB_TASK_NAME; + log.error(msg, e); + if (tm != null) { + try { + tm.deleteTask(CloudControllerConstants.DATA_PUB_TASK_NAME); + } catch (TaskException e1) { + log.error(e1); + } + } + throw new CloudControllerException(msg, e); + } + return tm; + } +}
