adding complete application publisher task
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/0d726c8f Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/0d726c8f Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/0d726c8f Branch: refs/heads/docker-grouping-merge Commit: 0d726c8fe2ebc43576ed60d78997df2a5f921d9f Parents: eb523d7 Author: reka <[email protected]> Authored: Fri Oct 31 17:00:03 2014 +0530 Committer: reka <[email protected]> Committed: Fri Oct 31 17:00:03 2014 +0530 ---------------------------------------------------------------------- .../apache/stratos/autoscaler/Constants.java | 4 + .../ApplicationSynchronizeTask.java | 51 ++++++++++++ .../ApplicationSynchronizerTask.java | 50 ----------- .../ApplicationSynchronizerTaskScheduler.java | 87 ++++++++++++++++++++ .../topic/ApplicationsEventPublisher.java | 9 ++ .../internal/AutoscalerServerComponent.java | 41 +++++++-- .../autoscaler/util/ServiceReferenceHolder.java | 12 ++- 7 files changed, 196 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/0d726c8f/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/Constants.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/Constants.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/Constants.java index 3e9e5e2..ef7be70 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/Constants.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/Constants.java @@ -34,6 +34,10 @@ public class Constants { public static final int SCHEDULE_DEFAULT_INITIAL_DELAY = 30; public static final int SCHEDULE_DEFAULT_PERIOD = 15; + public static final String APPLICATION_SYNC_CRON = "1 * * * * ? *"; + public static final String APPLICATION_SYNC_TASK_NAME = "TOPOLOGY_SYNC_TASK"; + public static final String APPLICATION_SYNC_TASK_TYPE = "TOPOLOGY_SYNC_TASK_TYPE"; + public static final String AUTOSCALER_CONFIG_FILE_NAME = "autoscaler.xml"; public static final String CLOUD_CONTROLLER_SERVICE_SFX = "services/CloudControllerService"; http://git-wip-us.apache.org/repos/asf/stratos/blob/0d726c8f/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizeTask.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizeTask.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizeTask.java new file mode 100644 index 0000000..e18cd12 --- /dev/null +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizeTask.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.autoscaler.applications; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.autoscaler.applications.topic.ApplicationsEventPublisher; +import org.wso2.carbon.ntask.core.Task; + +import java.util.Map; + +public class ApplicationSynchronizeTask implements Task { + private static final Log log = LogFactory.getLog(ApplicationSynchronizeTask.class); + + @Override + public void execute() { + if (log.isDebugEnabled()) { + log.debug("Executing topology synchronization task"); + } + // publish to the topic + if (ApplicationHolder.getApplications() != null) { + ApplicationsEventPublisher.sendCompleteTopologyEvent(ApplicationHolder.getApplications()); + } + } + + @Override + public void init() { + log.info("Applications Complete Event publisher task has been started..."); + + } + + @Override + public void setProperties(Map<String, String> arg0) {} +} http://git-wip-us.apache.org/repos/asf/stratos/blob/0d726c8f/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizerTask.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizerTask.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizerTask.java deleted file mode 100644 index a8ce042..0000000 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizerTask.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.stratos.autoscaler.applications; - -import org.apache.stratos.autoscaler.applications.topic.ApplicationsEventPublisher; -import org.apache.stratos.messaging.domain.applications.Applications; -import org.apache.stratos.messaging.message.receiver.applications.ApplicationManager; -import org.wso2.carbon.ntask.core.Task; - -import java.util.Map; - -public class ApplicationSynchronizerTask implements Task { - - @Override - public void setProperties(Map<String, String> stringStringMap) { - - } - - @Override - public void init() { - - } - - @Override - public void execute() { - - Applications applications = ApplicationManager.getApplications(); - if (applications != null) { - // publish complete Applications event - ApplicationsEventPublisher.sendCompleteApplicationsEvent(applications); - } - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/0d726c8f/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizerTaskScheduler.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizerTaskScheduler.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizerTaskScheduler.java new file mode 100644 index 0000000..7d9eb1c --- /dev/null +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/ApplicationSynchronizerTaskScheduler.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.autoscaler.applications; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.autoscaler.Constants; +import org.wso2.carbon.ntask.common.TaskException; +import org.wso2.carbon.ntask.core.TaskInfo; +import org.wso2.carbon.ntask.core.TaskManager; +import org.wso2.carbon.ntask.core.service.TaskService; + +import java.util.HashMap; + +/** + * Topology synchronizer task scheduler for scheduling the topology synchronizer task + * using carbon task service. + */ +public class ApplicationSynchronizerTaskScheduler { + + private static final Log log = LogFactory.getLog(ApplicationSynchronizerTaskScheduler.class); + + public static void schedule(TaskService taskService) { + TaskManager taskManager = null; + try { + + //if (!taskService.getRegisteredTaskTypes().contains(CloudControllerConstants.TOPOLOGY_SYNC_TASK_TYPE)) { + // Register task type + taskService.registerTaskType(Constants.APPLICATION_SYNC_TASK_TYPE); + + /*// Register task + taskManager = taskService.getTaskManager(CloudControllerConstants.TOPOLOGY_SYNC_TASK_TYPE); + String cronProp = dataHolder.getTopologyConfig().getProperty(CloudControllerConstants.CRON_PROPERTY); + String cron = cronProp != null ? cronProp :CloudControllerConstants.TOPOLOGY_SYNC_CRON ; + TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo(cron); + TaskInfo taskInfo = new TaskInfo(CloudControllerConstants.TOPOLOGY_SYNC_TASK_NAME, + TopologySynchronizerTask.class.getName(), + new HashMap<String, String>(), triggerInfo); + taskManager.registerTask(taskInfo);*/ + + taskManager = taskService.getTaskManager(Constants.APPLICATION_SYNC_TASK_TYPE); + String cronProp = Constants.APPLICATION_SYNC_CRON; + TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo(cronProp); + TaskInfo taskInfo = new TaskInfo(Constants.APPLICATION_SYNC_TASK_NAME, + ApplicationSynchronizeTask.class.getName(), + new HashMap<String, String>(), triggerInfo); + taskManager.registerTask(taskInfo); + if (log.isDebugEnabled()) { + log.debug(String.format("Topology synchronization task scheduled: %s", Constants.APPLICATION_SYNC_TASK_NAME)); + } + //} + + } catch (Exception e) { + if (taskManager != null) { + try { + taskManager.deleteTask(Constants.APPLICATION_SYNC_TASK_NAME); + } catch (TaskException te) { + if (log.isErrorEnabled()) { + log.error(te); + } + } + } + + String msg = String.format("Could not schedule topology synchronization task: %s", + Constants.APPLICATION_SYNC_TASK_NAME); + log.error(msg, e); + throw new RuntimeException(msg, e); + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/0d726c8f/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java index bbeeca4..7a1203a 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/applications/topic/ApplicationsEventPublisher.java @@ -23,6 +23,15 @@ public class ApplicationsEventPublisher { publishEvent(new CompleteApplicationsEvent(completeApplications)); } + public static void sendCompleteTopologyEvent(Applications applications) { + CompleteApplicationsEvent applicationsEvent = new CompleteApplicationsEvent(applications); + + if(log.isDebugEnabled()) { + log.debug(String.format("Publishing complete Applications event")); + } + publishEvent(applicationsEvent); + } + public static void sendGroupCreatedEvent(String appId, String groupId) { try { ApplicationManager.acquireReadLockForApplication(appId); http://git-wip-us.apache.org/repos/asf/stratos/blob/0d726c8f/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java index 4823057..5ada0b7 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java @@ -21,6 +21,7 @@ package org.apache.stratos.autoscaler.internal; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.autoscaler.NetworkPartitionLbHolder; +import org.apache.stratos.autoscaler.applications.ApplicationSynchronizerTaskScheduler; import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy; import org.apache.stratos.autoscaler.exception.AutoScalerException; import org.apache.stratos.autoscaler.message.receiver.health.AutoscalerHealthStatEventReceiver; @@ -32,6 +33,7 @@ import org.apache.stratos.autoscaler.registry.RegistryManager; import org.apache.stratos.autoscaler.util.ServiceReferenceHolder; import org.apache.stratos.cloud.controller.stub.deployment.partition.Partition; import org.osgi.service.component.ComponentContext; +import org.wso2.carbon.ntask.core.service.TaskService; import org.wso2.carbon.registry.api.RegistryException; import org.wso2.carbon.registry.core.service.RegistryService; @@ -41,6 +43,9 @@ import java.util.List; /** * @scr.component name=org.apache.stratos.autoscaler.internal.AutoscalerServerComponent" * immediate="true" + * @scr.reference name="ntask.component" interface="org.wso2.carbon.ntask.core.service.TaskService" + * cardinality="1..1" policy="dynamic" bind="setTaskService" + * unbind="unsetTaskService" * @scr.reference name="registry.service" * interface= * "org.wso2.carbon.registry.core.service.RegistryService" @@ -52,13 +57,13 @@ public class AutoscalerServerComponent { private static final Log log = LogFactory.getLog(AutoscalerServerComponent.class); AutoscalerTopologyEventReceiver asTopologyReceiver; -// TopicSubscriber healthStatTopicSubscriber; + // TopicSubscriber healthStatTopicSubscriber; AutoscalerHealthStatEventReceiver autoscalerHealthStatEventReceiver; protected void activate(ComponentContext componentContext) throws Exception { try { // Start topology receiver - asTopologyReceiver = new AutoscalerTopologyEventReceiver(); + asTopologyReceiver = new AutoscalerTopologyEventReceiver(); Thread topologyTopicSubscriberThread = new Thread(asTopologyReceiver); topologyTopicSubscriberThread.start(); if (log.isDebugEnabled()) { @@ -88,7 +93,7 @@ public class AutoscalerServerComponent { Partition partition = partitionIterator.next(); PartitionManager.getInstance().addPartitionToInformationModel(partition); } - + // Adding the network partitions stored in registry to the information model List<NetworkPartitionLbHolder> nwPartitionHolders = RegistryManager.getInstance().retrieveNetworkPartitionLbHolders(); Iterator<NetworkPartitionLbHolder> nwPartitionIterator = nwPartitionHolders.iterator(); @@ -96,7 +101,7 @@ public class AutoscalerServerComponent { NetworkPartitionLbHolder nwPartition = nwPartitionIterator.next(); PartitionManager.getInstance().addNetworkPartitionLbHolder(nwPartition); } - + List<AutoscalePolicy> asPolicies = RegistryManager.getInstance().retrieveASPolicies(); Iterator<AutoscalePolicy> asPolicyIterator = asPolicies.iterator(); while (asPolicyIterator.hasNext()) { @@ -112,6 +117,14 @@ public class AutoscalerServerComponent { } if (log.isInfoEnabled()) { + log.info("Scheduling tasks to publish applications"); + } + + ApplicationSynchronizerTaskScheduler + .schedule(ServiceReferenceHolder.getInstance() + .getTaskService()); + + if (log.isInfoEnabled()) { log.info("Autoscaler Server Component activated"); } } catch (Throwable e) { @@ -120,10 +133,10 @@ public class AutoscalerServerComponent { } protected void deactivate(ComponentContext context) { - asTopologyReceiver.terminate(); - autoscalerHealthStatEventReceiver.terminate(); + asTopologyReceiver.terminate(); + autoscalerHealthStatEventReceiver.terminate(); } - + protected void setRegistryService(RegistryService registryService) { if (log.isDebugEnabled()) { log.debug("Setting the Registry Service"); @@ -143,4 +156,18 @@ public class AutoscalerServerComponent { } ServiceReferenceHolder.getInstance().setRegistry(null); } + + protected void setTaskService(TaskService taskService) { + if (log.isDebugEnabled()) { + log.debug("Setting the Task Service"); + } + ServiceReferenceHolder.getInstance().setTaskService(taskService); + } + + protected void unsetTaskService(TaskService taskService) { + if (log.isDebugEnabled()) { + log.debug("Unsetting the Task Service"); + } + ServiceReferenceHolder.getInstance().setTaskService(null); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/0d726c8f/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/ServiceReferenceHolder.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/ServiceReferenceHolder.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/ServiceReferenceHolder.java index 00629dc..9040f74 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/ServiceReferenceHolder.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/ServiceReferenceHolder.java @@ -21,13 +21,15 @@ package org.apache.stratos.autoscaler.util; */ +import org.wso2.carbon.ntask.core.service.TaskService; import org.wso2.carbon.registry.core.Registry; import org.wso2.carbon.registry.core.session.UserRegistry; public class ServiceReferenceHolder { private static ServiceReferenceHolder instance; - private Registry registry; + private Registry registry; + private TaskService taskService; private ServiceReferenceHolder() { } @@ -46,4 +48,12 @@ public class ServiceReferenceHolder { public Registry getRegistry() { return registry; } + + public TaskService getTaskService() { + return taskService; + } + + public void setTaskService(TaskService taskService) { + this.taskService = taskService; + } }
