Updated Branches:
  refs/heads/master 4cd05fe38 -> c03a42a6e

Fixed cloud controller topology manager singleton model


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/8c72f145
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/8c72f145
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/8c72f145

Branch: refs/heads/master
Commit: 8c72f1452c666b31e4f44b55a4bea4f27fe7b8d2
Parents: dfc250e
Author: Imesh Gunaratne <[email protected]>
Authored: Wed Dec 25 15:24:33 2013 +0530
Committer: Imesh Gunaratne <[email protected]>
Committed: Wed Dec 25 15:24:33 2013 +0530

----------------------------------------------------------------------
 .../impl/CloudControllerServiceImpl.java        |   4 +-
 .../internal/CloudControllerDSComponent.java    |   2 +-
 .../topic/TopologySynchronizerTask.java         |  64 -----------
 .../InstanceStatusEventMessageDelegator.java    |  68 ++++++++++++
 .../InstanceStatusEventMessageListener.java     |  43 ++++++++
 .../status/InstanceStatusEventMessageQueue.java |  44 ++++++++
 .../InstanceStatusEventMessageDelegator.java    |  66 -----------
 .../InstanceStatusEventMessageListener.java     |  42 -------
 .../controller/topology/TopologyBuilder.java    |  68 ++++++------
 .../controller/topology/TopologyManager.java    | 109 +++++++++----------
 .../topology/TopologySynchronizerTask.java      |  62 +++++++++++
 11 files changed, 306 insertions(+), 266 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/8c72f145/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java
index 2f8e278..2340abe 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java
@@ -31,8 +31,8 @@ import org.apache.stratos.cloud.controller.pojo.*;
 import 
org.apache.stratos.cloud.controller.publisher.CartridgeInstanceDataPublisherTask;
 import org.apache.stratos.cloud.controller.registry.RegistryManager;
 import org.apache.stratos.cloud.controller.runtime.FasterLookUpDataHolder;
-import org.apache.stratos.cloud.controller.topic.TopologySynchronizerTask;
-import 
org.apache.stratos.cloud.controller.topology.InstanceStatusEventMessageDelegator;
+import org.apache.stratos.cloud.controller.topology.TopologySynchronizerTask;
+import 
org.apache.stratos.cloud.controller.topic.instance.status.InstanceStatusEventMessageDelegator;
 import org.apache.stratos.cloud.controller.topology.TopologyBuilder;
 import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
 import org.apache.stratos.cloud.controller.util.CloudControllerUtil;

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/8c72f145/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java
index 23d07b5..c3fe066 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java
@@ -27,7 +27,7 @@ import 
org.apache.stratos.cloud.controller.exception.CloudControllerException;
 import org.apache.stratos.cloud.controller.impl.CloudControllerServiceImpl;
 import org.apache.stratos.cloud.controller.interfaces.CloudControllerService;
 import org.apache.stratos.cloud.controller.runtime.FasterLookUpDataHolder;
-import 
org.apache.stratos.cloud.controller.topology.InstanceStatusEventMessageListener;
+import 
org.apache.stratos.cloud.controller.topic.instance.status.InstanceStatusEventMessageListener;
 import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
 import org.apache.stratos.cloud.controller.util.ServiceReferenceHolder;
 import org.apache.stratos.messaging.broker.publish.EventPublisher;

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/8c72f145/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/TopologySynchronizerTask.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/TopologySynchronizerTask.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/TopologySynchronizerTask.java
deleted file mode 100644
index 6f3dc45..0000000
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/TopologySynchronizerTask.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one 
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
- * KIND, either express or implied.  See the License for the 
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.cloud.controller.topic;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.cloud.controller.runtime.FasterLookUpDataHolder;
-import org.apache.stratos.cloud.controller.topology.TopologyEventSender;
-import org.apache.stratos.cloud.controller.topology.TopologyManager;
-import org.wso2.carbon.ntask.core.Task;
-
-import java.util.Map;
-
-public class TopologySynchronizerTask implements Task{
-    private static final Log log = 
LogFactory.getLog(TopologySynchronizerTask.class);
-
-    @Override
-    public void execute() {
-       if(FasterLookUpDataHolder.getInstance().isTopologySyncRunning()||
-                       // this is a temporary fix to avoid task execution - 
limitation with ntask
-                       
!FasterLookUpDataHolder.getInstance().getEnableTopologySync()){
-            return;
-        }
-       
-        if (log.isDebugEnabled()) {
-            log.debug("TopologySynchronizerTask ...");
-        }
-        
-       // publish to the topic 
-        if (TopologyManager.getInstance().getTopology() != null) {
-            
TopologyEventSender.sendCompleteTopologyEvent(TopologyManager.getInstance().getTopology());
-        }
-    }
-    
-    @Override
-    public void init() {
-
-       // this is a temporary fix to avoid task execution - limitation with 
ntask
-               
if(!FasterLookUpDataHolder.getInstance().getEnableTopologySync()){
-                       log.debug("Topology Sync is disabled.");
-                       return;
-               }
-    }
-
-    @Override
-    public void setProperties(Map<String, String> arg0) {}
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/8c72f145/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageDelegator.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageDelegator.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageDelegator.java
new file mode 100644
index 0000000..1b94bbd
--- /dev/null
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageDelegator.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.cloud.controller.topic.instance.status;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cloud.controller.topology.TopologyBuilder;
+import org.apache.stratos.cloud.controller.topology.TopologyManager;
+import 
org.apache.stratos.messaging.event.instance.status.InstanceActivatedEvent;
+import org.apache.stratos.messaging.event.instance.status.InstanceStartedEvent;
+import org.apache.stratos.messaging.util.Constants;
+import org.apache.stratos.messaging.util.Util;
+
+import javax.jms.TextMessage;
+
+public class InstanceStatusEventMessageDelegator implements Runnable {
+    private static final Log log = 
LogFactory.getLog(InstanceStatusEventMessageDelegator.class);
+
+    @Override
+    public void run() {
+        log.info("Instance status event message delegator started");
+
+        while (true) {
+            try {
+                TextMessage message = 
InstanceStatusEventMessageQueue.getInstance().take();
+
+                // retrieve the header
+                String type = 
message.getStringProperty(Constants.EVENT_CLASS_NAME);
+                log.info(String.format("Instance status event message received 
from queue: %s", type));
+
+                if (InstanceStartedEvent.class.getName().equals(type)) {
+                    // retrieve the actual message
+                    String json = message.getText();
+                    TopologyBuilder.handleMemberStarted((InstanceStartedEvent) 
Util.
+                            jsonToObject(json, InstanceStartedEvent.class));
+                } else if 
(InstanceActivatedEvent.class.getName().equals(type)) {
+                    // retrieve the actual message
+                    String json = message.getText();
+                    
TopologyBuilder.handleMemberActivated((InstanceActivatedEvent) Util.
+                            jsonToObject(json, InstanceActivatedEvent.class));
+                } else {
+                    log.warn("Event message received is not 
InstanceStartedEvent or InstanceActivatedEvent");
+                }
+            } catch (Exception e) {
+                String error = "Failed to retrieve the instance status event 
message";
+                log.error(error, e);
+                // The exception is deliberately not rethrown: rethrowing would exit this loop and stop the delegator thread.
+                //throw new RuntimeException(error, e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/8c72f145/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageListener.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageListener.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageListener.java
new file mode 100644
index 0000000..7f40036
--- /dev/null
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageListener.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.cloud.controller.topic.instance.status;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cloud.controller.topology.TopologyManager;
+
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.TextMessage;
+
+/**
+ * Receives instance status event messages and adds them to the InstanceStatusEventMessageQueue.
+ */
+public class InstanceStatusEventMessageListener implements MessageListener{
+    private static final Log log = 
LogFactory.getLog(InstanceStatusEventMessageListener.class);
+
+    @Override
+    public void onMessage(Message message) {
+        TextMessage receivedMessage = (TextMessage) message;
+        InstanceStatusEventMessageQueue.getInstance().add(receivedMessage);
+        if(log.isDebugEnabled()) {
+            log.debug(String.format("Instance status message added to queue: 
%s", message));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/8c72f145/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageQueue.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageQueue.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageQueue.java
new file mode 100644
index 0000000..5c767ce
--- /dev/null
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageQueue.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cloud.controller.topic.instance.status;
+
+import javax.jms.TextMessage;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Implements a blocking queue for managing instance status event messages.
+ */
+public class InstanceStatusEventMessageQueue extends 
LinkedBlockingQueue<TextMessage>{
+    private static volatile InstanceStatusEventMessageQueue instance;
+
+    private InstanceStatusEventMessageQueue(){
+    }
+
+    public static synchronized InstanceStatusEventMessageQueue getInstance() {
+        if (instance == null) {
+            synchronized (InstanceStatusEventMessageQueue.class){
+                if (instance == null) {
+                    instance = new InstanceStatusEventMessageQueue();
+                }
+            }
+        }
+        return instance;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/8c72f145/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/InstanceStatusEventMessageDelegator.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/InstanceStatusEventMessageDelegator.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/InstanceStatusEventMessageDelegator.java
deleted file mode 100644
index af94659..0000000
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/InstanceStatusEventMessageDelegator.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.cloud.controller.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import 
org.apache.stratos.messaging.event.instance.status.InstanceActivatedEvent;
-import org.apache.stratos.messaging.event.instance.status.InstanceStartedEvent;
-import org.apache.stratos.messaging.util.Constants;
-import org.apache.stratos.messaging.util.Util;
-
-import javax.jms.TextMessage;
-
-public class InstanceStatusEventMessageDelegator implements Runnable {
-    private static final Log log = 
LogFactory.getLog(InstanceStatusEventMessageDelegator.class);
-
-    @Override
-    public void run() {
-        log.info("Instance status event message delegator started");
-
-        while (true) {
-            try {
-                TextMessage message = 
TopologyManager.getInstance().getInstanceStatusMessageQueue().take();
-
-                // retrieve the header
-                String type = 
message.getStringProperty(Constants.EVENT_CLASS_NAME);
-                log.info(String.format("Instance status event message received 
from queue: %s", type));
-
-                if (InstanceStartedEvent.class.getName().equals(type)) {
-                    // retrieve the actual message
-                    String json = message.getText();
-                    TopologyBuilder.handleMemberStarted((InstanceStartedEvent) 
Util.
-                            jsonToObject(json, InstanceStartedEvent.class));
-                } else if 
(InstanceActivatedEvent.class.getName().equals(type)) {
-                    // retrieve the actual message
-                    String json = message.getText();
-                    
TopologyBuilder.handleMemberActivated((InstanceActivatedEvent) Util.
-                            jsonToObject(json, InstanceActivatedEvent.class));
-                } else {
-                    log.warn("Event message received is not 
InstanceStartedEvent or InstanceActivatedEvent");
-                }
-            } catch (Exception e) {
-                String error = "Failed to retrieve the instance status event 
message";
-                log.error(error, e);
-                // Commenting throwing the error. Otherwise thread will not 
execute if an exception is thrown.
-                //throw new RuntimeException(error, e);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/8c72f145/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/InstanceStatusEventMessageListener.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/InstanceStatusEventMessageListener.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/InstanceStatusEventMessageListener.java
deleted file mode 100644
index f363997..0000000
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/InstanceStatusEventMessageListener.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.cloud.controller.topology;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.TextMessage;
-
-/**
- * this is to handle the topology subscription
- */
-public class InstanceStatusEventMessageListener implements MessageListener{
-    private static final Log log = 
LogFactory.getLog(InstanceStatusEventMessageListener.class);
-
-    @Override
-    public void onMessage(Message message) {
-        TextMessage receivedMessage = (TextMessage) message;
-        
TopologyManager.getInstance().getInstanceStatusMessageQueue().add(receivedMessage);
-        if(log.isDebugEnabled()) {
-            log.debug(String.format("Instance status message added to queue: 
%s", message));
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/8c72f145/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
index ba6a49a..bc4da0f 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
@@ -46,13 +46,13 @@ public class TopologyBuilder {
 
     public static void handleServiceCreated(List<Cartridge> cartridgeList) {
         Service service;
-        Topology topology = TopologyManager.getInstance().getTopology();
+        Topology topology = TopologyManager.getTopology();
         if (cartridgeList == null) {
             throw new RuntimeException(String.format("Cartridge list is 
empty"));
         }
         try {
 
-            TopologyManager.getInstance().acquireWriteLock();
+            TopologyManager.acquireWriteLock();
             for (Cartridge cartridge : cartridgeList) {
                 if (!topology.serviceExists(cartridge.getType())) {
                     service = new Service(cartridge.getType(), 
cartridge.isMultiTenant() ? ServiceType.MultiTenant : ServiceType.SingleTenant);
@@ -66,28 +66,28 @@ public class TopologyBuilder {
                         service.addPort(port);
                     }
                     topology.addService(service);
-                    TopologyManager.getInstance().updateTopology(topology);
+                    TopologyManager.updateTopology(topology);
                 }
             }
         } finally {
-            TopologyManager.getInstance().releaseWriteLock();
+            TopologyManager.releaseWriteLock();
         }
         TopologyEventSender.sendServiceCreateEvent(cartridgeList);
 
     }
 
     public static void handleServiceRemoved(List<Cartridge> cartridgeList) {
-        Topology topology = TopologyManager.getInstance().getTopology();
+        Topology topology = TopologyManager.getTopology();
 
         for (Cartridge cartridge : cartridgeList) {
             if (topology.getService(cartridge.getType()).getClusters().size() 
== 0) {
                 if (topology.serviceExists(cartridge.getType())) {
                     try {
-                        TopologyManager.getInstance().acquireWriteLock();
+                        TopologyManager.acquireWriteLock();
                         topology.removeService(cartridge.getType());
-                        TopologyManager.getInstance().updateTopology(topology);
+                        TopologyManager.updateTopology(topology);
                     } finally {
-                        TopologyManager.getInstance().releaseWriteLock();
+                        TopologyManager.releaseWriteLock();
                     }
                     TopologyEventSender.sendServiceRemovedEvent(cartridgeList);
                 } else {
@@ -101,10 +101,10 @@ public class TopologyBuilder {
     }
 
     public static void handleClusterCreated(Registrant registrant) {
-        Topology topology = TopologyManager.getInstance().getTopology();
+        Topology topology = TopologyManager.getTopology();
         Service service;
         try {
-            TopologyManager.getInstance().acquireWriteLock();
+            TopologyManager.acquireWriteLock();
             String cartridgeType = registrant.getCartridgeType();
             service = topology.getService(cartridgeType);
             Properties props = 
CloudControllerUtil.toJavaUtilProperties(registrant.getProperties());
@@ -134,16 +134,16 @@ public class TopologyBuilder {
                 cluster.setLbCluster(isLb);
                 service.addCluster(cluster);
             }
-            TopologyManager.getInstance().updateTopology(topology);
+            TopologyManager.updateTopology(topology);
             TopologyEventSender.sendClusterCreatedEvent(cartridgeType, 
clusterId, cluster);
 
         } finally {
-            TopologyManager.getInstance().releaseWriteLock();
+            TopologyManager.releaseWriteLock();
         }
     }
 
     public static void handleClusterRemoved(ClusterContext ctxt) {
-        Topology topology = TopologyManager.getInstance().getTopology();
+        Topology topology = TopologyManager.getTopology();
         Service service = topology.getService(ctxt.getCartridgeType());
         if (service == null) {
             throw new RuntimeException(String.format("Service %s does not 
exist",
@@ -157,11 +157,11 @@ public class TopologyBuilder {
         }
 
         try {
-            TopologyManager.getInstance().acquireWriteLock();
+            TopologyManager.acquireWriteLock();
             service.removeCluster(ctxt.getClusterId());
-            TopologyManager.getInstance().updateTopology(topology);
+            TopologyManager.updateTopology(topology);
         } finally {
-            TopologyManager.getInstance().releaseWriteLock();
+            TopologyManager.releaseWriteLock();
         }
         TopologyEventSender.sendClusterRemovedEvent(ctxt);
     }
@@ -169,7 +169,7 @@ public class TopologyBuilder {
     public static void handleMemberSpawned(String memberId, String 
serviceName, String clusterId,
                                            String networkPartitionId, String 
partitionId, String privateIp, String lbClusterId) {
         //adding the new member to the cluster after it is successfully 
started in IaaS.
-        Topology topology = TopologyManager.getInstance().getTopology();
+        Topology topology = TopologyManager.getTopology();
         Service service = topology.getService(serviceName);
         Cluster cluster = service.getCluster(clusterId);
 
@@ -178,21 +178,21 @@ public class TopologyBuilder {
         }
 
         try {
-            TopologyManager.getInstance().acquireWriteLock();
+            TopologyManager.acquireWriteLock();
             Member member = new Member(serviceName, clusterId, 
networkPartitionId, partitionId, memberId);
             member.setStatus(MemberStatus.Created);
             member.setMemberIp(privateIp);
             member.setLbClusterId(lbClusterId);
             cluster.addMember(member);
-            TopologyManager.getInstance().updateTopology(topology);
+            TopologyManager.updateTopology(topology);
         } finally {
-            TopologyManager.getInstance().releaseWriteLock();
+            TopologyManager.releaseWriteLock();
         }
         TopologyEventSender.sendInstanceSpawnedEvent(serviceName, clusterId, 
networkPartitionId, partitionId, memberId, lbClusterId);
     }
 
     public static void handleMemberStarted(InstanceStartedEvent 
instanceStartedEvent) {
-        Topology topology = TopologyManager.getInstance().getTopology();
+        Topology topology = TopologyManager.getTopology();
         Service service = 
topology.getService(instanceStartedEvent.getServiceName());
         if (service == null) {
             throw new RuntimeException(String.format("Service %s does not 
exist",
@@ -211,20 +211,20 @@ public class TopologyBuilder {
                     instanceStartedEvent.getMemberId()));
         }
         try {
-            TopologyManager.getInstance().acquireWriteLock();
+            TopologyManager.acquireWriteLock();
             member.setStatus(MemberStatus.Starting);
             log.info("member started event adding status started");
 
-            TopologyManager.getInstance().updateTopology(topology);
+            TopologyManager.updateTopology(topology);
         } finally {
-            TopologyManager.getInstance().releaseWriteLock();
+            TopologyManager.releaseWriteLock();
         }
         //memberStartedEvent.
         TopologyEventSender.sendMemberStartedEvent(instanceStartedEvent);
     }
 
     public static void handleMemberActivated(InstanceActivatedEvent 
instanceActivatedEvent) {
-        Topology topology = TopologyManager.getInstance().getTopology();
+        Topology topology = TopologyManager.getTopology();
         Service service = 
topology.getService(instanceActivatedEvent.getServiceName());
         if (service == null) {
             throw new RuntimeException(String.format("Service %s does not 
exist",
@@ -247,7 +247,7 @@ public class TopologyBuilder {
                         instanceActivatedEvent.getClusterId(), 
instanceActivatedEvent.getNetworkPartitionId(), 
instanceActivatedEvent.getPartitionId(), instanceActivatedEvent.getMemberId());
 
         try {
-            TopologyManager.getInstance().acquireWriteLock();
+            TopologyManager.acquireWriteLock();
             member.setStatus(MemberStatus.Activated);
             log.info("member started event adding status activated");
             Cartridge cartridge = FasterLookUpDataHolder.getInstance().
@@ -265,16 +265,16 @@ public class TopologyBuilder {
             }
 
             memberActivatedEvent.setMemberIp(member.getMemberIp());
-            TopologyManager.getInstance().updateTopology(topology);
+            TopologyManager.updateTopology(topology);
 
         } finally {
-            TopologyManager.getInstance().releaseWriteLock();
+            TopologyManager.releaseWriteLock();
         }
         TopologyEventSender.sendMemberActivatedEvent(memberActivatedEvent);
     }
 
     public static void handleMemberTerminated(String serviceName, String 
clusterId, String networkPartitionId, String partitionId, String memberId) {
-        Topology topology = TopologyManager.getInstance().getTopology();
+        Topology topology = TopologyManager.getTopology();
         Service service = topology.getService(serviceName);
         Cluster cluster = service.getCluster(clusterId);
         Member member = cluster.getMember(memberId);
@@ -285,11 +285,11 @@ public class TopologyBuilder {
         }
 
         try {
-            TopologyManager.getInstance().acquireWriteLock();
+            TopologyManager.acquireWriteLock();
             cluster.removeMember(member);
-            TopologyManager.getInstance().updateTopology(topology);
+            TopologyManager.updateTopology(topology);
         } finally {
-            TopologyManager.getInstance().releaseWriteLock();
+            TopologyManager.releaseWriteLock();
         }
         TopologyEventSender.sendMemberTerminatedEvent(serviceName, clusterId, 
networkPartitionId, partitionId, memberId);
     }
@@ -297,9 +297,9 @@ public class TopologyBuilder {
     public static void handleMemberSuspended() {
         //TODO
         try {
-            TopologyManager.getInstance().acquireWriteLock();
+            TopologyManager.acquireWriteLock();
         } finally {
-            TopologyManager.getInstance().releaseWriteLock();
+            TopologyManager.releaseWriteLock();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/8c72f145/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyManager.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyManager.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyManager.java
index cdc63d1..9862b9a 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyManager.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyManager.java
@@ -18,17 +18,12 @@
  */
 package org.apache.stratos.cloud.controller.topology;
 
+import com.google.gson.Gson;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.cloud.controller.util.CloudControllerUtil;
 import org.apache.stratos.messaging.domain.topology.Topology;
 
-import com.google.gson.Gson;
-
-import javax.jms.TextMessage;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
@@ -37,86 +32,86 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 public class TopologyManager {
     private static final Log log = LogFactory.getLog(TopologyManager.class);
 
-    private  volatile ReentrantReadWriteLock lock = new 
ReentrantReadWriteLock(true);
-    private volatile ReentrantReadWriteLock.ReadLock readLock = 
lock.readLock();
-    private volatile ReentrantReadWriteLock.WriteLock writeLock = 
lock.writeLock();
-    private volatile Topology topology;
-    private static TopologyManager instance;
-    private BlockingQueue<TextMessage> instanceStatusMessageQueue = new 
LinkedBlockingQueue<TextMessage>();
-
+    private static volatile ReentrantReadWriteLock lock = new 
ReentrantReadWriteLock(true);
+    private static volatile ReentrantReadWriteLock.ReadLock readLock = 
lock.readLock();
+    private static volatile ReentrantReadWriteLock.WriteLock writeLock = 
lock.writeLock();
+    private static volatile Topology topology;
 
     private TopologyManager() {
     }
 
-    public static TopologyManager getInstance() {
-        synchronized (TopologyManager.class) {
-            if (instance == null) {
-                instance = new TopologyManager();
-            }
-            return instance;
-            
+    public static void acquireReadLock() {
+        if(log.isDebugEnabled()) {
+            log.debug("Read lock acquired");
         }
-    }
-
-    public void acquireReadLock() {
         readLock.lock();
     }
 
-    public void releaseReadLock() {
+    public static void releaseReadLock() {
+        if(log.isDebugEnabled()) {
+            log.debug("Read lock released");
+        }
         readLock.unlock();
     }
 
-    public void acquireWriteLock() {
+    public static void acquireWriteLock() {
+        if(log.isDebugEnabled()) {
+            log.debug("Write lock acquired");
+        }
         writeLock.lock();
     }
 
-    public void releaseWriteLock() {
+    public static void releaseWriteLock() {
+        if(log.isDebugEnabled()) {
+            log.debug("Write lock released");
+        }
         writeLock.unlock();
     }
 
-    public synchronized Topology getTopology() {
-        synchronized (TopologyManager.class) {
-            if(this.topology == null) {
-                //need to initialize the topology
-                this.topology = CloudControllerUtil.retrieveTopology();
-                if (this.topology == null) {
-                    if(log.isDebugEnabled()) {
-                        log.debug("Creating new topology");
+    public static Topology getTopology() {
+        if (topology == null) {
+            synchronized (TopologyManager.class) {
+                if (topology == null) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Trying to retrieve topology from registry");
+                    }
+                    topology = CloudControllerUtil.retrieveTopology();
+                    if (topology == null) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("Topology not found in registry, 
creating new");
+                        }
+                        topology = new Topology();
+                    }
+                    if (log.isDebugEnabled()) {
+                        log.debug("Topology initialized");
                     }
-                    this.topology = new Topology();
                 }
             }
         }
-        return this.topology;
+        return topology;
     }
 
-    public synchronized void updateTopology(Topology topology) {
+    /**
+     * Update in-memory topology and persist it in registry.
+     * @param topology_ topology to hold in memory and persist to registry
+     */
+    public static void updateTopology(Topology topology_) {
         synchronized (TopologyManager.class) {
-             this.topology = topology;
-             CloudControllerUtil.persistTopology(this.topology);
-             if (log.isDebugEnabled()) {
-                 log.debug(String.format("Topology updated: %s", toJson()));
-             }
+            if (log.isDebugEnabled()) {
+                log.debug("Updating topology");
+            }
+            topology = topology_;
+            CloudControllerUtil.persistTopology(topology);
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Topology updated: %s", 
toJson(topology)));
+            }
         }
 
     }
 
-    public String toJson() {
+    private static String toJson(Object object) {
         Gson gson = new Gson();
-        return gson.toJson(topology);
-
-    }
-
-    public void setTopology(Topology topology) {
-        this.topology = topology;
-    }
-
-    public BlockingQueue<TextMessage> getInstanceStatusMessageQueue() {
-        return instanceStatusMessageQueue;
-    }
-
-    public void setInstanceStatusMessageQueue(BlockingQueue<TextMessage> 
instanceStatusMessageQueue) {
-        this.instanceStatusMessageQueue = instanceStatusMessageQueue;
+        return gson.toJson(object);
     }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/8c72f145/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologySynchronizerTask.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologySynchronizerTask.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologySynchronizerTask.java
new file mode 100644
index 0000000..6c5a1c7
--- /dev/null
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologySynchronizerTask.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
+ * KIND, either express or implied.  See the License for the 
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.cloud.controller.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cloud.controller.runtime.FasterLookUpDataHolder;
+import org.wso2.carbon.ntask.core.Task;
+
+import java.util.Map;
+
+public class TopologySynchronizerTask implements Task{
+    private static final Log log = 
LogFactory.getLog(TopologySynchronizerTask.class);
+
+    @Override
+    public void execute() {
+       if(FasterLookUpDataHolder.getInstance().isTopologySyncRunning()||
+                       // this is a temporary fix to avoid task execution - 
limitation with ntask
+                       
!FasterLookUpDataHolder.getInstance().getEnableTopologySync()){
+            return;
+        }
+       
+        if (log.isDebugEnabled()) {
+            log.debug("TopologySynchronizerTask ...");
+        }
+        
+       // publish to the topic 
+        if (TopologyManager.getTopology() != null) {
+            
TopologyEventSender.sendCompleteTopologyEvent(TopologyManager.getTopology());
+        }
+    }
+    
+    @Override
+    public void init() {
+
+       // this is a temporary fix to avoid task execution - limitation with 
ntask
+               
if(!FasterLookUpDataHolder.getInstance().getEnableTopologySync()){
+                       log.debug("Topology Sync is disabled.");
+                       return;
+               }
+    }
+
+    @Override
+    public void setProperties(Map<String, String> arg0) {}
+    
+}

Reply via email to