Updated Branches:
  refs/heads/master 724207c65 -> 449723952

Fixed periodical topology message publisher


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/44972395
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/44972395
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/44972395

Branch: refs/heads/master
Commit: 44972395270d3cd8541890a2f99bc7a9b8495d8e
Parents: 724207c
Author: Imesh Gunaratne <[email protected]>
Authored: Wed Dec 25 19:02:46 2013 +0530
Committer: Imesh Gunaratne <[email protected]>
Committed: Wed Dec 25 19:02:46 2013 +0530

----------------------------------------------------------------------
 .../internal/CloudControllerDSComponent.java    |   5 +-
 .../controller/internal/TaskScheduler.java      | 145 -------------------
 .../TopologySynchronizerTaskScheduler.java      | 102 +++++++++++++
 3 files changed, 105 insertions(+), 147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/44972395/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java
index 9102c5b..c458d49 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java
@@ -26,13 +26,13 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.cloud.controller.exception.CloudControllerException;
 import org.apache.stratos.cloud.controller.impl.CloudControllerServiceImpl;
 import org.apache.stratos.cloud.controller.interfaces.CloudControllerService;
+import 
org.apache.stratos.cloud.controller.publisher.TopologySynchronizerTaskScheduler;
 import org.apache.stratos.cloud.controller.runtime.FasterLookUpDataHolder;
 import 
org.apache.stratos.cloud.controller.topic.instance.status.InstanceStatusEventMessageListener;
 import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
 import org.apache.stratos.cloud.controller.util.ServiceReferenceHolder;
 import org.apache.stratos.messaging.broker.publish.EventPublisher;
 import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
-import org.apache.stratos.messaging.util.Constants;
 import org.osgi.framework.BundleContext;
 import org.osgi.service.component.ComponentContext;
 import org.wso2.carbon.ntask.core.service.TaskService;
@@ -84,7 +84,8 @@ public class CloudControllerDSComponent {
             if(log.isInfoEnabled()) {
                 log.info("Scheduling tasks");
             }
-            TaskScheduler.schedule();
+
+            
TopologySynchronizerTaskScheduler.schedule(ServiceReferenceHolder.getInstance().getTaskService());
 
             log.debug("******* Cloud Controller Service bundle is activated 
******* ");
         } catch (Throwable e) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/44972395/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/TaskScheduler.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/TaskScheduler.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/TaskScheduler.java
deleted file mode 100644
index 0db9176..0000000
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/TaskScheduler.java
+++ /dev/null
@@ -1,145 +0,0 @@
-package org.apache.stratos.cloud.controller.internal;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.cloud.controller.exception.CloudControllerException;
-import 
org.apache.stratos.cloud.controller.publisher.CartridgeInstanceDataPublisherTask;
-import org.apache.stratos.cloud.controller.runtime.FasterLookUpDataHolder;
-import 
org.apache.stratos.cloud.controller.topic.instance.status.InstanceStatusEventMessageDelegator;
-import org.apache.stratos.cloud.controller.topology.TopologySynchronizerTask;
-import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
-import org.apache.stratos.cloud.controller.util.ServiceReferenceHolder;
-import org.wso2.carbon.ntask.common.TaskException;
-import org.wso2.carbon.ntask.core.TaskInfo;
-import org.wso2.carbon.ntask.core.TaskManager;
-import org.wso2.carbon.ntask.core.service.TaskService;
-
-import java.util.HashMap;
-
-/**
- * Cloud Controller task scheduler
- */
-public class TaskScheduler {
-    private static final Log log = LogFactory.getLog(TaskScheduler.class);
-
-    public static void schedule() {
-        FasterLookUpDataHolder dataHolder = 
FasterLookUpDataHolder.getInstance();
-        TaskService taskService = 
ServiceReferenceHolder.getInstance().getTaskService();
-
-        if (dataHolder.getEnableBAMDataPublisher()) {
-            // register and schedule, BAM data publisher task
-            registerAndScheduleDataPublisherTask(taskService);
-        }
-
-        if (dataHolder.getEnableTopologySync()) {
-            // start the topology builder thread
-            startTopologyBuilder();
-
-            // register and schedule, topology synchronizer task
-            registerAndScheduleTopologySyncTask(taskService);
-        }
-    }
-
-    private static void registerAndScheduleTopologySyncTask(TaskService 
taskService) {
-        TaskInfo taskInfo;
-        TaskManager tm = null;
-
-        try {
-            if(log.isDebugEnabled()) {
-                log.debug("Scheduling topology synchronization task");
-            }
-
-            if 
(!taskService.getRegisteredTaskTypes().contains(CloudControllerConstants.TOPOLOGY_SYNC_TASK_TYPE))
 {
-                
taskService.registerTaskType(CloudControllerConstants.TOPOLOGY_SYNC_TASK_TYPE);
-                tm = 
taskService.getTaskManager(CloudControllerConstants.TOPOLOGY_SYNC_TASK_TYPE);
-                FasterLookUpDataHolder dataHolder = 
FasterLookUpDataHolder.getInstance();
-                String cron = 
dataHolder.getTopologyConfig().getProperty(CloudControllerConstants.CRON_ELEMENT);
-                cron = ( cron == null ? 
CloudControllerConstants.TOPOLOGY_SYNC_CRON : cron );
-                if(log.isDebugEnabled()) {
-                    log.debug(String.format("Topology synchronization task 
cron: %s", cron));
-                }
-                TaskInfo.TriggerInfo triggerInfo = new 
TaskInfo.TriggerInfo(cron);
-                taskInfo = new TaskInfo(
-                        CloudControllerConstants.TOPOLOGY_SYNC_TASK_NAME,
-                        TopologySynchronizerTask.class.getName(),
-                        new HashMap<String, String>(), triggerInfo);
-                tm.registerTask(taskInfo);
-                if(log.isDebugEnabled()) {
-                    log.debug("Topology synchronization task registered");
-                }
-            }
-            else {
-                if(log.isWarnEnabled()) {
-                    log.warn("Topology synchronization task already exists");
-                }
-            }
-
-        } catch (Exception e) {
-            String msg = "Error scheduling task: " + 
CloudControllerConstants.TOPOLOGY_SYNC_TASK_NAME;
-            log.error(msg);
-            if (tm != null) {
-                try {
-                    
tm.deleteTask(CloudControllerConstants.TOPOLOGY_SYNC_TASK_NAME);
-                } catch (TaskException e1) {
-                    log.error(e1);
-                }
-            }
-            throw new CloudControllerException(msg, e);
-        }
-    }
-
-    private static void startTopologyBuilder() {
-        // initialize TopologyEventMessageProcessor Consumer
-        Thread delegatorThread = new Thread(new 
InstanceStatusEventMessageDelegator());
-        // start consumer
-        delegatorThread.start();
-    }
-
-    private static TaskManager registerAndScheduleDataPublisherTask(
-            TaskService taskService) {
-        TaskInfo taskInfo;
-        TaskManager tm = null;
-        // initialize and schedule the data publisher task
-        try {
-
-            if (!taskService.getRegisteredTaskTypes().contains(
-                    CloudControllerConstants.DATA_PUB_TASK_TYPE)) {
-
-                taskService
-                        
.registerTaskType(CloudControllerConstants.DATA_PUB_TASK_TYPE);
-
-                tm = taskService
-                        
.getTaskManager(CloudControllerConstants.DATA_PUB_TASK_TYPE);
-
-                if 
(!tm.isTaskScheduled(CloudControllerConstants.DATA_PUB_TASK_NAME)) {
-                    FasterLookUpDataHolder dataHolder = 
FasterLookUpDataHolder.getInstance();
-                    TaskInfo.TriggerInfo triggerInfo = new 
TaskInfo.TriggerInfo(
-                            
dataHolder.getDataPubConfig().getDataPublisherCron());
-                    taskInfo = new TaskInfo(
-                            CloudControllerConstants.DATA_PUB_TASK_NAME,
-                            CartridgeInstanceDataPublisherTask.class.getName(),
-                            new HashMap<String, String>(), triggerInfo);
-                    tm.registerTask(taskInfo);
-
-                    // Following code is currently not required, due to an 
issue
-                    // in TS API.
-                    // tm.scheduleTask(taskInfo.getName());
-                }
-            }
-
-        } catch (Exception e) {
-            String msg = "Error scheduling task: "
-                    + CloudControllerConstants.DATA_PUB_TASK_NAME;
-            log.error(msg, e);
-            if (tm != null) {
-                try {
-                    tm.deleteTask(CloudControllerConstants.DATA_PUB_TASK_NAME);
-                } catch (TaskException e1) {
-                    log.error(e1);
-                }
-            }
-            throw new CloudControllerException(msg, e);
-        }
-        return tm;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/44972395/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/publisher/TopologySynchronizerTaskScheduler.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/publisher/TopologySynchronizerTaskScheduler.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/publisher/TopologySynchronizerTaskScheduler.java
new file mode 100644
index 0000000..49cae09
--- /dev/null
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/publisher/TopologySynchronizerTaskScheduler.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cloud.controller.publisher;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cloud.controller.topology.TopologySynchronizerTask;
+import org.wso2.carbon.ntask.common.TaskException;
+import org.wso2.carbon.ntask.core.TaskInfo;
+import org.wso2.carbon.ntask.core.TaskManager;
+import org.wso2.carbon.ntask.core.service.TaskService;
+
+import java.util.HashMap;
+
+/**
+ * Topology synchronizer task scheduler for scheduling the topology 
synchronizer task
+ * using carbon task service.
+ */
+public class TopologySynchronizerTaskScheduler {
+
+    private static final Log log = 
LogFactory.getLog(TopologySynchronizerTaskScheduler.class);
+
+    private static final String TOPOLOGY_SYNC_TASK_TYPE = 
"TOPOLOGY_SYNC_TASK_TYPE";
+    private static final String TOPOLOGY_SYNC_TASK_NAME = "TOPOLOGY_SYNC_TASK";
+    private static final String DEFAULT_CRON = "1 * * * * ? *";
+
+    public static void schedule(TaskService taskService) {
+        // TODO: Replace this with task scheduler
+        Thread thread = new Thread(new TaskRunnable());
+        thread.start();
+    }
+
+    private static class TaskRunnable implements Runnable {
+        @Override
+        public void run() {
+            while (true) {
+                try {
+                    log.debug("Running topology synchronizer task");
+                    TopologySynchronizerTask task = new 
TopologySynchronizerTask();
+                    task.execute();
+                    try {
+                        Thread.sleep(60 * 1000);
+                    } catch (InterruptedException e) {
+                    }
+                } catch (Exception e) {
+                    log.error(e);
+                }
+            }
+        }
+    }
+
+//    public static void schedule(TaskService taskService) {
+//        TaskManager taskManager = null;
+//        try {
+//
+//            if 
(!taskService.getRegisteredTaskTypes().contains(TOPOLOGY_SYNC_TASK_TYPE)) {
+//                // Register task type
+//                taskService.registerTaskType(TOPOLOGY_SYNC_TASK_TYPE);
+//
+//                // Register task
+//                taskManager = 
taskService.getTaskManager(TOPOLOGY_SYNC_TASK_TYPE);
+//                TaskInfo.TriggerInfo triggerInfo = new 
TaskInfo.TriggerInfo(DEFAULT_CRON);
+//                TaskInfo taskInfo = new TaskInfo(TOPOLOGY_SYNC_TASK_NAME,
+//                        TopologySynchronizerTask.class.getName(),
+//                        new HashMap<String, String>(), triggerInfo);
+//                taskManager.registerTask(taskInfo);
+//                if(log.isDebugEnabled()) {
+//                    log.debug(String.format("Topology synchronization task 
scheduled: %s", TOPOLOGY_SYNC_TASK_NAME));
+//                }
+//            }
+//
+//        } catch (Exception e) {
+//            if (taskManager != null) {
+//                try {
+//                    taskManager.deleteTask(TOPOLOGY_SYNC_TASK_NAME);
+//                } catch (TaskException te) {
+//                    if (log.isErrorEnabled()) {
+//                        log.error(te);
+//                    }
+//                }
+//            }
+//            throw new RuntimeException(String.format("Could not schedule 
topology synchronization task: %s", TOPOLOGY_SYNC_TASK_NAME), e);
+//        }
+//    }
+}

Reply via email to