Updated Branches: refs/heads/master 724207c65 -> 449723952
Fixed periodical topology message publisher Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/44972395 Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/44972395 Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/44972395 Branch: refs/heads/master Commit: 44972395270d3cd8541890a2f99bc7a9b8495d8e Parents: 724207c Author: Imesh Gunaratne <[email protected]> Authored: Wed Dec 25 19:02:46 2013 +0530 Committer: Imesh Gunaratne <[email protected]> Committed: Wed Dec 25 19:02:46 2013 +0530 ---------------------------------------------------------------------- .../internal/CloudControllerDSComponent.java | 5 +- .../controller/internal/TaskScheduler.java | 145 ------------------- .../TopologySynchronizerTaskScheduler.java | 102 +++++++++++++ 3 files changed, 105 insertions(+), 147 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/44972395/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java index 9102c5b..c458d49 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java @@ -26,13 +26,13 @@ import org.apache.commons.logging.LogFactory; import org.apache.stratos.cloud.controller.exception.CloudControllerException; import org.apache.stratos.cloud.controller.impl.CloudControllerServiceImpl; import org.apache.stratos.cloud.controller.interfaces.CloudControllerService; +import org.apache.stratos.cloud.controller.publisher.TopologySynchronizerTaskScheduler; import org.apache.stratos.cloud.controller.runtime.FasterLookUpDataHolder; import org.apache.stratos.cloud.controller.topic.instance.status.InstanceStatusEventMessageListener; import org.apache.stratos.cloud.controller.util.CloudControllerConstants; import org.apache.stratos.cloud.controller.util.ServiceReferenceHolder; import org.apache.stratos.messaging.broker.publish.EventPublisher; import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber; -import org.apache.stratos.messaging.util.Constants; import org.osgi.framework.BundleContext; import org.osgi.service.component.ComponentContext; import org.wso2.carbon.ntask.core.service.TaskService; @@ -84,7 +84,8 @@ public class CloudControllerDSComponent { if(log.isInfoEnabled()) { log.info("Scheduling tasks"); } - TaskScheduler.schedule(); + + TopologySynchronizerTaskScheduler.schedule(ServiceReferenceHolder.getInstance().getTaskService()); log.debug("******* Cloud Controller Service bundle is activated ******* "); } catch (Throwable e) { http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/44972395/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/TaskScheduler.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/TaskScheduler.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/TaskScheduler.java deleted file mode 100644 index 0db9176..0000000 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/TaskScheduler.java +++ /dev/null @@ -1,145 +0,0 @@ -package org.apache.stratos.cloud.controller.internal; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.cloud.controller.exception.CloudControllerException; -import org.apache.stratos.cloud.controller.publisher.CartridgeInstanceDataPublisherTask; -import org.apache.stratos.cloud.controller.runtime.FasterLookUpDataHolder; -import org.apache.stratos.cloud.controller.topic.instance.status.InstanceStatusEventMessageDelegator; -import org.apache.stratos.cloud.controller.topology.TopologySynchronizerTask; -import org.apache.stratos.cloud.controller.util.CloudControllerConstants; -import org.apache.stratos.cloud.controller.util.ServiceReferenceHolder; -import org.wso2.carbon.ntask.common.TaskException; -import org.wso2.carbon.ntask.core.TaskInfo; -import org.wso2.carbon.ntask.core.TaskManager; -import org.wso2.carbon.ntask.core.service.TaskService; - -import java.util.HashMap; - -/** - * Cloud Controller task scheduler - */ -public class TaskScheduler { - private static final Log log = LogFactory.getLog(TaskScheduler.class); - - public static void schedule() { - FasterLookUpDataHolder dataHolder = FasterLookUpDataHolder.getInstance(); - TaskService taskService = ServiceReferenceHolder.getInstance().getTaskService(); - - if (dataHolder.getEnableBAMDataPublisher()) { - // register and schedule, BAM data publisher task - registerAndScheduleDataPublisherTask(taskService); - } - - if (dataHolder.getEnableTopologySync()) { - // start the topology builder thread - startTopologyBuilder(); - - // register and schedule, topology synchronizer task - registerAndScheduleTopologySyncTask(taskService); - } - } - - private static void registerAndScheduleTopologySyncTask(TaskService taskService) { - TaskInfo taskInfo; - TaskManager tm = null; - - try { - if(log.isDebugEnabled()) { - log.debug("Scheduling topology synchronization task"); - } - - if (!taskService.getRegisteredTaskTypes().contains(CloudControllerConstants.TOPOLOGY_SYNC_TASK_TYPE)) { - taskService.registerTaskType(CloudControllerConstants.TOPOLOGY_SYNC_TASK_TYPE); - tm = taskService.getTaskManager(CloudControllerConstants.TOPOLOGY_SYNC_TASK_TYPE); - FasterLookUpDataHolder dataHolder = FasterLookUpDataHolder.getInstance(); - String cron = dataHolder.getTopologyConfig().getProperty(CloudControllerConstants.CRON_ELEMENT); - cron = ( cron == null ? CloudControllerConstants.TOPOLOGY_SYNC_CRON : cron ); - if(log.isDebugEnabled()) { - log.debug(String.format("Topology synchronization task cron: %s", cron)); - } - TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo(cron); - taskInfo = new TaskInfo( - CloudControllerConstants.TOPOLOGY_SYNC_TASK_NAME, - TopologySynchronizerTask.class.getName(), - new HashMap<String, String>(), triggerInfo); - tm.registerTask(taskInfo); - if(log.isDebugEnabled()) { - log.debug("Topology synchronization task registered"); - } - } - else { - if(log.isWarnEnabled()) { - log.warn("Topology synchronization task already exists"); - } - } - - } catch (Exception e) { - String msg = "Error scheduling task: " + CloudControllerConstants.TOPOLOGY_SYNC_TASK_NAME; - log.error(msg); - if (tm != null) { - try { - tm.deleteTask(CloudControllerConstants.TOPOLOGY_SYNC_TASK_NAME); - } catch (TaskException e1) { - log.error(e1); - } - } - throw new CloudControllerException(msg, e); - } - } - - private static void startTopologyBuilder() { - // initialize TopologyEventMessageProcessor Consumer - Thread delegatorThread = new Thread(new InstanceStatusEventMessageDelegator()); - // start consumer - delegatorThread.start(); - } - - private static TaskManager registerAndScheduleDataPublisherTask( - TaskService taskService) { - TaskInfo taskInfo; - TaskManager tm = null; - // initialize and schedule the data publisher task - try { - - if (!taskService.getRegisteredTaskTypes().contains( - CloudControllerConstants.DATA_PUB_TASK_TYPE)) { - - taskService - .registerTaskType(CloudControllerConstants.DATA_PUB_TASK_TYPE); - - tm = taskService - .getTaskManager(CloudControllerConstants.DATA_PUB_TASK_TYPE); - - if (!tm.isTaskScheduled(CloudControllerConstants.DATA_PUB_TASK_NAME)) { - FasterLookUpDataHolder dataHolder = FasterLookUpDataHolder.getInstance(); - TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo( - dataHolder.getDataPubConfig().getDataPublisherCron()); - taskInfo = new TaskInfo( - CloudControllerConstants.DATA_PUB_TASK_NAME, - CartridgeInstanceDataPublisherTask.class.getName(), - new HashMap<String, String>(), triggerInfo); - tm.registerTask(taskInfo); - - // Following code is currently not required, due to an issue - // in TS API. - // tm.scheduleTask(taskInfo.getName()); - } - } - - } catch (Exception e) { - String msg = "Error scheduling task: " - + CloudControllerConstants.DATA_PUB_TASK_NAME; - log.error(msg, e); - if (tm != null) { - try { - tm.deleteTask(CloudControllerConstants.DATA_PUB_TASK_NAME); - } catch (TaskException e1) { - log.error(e1); - } - } - throw new CloudControllerException(msg, e); - } - return tm; - } -} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/44972395/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/publisher/TopologySynchronizerTaskScheduler.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/publisher/TopologySynchronizerTaskScheduler.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/publisher/TopologySynchronizerTaskScheduler.java new file mode 100644 index 0000000..49cae09 --- /dev/null +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/publisher/TopologySynchronizerTaskScheduler.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.cloud.controller.publisher; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.cloud.controller.topology.TopologySynchronizerTask; +import org.wso2.carbon.ntask.common.TaskException; +import org.wso2.carbon.ntask.core.TaskInfo; +import org.wso2.carbon.ntask.core.TaskManager; +import org.wso2.carbon.ntask.core.service.TaskService; + +import java.util.HashMap; + +/** + * Topology synchronizer task scheduler for scheduling the topology synchronizer task + * using carbon task service. + */ +public class TopologySynchronizerTaskScheduler { + + private static final Log log = LogFactory.getLog(TopologySynchronizerTaskScheduler.class); + + private static final String TOPOLOGY_SYNC_TASK_TYPE = "TOPOLOGY_SYNC_TASK_TYPE"; + private static final String TOPOLOGY_SYNC_TASK_NAME = "TOPOLOGY_SYNC_TASK"; + private static final String DEFAULT_CRON = "1 * * * * ? *"; + + public static void schedule(TaskService taskService) { + // TODO: Replace this with task scheduler + Thread thread = new Thread(new TaskRunnable()); + thread.start(); + } + + private static class TaskRunnable implements Runnable { + @Override + public void run() { + while (true) { + try { + log.debug("Running topology synchronizer task"); + TopologySynchronizerTask task = new TopologySynchronizerTask(); + task.execute(); + try { + Thread.sleep(60 * 1000); + } catch (InterruptedException e) { + } + } catch (Exception e) { + log.error(e); + } + } + } + } + +// public static void schedule(TaskService taskService) { +// TaskManager taskManager = null; +// try { +// +// if (!taskService.getRegisteredTaskTypes().contains(TOPOLOGY_SYNC_TASK_TYPE)) { +// // Register task type +// taskService.registerTaskType(TOPOLOGY_SYNC_TASK_TYPE); +// +// // Register task +// taskManager = taskService.getTaskManager(TOPOLOGY_SYNC_TASK_TYPE); +// TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo(DEFAULT_CRON); +// TaskInfo taskInfo = new TaskInfo(TOPOLOGY_SYNC_TASK_NAME, +// TopologySynchronizerTask.class.getName(), +// new HashMap<String, String>(), triggerInfo); +// taskManager.registerTask(taskInfo); +// if(log.isDebugEnabled()) { +// log.debug(String.format("Topology synchronization task scheduled: %s", TOPOLOGY_SYNC_TASK_NAME)); +// } +// } +// +// } catch (Exception e) { +// if (taskManager != null) { +// try { +// taskManager.deleteTask(TOPOLOGY_SYNC_TASK_NAME); +// } catch (TaskException te) { +// if (log.isErrorEnabled()) { +// log.error(te); +// } +// } +// } +// throw new RuntimeException(String.format("Could not schedule topology synchronization task: %s", TOPOLOGY_SYNC_TASK_NAME), e); +// } +// } +}
