Cloud controller clustering support initial implementation

Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/403d5a45
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/403d5a45
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/403d5a45

Branch: refs/heads/master
Commit: 403d5a45dab5ba2bacab51ecf417313972c39c1b
Parents: 830785f
Author: Imesh Gunaratne <[email protected]>
Authored: Sun Nov 30 23:25:16 2014 +0530
Committer: Imesh Gunaratne <[email protected]>
Committed: Sun Nov 30 23:25:16 2014 +0530

----------------------------------------------------------------------
 .../clustering/DistributedObjectHandler.java    | 140 ++++
 .../context/CloudControllerContext.java         | 638 ++++++++++---------
 .../deployers/CloudControllerDeployer.java      |   8 +-
 .../CloudControllerServiceComponent.java        |  20 +-
 .../messaging/topology/TopologyBuilder.java     |   9 +-
 .../controller/registry/RegistryManager.java    | 127 ++--
 .../cloud/controller/registry/Serializer.java   |   6 +-
 .../impl/CloudControllerServiceImpl.java        | 183 ++----
 .../controller/util/CloudControllerUtil.java    |   6 +-
 .../controller/util/PodActivationWatcher.java   |   7 +-
 .../axiom/CloudControllerContextTest.java       |  20 +-
 11 files changed, 604 insertions(+), 560 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/403d5a45/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/clustering/DistributedObjectHandler.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/clustering/DistributedObjectHandler.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/clustering/DistributedObjectHandler.java
new file mode 100644
index 0000000..2b4437e
--- /dev/null
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/clustering/DistributedObjectHandler.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cloud.controller.clustering;
+
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IList;
+import com.hazelcast.core.ILock;
+import com.hazelcast.core.IMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * An object handler for managing objects in distributed and non-distributed 
environments.
+ */
+public class DistributedObjectHandler {
+    private static final Log log = 
LogFactory.getLog(DistributedObjectHandler.class);
+
+    private final boolean clustered;
+    private final HazelcastInstance hazelcastInstance;
+
+    public DistributedObjectHandler(boolean clustered, HazelcastInstance 
hazelcastInstance) {
+        this.clustered = clustered;
+        this.hazelcastInstance = hazelcastInstance;
+    }
+
+    private com.hazelcast.core.ILock acquireDistributedLock(Object object) {
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Acquiring distributed lock for %s...", 
object.getClass().getSimpleName()));
+        }
+        ILock lock = hazelcastInstance.getLock(object);
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Distributed lock acquired for %s", 
object.getClass().getSimpleName()));
+        }
+        return lock;
+    }
+
+    private void releaseDistributedLock(ILock lock) {
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Releasing distributed lock for %s...", 
lock.getKey()));
+        }
+        lock.forceUnlock();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Distributed lock released for %s", 
lock.getKey()));
+        }
+    }
+
+    public Map getMap(String key) {
+        if(clustered) {
+            return hazelcastInstance.getMap(key);
+        } else {
+            return new ConcurrentHashMap<Object, Object>();
+        }
+    }
+
+    public List getList(String name) {
+        if(clustered) {
+            return hazelcastInstance.getList(name);
+        } else {
+            return new ArrayList();
+        }
+    }
+
+    public void putToMap(Map map, Object key, Object value) {
+         if(clustered) {
+             ILock lock = null;
+             try {
+                 lock = acquireDistributedLock(map);
+                 ((IMap)map).set(key, value);
+             } finally {
+                 releaseDistributedLock(lock);
+             }
+         } else {
+            map.put(key, value);
+         }
+    }
+
+    public void removeFromMap(Map map, Object key) {
+        if(clustered) {
+            ILock lock = null;
+            try {
+                lock = acquireDistributedLock(map);
+                ((IMap)map).delete(key);
+            } finally {
+                releaseDistributedLock(lock);
+            }
+        } else {
+            map.remove(key);
+        }
+    }
+
+    public void addToList(List list, Object value) {
+        if(clustered) {
+            ILock lock = null;
+            try {
+                lock = acquireDistributedLock(list);
+                ((IList)list).add(value);
+            } finally {
+                releaseDistributedLock(lock);
+            }
+        } else {
+            list.add(value);
+        }
+    }
+
+    public void removeFromList(List list, Object value) {
+        if(clustered) {
+            ILock lock = null;
+            try {
+                lock = acquireDistributedLock(list);
+                ((IList)list).remove(value);
+            } finally {
+                releaseDistributedLock(lock);
+            }
+        } else {
+            list.remove(value);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/403d5a45/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
index 9a1cdc2..b589ecc 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
@@ -18,11 +18,18 @@
  */
 package org.apache.stratos.cloud.controller.context;
 
+import org.apache.axis2.clustering.ClusteringAgent;
+import org.apache.axis2.engine.AxisConfiguration;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cloud.controller.clustering.DistributedObjectHandler;
 import org.apache.stratos.cloud.controller.domain.*;
+import org.apache.stratos.cloud.controller.internal.ServiceReferenceHolder;
+import org.apache.stratos.cloud.controller.registry.Deserializer;
 import org.apache.stratos.cloud.controller.registry.RegistryManager;
+import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
 import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
+import org.wso2.carbon.registry.core.exceptions.RegistryException;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -37,384 +44,389 @@ import java.util.concurrent.ScheduledFuture;
 /**
  * This object holds all runtime data and provides faster access. This is a 
Singleton class.
  */
-public class CloudControllerContext implements Serializable{
+public class CloudControllerContext implements Serializable {
 
     private static final long serialVersionUID = -2662307358852779897L;
     private static final Log log = 
LogFactory.getLog(CloudControllerContext.class);
 
+    public static final String CC_CLUSTER_ID_TO_MEMBER_CTX = 
"CC_CLUSTER_ID_TO_MEMBER_CTX";
+    public static final String CC_MEMBER_ID_TO_MEMBER_CTX = 
"CC_MEMBER_ID_TO_MEMBER_CTX";
+    public static final String CC_MEMBER_ID_TO_SCH_TASK = 
"CC_MEMBER_ID_TO_SCH_TASK";
+    public static final String CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX = 
"CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX";
+    public static final String CC_CLUSTER_ID_TO_CLUSTER_CTX = 
"CC_CLUSTER_ID_TO_CLUSTER_CTX";
+    public static final String CC_CARTRIDGE_TYPE_TO_PARTITION_IDS = 
"CC_CARTRIDGE_TYPE_TO_PARTITION_IDS";
+    public static final String CC_CARTRIDGES = "CC_CARTRIDGES";
+    public static final String CC_SERVICE_GROUPS = "CC_SERVICE_GROUPS";
+
     private static volatile CloudControllerContext instance;
 
+    private final DistributedObjectHandler distributedObjectHandler;
+
        /* We keep following maps in order to make the look up time, small. */
-       
-       /**
+
+    /**
      * Key - cluster id
      * Value - list of {@link MemberContext}
      */
-    private Map<String, List<MemberContext>> clusterIdToMemberContext;
+    private Map<String, List<MemberContext>> clusterIdToMemberContextListMap;
+
+    /**
+     * Key - member id
+     * Value - {@link MemberContext}
+     */
+    private Map<String, MemberContext> memberIdToMemberContextMap;
 
     /**
-        * Key - member id
-        * Value - {@link MemberContext}
-        */
-       private Map<String, MemberContext> memberIdToContext;
-       
-       /**
      * Key - member id
      * Value - ScheduledFuture task
      */
-    private transient Map<String, ScheduledFuture<?>> memberIdToScheduledTask;
-       
-       /**
-        * Key - Kubernetes cluster id
-        * Value - {@link 
org.apache.stratos.cloud.controller.domain.KubernetesClusterContext}
-        */
-       private Map<String, KubernetesClusterContext> 
kubClusterIdToKubClusterContext;
-       
-       /**
-        * Key - cluster id
-        * Value - {@link 
org.apache.stratos.cloud.controller.domain.ClusterContext}
-        */
-       private Map<String, ClusterContext> clusterIdToContext;
-       
-       /**
-        * This works as a cache to hold already validated partitions against a 
cartridge type.
-        * Key - cartridge type
-        * Value - list of partition ids
-        */
-       private Map<String, List<String>> cartridgeTypeToPartitionIds = new 
ConcurrentHashMap<String, List<String>>();
-       
-       /**
+    private transient Map<String, ScheduledFuture<?>> 
memberIdToScheduledTaskMap;
+
+    /**
+     * Key - Kubernetes cluster id
+     * Value - {@link 
org.apache.stratos.cloud.controller.domain.KubernetesClusterContext}
+     */
+    private Map<String, KubernetesClusterContext> 
kubClusterIdToKubClusterContextMap;
+
+    /**
+     * Key - cluster id
+     * Value - {@link 
org.apache.stratos.cloud.controller.domain.ClusterContext}
+     */
+    private Map<String, ClusterContext> clusterIdToContextMap;
+
+    /**
+     * This works as a cache to hold already validated partitions against a 
cartridge type.
+     * Key - cartridge type
+     * Value - list of partition ids
+     */
+    private Map<String, List<String>> cartridgeTypeToPartitionIdsMap = new 
ConcurrentHashMap<String, List<String>>();
+
+    /**
      * Thread pool used in this task to execute parallel tasks.
      */
-    private transient ExecutorService executor = 
Executors.newFixedThreadPool(20);
-       
-       /**
-        * List of registered {@link 
org.apache.stratos.cloud.controller.domain.Cartridge}s
-        */
-       private List<Cartridge> cartridges;
-       
-       /**
-        * List of deployed service groups
-        */
-       private List<ServiceGroup> serviceGroups;
-
-       private String serializationDir;
-       private String streamId;
-       private boolean isPublisherRunning;
-       private boolean isTopologySyncRunning;
+    private transient ExecutorService executorService = 
Executors.newFixedThreadPool(20);
+
+    /**
+     * List of registered {@link 
org.apache.stratos.cloud.controller.domain.Cartridge}s
+     */
+    private List<Cartridge> cartridges;
+
+    /**
+     * List of deployed service groups
+     */
+    private List<ServiceGroup> serviceGroups;
+
+    private String streamId;
+    private boolean isPublisherRunning;
+    private boolean isTopologySyncRunning;
     private boolean clustered;
 
     private transient AsyncDataPublisher dataPublisher;
 
     private CloudControllerContext() {
-        // Initialize cloud controller context
-        clusterIdToMemberContext = new ConcurrentHashMap<String, 
List<MemberContext>>();
-        memberIdToContext = new ConcurrentHashMap<String, MemberContext>();
-        memberIdToScheduledTask = new ConcurrentHashMap<String, 
ScheduledFuture<?>>();
-        kubClusterIdToKubClusterContext = new ConcurrentHashMap<String, 
KubernetesClusterContext>();
-        clusterIdToContext = new ConcurrentHashMap<String, ClusterContext>();
-        cartridgeTypeToPartitionIds = new ConcurrentHashMap<String, 
List<String>>();
-        cartridges = new ArrayList<Cartridge>();
-        serviceGroups = new ArrayList<ServiceGroup>();
-
-        if (log.isInfoEnabled()) {
-            log.info("Cloud controller context initialized locally");
+        // Check clustering status
+        AxisConfiguration axisConfiguration = 
ServiceReferenceHolder.getInstance().getAxisConfiguration();
+        if ((axisConfiguration != null) && 
(axisConfiguration.getClusteringAgent() != null)) {
+            clustered = true;
         }
+
+        // Initialize distributed object handler
+        distributedObjectHandler = new DistributedObjectHandler(isClustered(),
+                ServiceReferenceHolder.getInstance().getHazelcastInstance());
+
+        // Initialize objects
+        clusterIdToMemberContextListMap = 
distributedObjectHandler.getMap(CC_CLUSTER_ID_TO_MEMBER_CTX);
+        memberIdToMemberContextMap = 
distributedObjectHandler.getMap(CC_MEMBER_ID_TO_MEMBER_CTX);
+        memberIdToScheduledTaskMap = 
distributedObjectHandler.getMap(CC_MEMBER_ID_TO_SCH_TASK);
+        kubClusterIdToKubClusterContextMap = 
distributedObjectHandler.getMap(CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX);
+        clusterIdToContextMap = 
distributedObjectHandler.getMap(CC_CLUSTER_ID_TO_CLUSTER_CTX);
+        cartridgeTypeToPartitionIdsMap = 
distributedObjectHandler.getMap(CC_CARTRIDGE_TYPE_TO_PARTITION_IDS);
+        cartridges = distributedObjectHandler.getList(CC_CARTRIDGES);
+        serviceGroups = distributedObjectHandler.getList(CC_SERVICE_GROUPS);
+
+        // Update context from the registry
+        updateContextFromRegistry();
     }
 
-       public static CloudControllerContext getInstance() {
-               if (instance == null) {
-                       synchronized (CloudControllerContext.class) {
-                               if (instance == null && 
RegistryManager.getInstance() != null) {
-                                       Object obj = 
RegistryManager.getInstance().retrieve();
-                                       if (obj != null) {
-                                               if (obj instanceof 
CloudControllerContext) {
-                                                       instance = 
(CloudControllerContext) obj;
-                                               }
-                                       }
-                               }
-                               if(instance == null) {
-                                       instance = new CloudControllerContext();
-                               }
-                       }
-               }
-               return instance;
-       }
-
-       public List<Cartridge> getCartridges() {
-               return cartridges;
-       }
-       
-       public void setCartridges(List<Cartridge> cartridges) {
-           this.cartridges = cartridges;
-       }
-       
-       public void setServiceGroups(List<ServiceGroup> serviceGroups) {
-               this.serviceGroups = serviceGroups;
-       }
-       
-       public List<ServiceGroup> getServiceGroups() {
-               return this.serviceGroups;
-       }
-
-
-       public Cartridge getCartridge(String cartridgeType) {
-               for (Cartridge cartridge : cartridges) {
-                       if (cartridge.getType().equals(cartridgeType)) {
-                               return cartridge;
-                       }
-               }
-               return null;
-       }
-       
-       public void addCartridge(Cartridge newCartridges) {
-               cartridges.add(newCartridges);
-       }
-
-       public void removeCartridges(List<Cartridge> cartridges) {
-               if (this.cartridges != null) {
-                       this.cartridges.removeAll(cartridges);
-               }
-
-       }
-       
-       public ServiceGroup getServiceGroup(String name) {
-               for (ServiceGroup serviceGroup : serviceGroups) {
-                       if (serviceGroup.getName().equals(name)) {
-                               return serviceGroup;
-                       }
-               }
-
-               return null;
-       }
-       
-       public void addServiceGroup(ServiceGroup newServiceGroup) {
-               this.serviceGroups.add(newServiceGroup);
-       }
-       
-       public void removeServiceGroup(List<ServiceGroup> serviceGroup) {
-               if (this.serviceGroups != null) {
-                       this.serviceGroups.removeAll(serviceGroup);
-               }
-       }
-
-       public String getSerializationDir() {
-               return serializationDir;
-       }
-
-       public void setSerializationDir(String serializationDir) {
-               this.serializationDir = serializationDir;
-       }
-
-       public AsyncDataPublisher getDataPublisher() {
-               return dataPublisher;
-       }
-
-       public void setDataPublisher(AsyncDataPublisher dataPublisher) {
-               this.dataPublisher = dataPublisher;
-       }
-
-       public String getStreamId() {
-               return streamId;
-       }
-
-       public void setStreamId(String streamId) {
-               this.streamId = streamId;
-       }
-
-       public boolean isPublisherRunning() {
-               return isPublisherRunning;
-       }
-
-       public void setPublisherRunning(boolean isPublisherRunning) {
-               this.isPublisherRunning = isPublisherRunning;
-       }
-
-       public boolean isTopologySyncRunning() {
-           return isTopologySyncRunning;
-    }
-
-       public void setTopologySyncRunning(boolean isTopologySyncRunning) {
-           this.isTopologySyncRunning = isTopologySyncRunning;
-    }
-
-    public void addMemberContext(MemberContext ctxt) {
-        memberIdToContext.put(ctxt.getMemberId(), ctxt);
-        
-        List<MemberContext> ctxts;
-        
-        if((ctxts = clusterIdToMemberContext.get(ctxt.getClusterId())) == 
null) {
-            ctxts = new ArrayList<MemberContext>();
-        } 
-        if(ctxts.contains(ctxt)) {
-               ctxts.remove(ctxt);
+    public static CloudControllerContext getInstance() {
+        if (instance == null) {
+            synchronized (CloudControllerContext.class) {
+                if (instance == null) {
+                    instance = new CloudControllerContext();
+                }
+            }
+        }
+        return instance;
+    }
+
+    public List<Cartridge> getCartridges() {
+        return cartridges;
+    }
+
+    public void setCartridges(List<Cartridge> cartridges) {
+        this.cartridges = cartridges;
+    }
+
+    public void setServiceGroups(List<ServiceGroup> serviceGroups) {
+        this.serviceGroups = serviceGroups;
+    }
+
+    public List<ServiceGroup> getServiceGroups() {
+        return this.serviceGroups;
+    }
+
+    public Cartridge getCartridge(String cartridgeType) {
+        for (Cartridge cartridge : cartridges) {
+            if (cartridge.getType().equals(cartridgeType)) {
+                return cartridge;
+            }
+        }
+        return null;
+    }
+
+    public void addCartridge(Cartridge newCartridges) {
+        distributedObjectHandler.addToList(cartridges, newCartridges);
+    }
+
+    public ServiceGroup getServiceGroup(String name) {
+        for (ServiceGroup serviceGroup : serviceGroups) {
+            if (serviceGroup.getName().equals(name)) {
+                return serviceGroup;
+            }
         }
-        ctxts.add(ctxt);
-        clusterIdToMemberContext.put(ctxt.getClusterId(), ctxts);
-        if(log.isDebugEnabled()) {
-               
-               log.debug("Added Member Context to the information model. 
"+ctxt);
+        return null;
+    }
+
+    public void addServiceGroup(ServiceGroup newServiceGroup) {
+        distributedObjectHandler.addToList(serviceGroups, newServiceGroup);
+    }
+
+    public void removeServiceGroup(List<ServiceGroup> serviceGroup) {
+        if (this.serviceGroups != null) {
+            this.serviceGroups.removeAll(serviceGroup);
         }
     }
-    
+
+    public AsyncDataPublisher getDataPublisher() {
+        return dataPublisher;
+    }
+
+    public void setDataPublisher(AsyncDataPublisher dataPublisher) {
+        this.dataPublisher = dataPublisher;
+    }
+
+    public String getStreamId() {
+        return streamId;
+    }
+
+    public void setStreamId(String streamId) {
+        this.streamId = streamId;
+    }
+
+    public boolean isPublisherRunning() {
+        return isPublisherRunning;
+    }
+
+    public void setPublisherRunning(boolean isPublisherRunning) {
+        this.isPublisherRunning = isPublisherRunning;
+    }
+
+    public boolean isTopologySyncRunning() {
+        return isTopologySyncRunning;
+    }
+
+    public void setTopologySyncRunning(boolean isTopologySyncRunning) {
+        this.isTopologySyncRunning = isTopologySyncRunning;
+    }
+
+    public void addMemberContext(MemberContext memberContext) {
+        distributedObjectHandler.putToMap(memberIdToMemberContextMap, 
memberContext.getMemberId(), memberContext);
+
+        List<MemberContext> memberContextList;
+        if ((memberContextList = 
clusterIdToMemberContextListMap.get(memberContext.getClusterId())) == null) {
+            memberContextList = new ArrayList<MemberContext>();
+        }
+        if (memberContextList.contains(memberContext)) {
+            
distributedObjectHandler.removeFromList(memberContextList,memberContext);
+        }
+        distributedObjectHandler.addToList(memberContextList, memberContext);
+        distributedObjectHandler.putToMap(clusterIdToMemberContextListMap, 
memberContext.getClusterId(),
+                memberContextList);
+        if (log.isDebugEnabled()) {
+            log.debug("Added member context to the cloud controller context: " 
+ memberContext);
+        }
+    }
+
     public void addScheduledFutureJob(String memberId, ScheduledFuture<?> job) 
{
-        memberIdToScheduledTask.put(memberId, job);
+        distributedObjectHandler.putToMap(memberIdToScheduledTaskMap, 
memberId, job);
     }
-    
+
     public List<MemberContext> removeMemberContextsOfCluster(String clusterId) 
{
-        List<MemberContext> ctxts = clusterIdToMemberContext.remove(clusterId);
-        if(ctxts == null) {
+        List<MemberContext> memberContextList = 
clusterIdToMemberContextListMap.get(clusterId);
+        
distributedObjectHandler.removeFromMap(clusterIdToMemberContextListMap, 
clusterId);
+        if (memberContextList == null) {
             return new ArrayList<MemberContext>();
         }
-        for (MemberContext memberContext : ctxts) {
+        for (MemberContext memberContext : memberContextList) {
             String memberId = memberContext.getMemberId();
-            memberIdToContext.remove(memberId);
-            stopTask(memberIdToScheduledTask.remove(memberId));
-        }
-        if(log.isDebugEnabled()) {
-               
-               log.debug("Removed Member Context from the information model. 
"+ instance);
+            distributedObjectHandler.removeFromMap(memberIdToMemberContextMap, 
memberId);
+            ScheduledFuture<?> task = memberIdToScheduledTaskMap.get(memberId);
+            distributedObjectHandler.removeFromMap(memberIdToScheduledTaskMap, 
memberId);
+            stopTask(task);
+
+            if (log.isDebugEnabled()) {
+                log.debug("Removed member context from cloud controller 
context: " +
+                        "[member-id] " + memberId);
+            }
         }
-        return ctxts;
+        return memberContextList;
     }
-    
+
     public MemberContext removeMemberContext(String memberId, String 
clusterId) {
-       MemberContext returnedCtxt = memberIdToContext.remove(memberId);
-        List<MemberContext> ctxts = clusterIdToMemberContext.get(clusterId);
-
-        if (ctxts != null) {
-            
-            List<MemberContext> newCtxts =  new 
ArrayList<MemberContext>(ctxts);
-            
-            for (Iterator<MemberContext> iterator = newCtxts.iterator(); 
iterator.hasNext();) {
-                MemberContext memberContext = (MemberContext) iterator.next();
-                if(memberId.equals(memberContext.getMemberId())) {
-                    if(log.isDebugEnabled()) {
-                        
-                        log.debug("MemberContext [id]: "+memberId+" removed 
from information model.");
+        MemberContext removedMemberContext = 
memberIdToMemberContextMap.get(memberId);
+        distributedObjectHandler.removeFromMap(memberIdToMemberContextMap, 
memberId);
+
+        List<MemberContext> memberContextList = 
clusterIdToMemberContextListMap.get(clusterId);
+        if (memberContextList != null) {
+            List<MemberContext> newCtxts = new 
ArrayList<MemberContext>(memberContextList);
+            for (Iterator<MemberContext> iterator = newCtxts.iterator(); 
iterator.hasNext(); ) {
+                MemberContext memberContext = iterator.next();
+                if (memberId.equals(memberContext.getMemberId())) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Member context removed from cloud 
controller context: [member-id] " + memberId);
                     }
                     iterator.remove();
                 }
             }
-            
-            clusterIdToMemberContext.put(clusterId, newCtxts);
+            distributedObjectHandler.putToMap(clusterIdToMemberContextListMap, 
clusterId, newCtxts);
         }
-        
-        stopTask(memberIdToScheduledTask.remove(memberId));
-        
-        return returnedCtxt;
+        ScheduledFuture<?> task = memberIdToScheduledTaskMap.get(memberId);
+        distributedObjectHandler.removeFromMap(memberIdToScheduledTaskMap, 
memberId);
+        stopTask(task);
+        return removedMemberContext;
     }
-    
+
     private void stopTask(ScheduledFuture<?> task) {
         if (task != null) {
-            
             task.cancel(true);
-            log.info("Scheduled Pod Activation Watcher task canceled.");
+            log.info("Scheduled pod activation watcher task canceled");
         }
     }
-    
+
     public MemberContext getMemberContextOfMemberId(String memberId) {
-        return memberIdToContext.get(memberId);
+        return memberIdToMemberContextMap.get(memberId);
     }
-    
+
     public List<MemberContext> getMemberContextsOfClusterId(String clusterId) {
-        return clusterIdToMemberContext.get(clusterId);
+        return clusterIdToMemberContextListMap.get(clusterId);
+    }
+
+    public void addClusterContext(ClusterContext ctxt) {
+        distributedObjectHandler.putToMap(clusterIdToContextMap, 
ctxt.getClusterId(), ctxt);
     }
 
-    public Map<String, List<MemberContext>> getClusterIdToMemberContext() {
-        return clusterIdToMemberContext;
+    public ClusterContext getClusterContext(String clusterId) {
+        return clusterIdToContextMap.get(clusterId);
     }
-    
-    public void setClusterIdToMemberContext(Map<String, List<MemberContext>> 
clusterIdToMemberContext) {
-        this.clusterIdToMemberContext = clusterIdToMemberContext;
+
+    public ClusterContext removeClusterContext(String clusterId) {
+        ClusterContext removed = clusterIdToContextMap.get(clusterId);
+        distributedObjectHandler.removeFromMap(clusterIdToContextMap, 
clusterId);
+        return removed;
     }
 
-    public Map<String, MemberContext> getMemberIdToContext() {
-        return memberIdToContext;
+    public ExecutorService getExecutorService() {
+        return executorService;
     }
 
-    public void setMemberIdToContext(Map<String, MemberContext> 
memberIdToContext) {
-        this.memberIdToContext = memberIdToContext;
+    public List<String> getPartitionIds(String cartridgeType) {
+        return cartridgeTypeToPartitionIdsMap.get(cartridgeType);
     }
 
-    public void addClusterContext(ClusterContext ctxt) {
-        clusterIdToContext.put(ctxt.getClusterId(), ctxt);
+    public void addToCartridgeTypeToPartitionIdMap(String cartridgeType, 
String partitionId) {
+        List<String> list = 
this.cartridgeTypeToPartitionIdsMap.get(cartridgeType);
+        if (list == null) {
+            list = new ArrayList<String>();
+        }
+        list.add(partitionId);
+        distributedObjectHandler.putToMap(cartridgeTypeToPartitionIdsMap, 
cartridgeType, list);
     }
-    
-    public ClusterContext getClusterContext(String clusterId) {
-        return clusterIdToContext.get(clusterId);
+
+    public void removeFromCartridgeTypeToPartitionIds(String cartridgeType) {
+        distributedObjectHandler.removeFromMap(cartridgeTypeToPartitionIdsMap, 
cartridgeType);
     }
-    
-    public ClusterContext removeClusterContext(String clusterId) {
-        return clusterIdToContext.remove(clusterId);
-    }
-    
-    public Map<String, ClusterContext> getClusterIdToContext() {
-        return clusterIdToContext;
-    }
-
-    public void setClusterIdToContext(Map<String, ClusterContext> 
clusterIdToContext) {
-        this.clusterIdToContext = clusterIdToContext;
-    }
-
-       public ExecutorService getExecutor() {
-               return executor;
-       }
-
-       public void setExecutor(ExecutorService executor) {
-               this.executor = executor;
-       }
-
-       public Map<String, List<String>> getCartridgeTypeToPartitionIds() {
-               return cartridgeTypeToPartitionIds;
-       }
-
-       public void setCartridgeTypeToPartitionIds(
-                       Map<String, List<String>> cartridgeTypeToPartitionIds) {
-               this.cartridgeTypeToPartitionIds = cartridgeTypeToPartitionIds;
-       }
-       
-       public void addToCartridgeTypeToPartitionIdMap(String cartridgeType, 
String partitionId) {
-               List<String> list = 
this.cartridgeTypeToPartitionIds.get(cartridgeType);
-               
-               if(list == null) {
-                       list = new ArrayList<String>();
-               }
-               
-               list.add(partitionId);
-               this.cartridgeTypeToPartitionIds.put(cartridgeType, list);
-       }
-       
-       public void removeFromCartridgeTypeToPartitionIds(String cartridgeType) 
{
-               this.cartridgeTypeToPartitionIds.remove(cartridgeType);
-       }
-
-       public Map<String, KubernetesClusterContext> 
getKubClusterIdToKubClusterContext() {
-               return kubClusterIdToKubClusterContext;
-       }
-       
-       public KubernetesClusterContext getKubernetesClusterContext(String 
kubClusterId) {
-               return kubClusterIdToKubClusterContext.get(kubClusterId);
-       }
-       
-       public void addKubernetesClusterContext(KubernetesClusterContext ctxt) {
-               
this.kubClusterIdToKubClusterContext.put(ctxt.getKubernetesClusterId(), ctxt);
-       }
-
-       public void setKubClusterIdToKubClusterContext(
-                       Map<String, KubernetesClusterContext> 
kubClusterIdToKubClusterContext) {
-               this.kubClusterIdToKubClusterContext = 
kubClusterIdToKubClusterContext;
-       }
-
-    public Map<String, ScheduledFuture<?>> getMemberIdToScheduledTask() {
-        return memberIdToScheduledTask;
-    }
-
-    public void setMemberIdToScheduledTask(Map<String, ScheduledFuture<?>> 
memberIdToScheduledTask) {
-        this.memberIdToScheduledTask = memberIdToScheduledTask;
+
+    public KubernetesClusterContext getKubernetesClusterContext(String 
kubClusterId) {
+        return kubClusterIdToKubClusterContextMap.get(kubClusterId);
+    }
+
+    public void addKubernetesClusterContext(KubernetesClusterContext 
kubernetesClusterContext) {
+        distributedObjectHandler.putToMap(kubClusterIdToKubClusterContextMap,
+                kubernetesClusterContext.getKubernetesClusterId(),
+                kubernetesClusterContext);
     }
 
     public boolean isClustered() {
         return clustered;
     }
+
+    /**
+     * Tells whether this node is currently the coordinator of the cluster.
+     *
+     * @return true only when an Axis configuration is available, it exposes a
+     *         clustering agent, and that agent reports itself as coordinator;
+     *         false in every other case
+     */
+    public boolean isCoordinator() {
+        AxisConfiguration axisConfiguration = ServiceReferenceHolder.getInstance().getAxisConfiguration();
+        if (axisConfiguration == null) {
+            // Must be checked before asking for the clustering agent; otherwise a
+            // missing Axis configuration ends in a NullPointerException.
+            return false;
+        }
+        ClusteringAgent clusteringAgent = axisConfiguration.getClusteringAgent();
+        return (clusteringAgent != null) && clusteringAgent.isCoordinator();
+    }
+
+    /**
+     * Persists this context in the registry under
+     * CloudControllerConstants.DATA_RESOURCE.
+     * <p>
+     * Nothing is written when the node is clustered and is not the coordinator.
+     *
+     * @throws RegistryException declared for failures of the registry write
+     */
+    public void persist() throws RegistryException {
+        if ((!isClustered()) || (isCoordinator())) {
+            
RegistryManager.getInstance().persist(CloudControllerConstants.DATA_RESOURCE, 
this);
+        }
+    }
+
+    /**
+     * Restores this context from the copy stored in the registry under
+     * CloudControllerConstants.DATA_RESOURCE.
+     * <p>
+     * Runs only when the node is not clustered or is the coordinator. The stored
+     * bytes are deserialized and, when they hold a CloudControllerContext, the maps
+     * and lists below are copied from that object into this instance. Any failure
+     * is logged as a warning and otherwise ignored, so this context is left
+     * without the historical data.
+     */
+    private void updateContextFromRegistry() {
+        if ((!isClustered()) || (isCoordinator())) {
+            try {
+                Object obj = RegistryManager.getInstance().read(CloudControllerConstants.DATA_RESOURCE);
+                if (obj != null) {
+                    Object dataObj = Deserializer.deserializeFromByteArray((byte[]) obj);
+                    if (dataObj instanceof CloudControllerContext) {
+                        CloudControllerContext serializedObj = (CloudControllerContext) dataObj;
+
+                        // Source is the deserialized object, destination is this instance.
+                        // Passing them the other way round restores nothing.
+                        copyMap(serializedObj.clusterIdToMemberContextListMap, clusterIdToMemberContextListMap);
+                        copyMap(serializedObj.memberIdToMemberContextMap, memberIdToMemberContextMap);
+                        copyMap(serializedObj.memberIdToScheduledTaskMap, memberIdToScheduledTaskMap);
+                        copyMap(serializedObj.kubClusterIdToKubClusterContextMap, kubClusterIdToKubClusterContextMap);
+                        copyMap(serializedObj.clusterIdToContextMap, clusterIdToContextMap);
+                        copyMap(serializedObj.cartridgeTypeToPartitionIdsMap, cartridgeTypeToPartitionIdsMap);
+
+                        copyList(serializedObj.getCartridges(), cartridges);
+                        copyList(serializedObj.getServiceGroups(), serviceGroups);
+
+                        if (log.isDebugEnabled()) {
+                            log.debug("Cloud controller context is read from the registry");
+                        }
+                    } else {
+                        if (log.isDebugEnabled()) {
+                            log.debug("Cloud controller context could not be found in the registry");
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                String msg = "Unable to read cloud controller context from the registry. " +
+                        "Hence, any historical data will not be reflected";
+                log.warn(msg, e);
+            }
+        }
+    }
+
+    /**
+     * Puts every entry of sourceMap into destinationMap through the distributed
+     * object handler.
+     *
+     * @param sourceMap      map to read from; a null map is treated as empty
+     * @param destinationMap map that receives the entries
+     */
+    private void copyMap(Map sourceMap, Map destinationMap) {
+        if (sourceMap == null) {
+            // A deserialized context may lack a map; skip it rather than abort
+            // the whole restore with a NullPointerException.
+            return;
+        }
+        for (Object key : sourceMap.keySet()) {
+            distributedObjectHandler.putToMap(destinationMap, key, sourceMap.get(key));
+        }
+    }
+
+    /**
+     * Adds every item of sourceList to destinationList through the distributed
+     * object handler.
+     *
+     * @param sourceList      list to read from; a null list is treated as empty
+     * @param destinationList list that receives the items
+     */
+    private void copyList(List sourceList, List destinationList) {
+        if (sourceList == null) {
+            return;
+        }
+        for (Object item : sourceList) {
+            distributedObjectHandler.addToList(destinationList, item);
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/403d5a45/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/deployers/CloudControllerDeployer.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/deployers/CloudControllerDeployer.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/deployers/CloudControllerDeployer.java
index 6c299bb..8af29ad 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/deployers/CloudControllerDeployer.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/deployers/CloudControllerDeployer.java
@@ -85,18 +85,12 @@ public class CloudControllerDeployer extends 
AbstractDeployer {
     }
 
     public void undeploy(String file) throws DeploymentException {
-
         if (file.contains(FILE_NAME)) {
-            // reset
-            CloudControllerContext context = 
CloudControllerContext.getInstance();
-            context.setSerializationDir("");
-
             // grab the entry from Map
             if (fileToIaasProviderListMap.containsKey(file)) {
                 // remove 'em
                 
CloudControllerConfig.getInstance().getIaasProviders().removeAll(fileToIaasProviderListMap.get(file));
-
-                log.info("Successfully undeployed the cloud-controller XML 
file specified at " +
+                log.info("Successfully un-deployed the cloud-controller XML 
file specified at " +
                          file);
             }
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/403d5a45/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
index 9d8a7c5..7337813 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
@@ -23,7 +23,6 @@ package org.apache.stratos.cloud.controller.internal;
 
 import com.hazelcast.core.HazelcastInstance;
 
-import org.apache.axis2.clustering.ClusteringAgent;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.cloud.controller.context.CloudControllerContext;
@@ -74,7 +73,7 @@ public class CloudControllerServiceComponent {
             tApplicationTopicReceiver.start();
 
             if (log.isInfoEnabled()) {
-                log.info("Application Receiver thread started");
+                log.info("Application event receiver thread started");
             }
 
             clusterStatusTopicReceiver = new ClusterStatusTopicReceiver();
@@ -82,7 +81,7 @@ public class CloudControllerServiceComponent {
             tClusterStatusTopicReceiver.start();
 
             if (log.isInfoEnabled()) {
-                log.info("Cluster status Receiver thread started");
+                log.info("Cluster status receiver thread started");
             }
 
             instanceStatusTopicReceiver = new InstanceStatusTopicReceiver();
@@ -100,13 +99,16 @@ public class CloudControllerServiceComponent {
             if(log.isInfoEnabled()) {
                 log.info("Scheduling tasks");
             }
-            
-                       TopologySynchronizerTaskScheduler
-                                               
.schedule(ServiceReferenceHolder.getInstance()
-                                                               
.getTaskService());
-                       
+
+            if ((!CloudControllerContext.getInstance().isClustered()) ||
+                    (CloudControllerContext.getInstance().isCoordinator())) {
+                
TopologySynchronizerTaskScheduler.schedule(ServiceReferenceHolder.getInstance().getTaskService());
+                if(log.isInfoEnabled()) {
+                    log.info("Topology synchronizer task scheduled");
+                }
+            }
         } catch (Throwable e) {
-            log.error("******* Cloud Controller Service bundle is failed to 
activate ****", e);
+            log.error("**** Cloud controller service bundle is failed to 
activate ****", e);
         }
     }
     

http://git-wip-us.apache.org/repos/asf/stratos/blob/403d5a45/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
index 739469b..a195708 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
@@ -222,18 +222,15 @@ public class TopologyBuilder {
      */
     private static void persist(CloudControllerContext context) {
         try {
-            RegistryManager.getInstance().persist(
-                    context);
+            context.persist();
         } catch (RegistryException e) {
-
-            String msg = "Failed to persist the Cloud Controller data in 
registry. Further, transaction roll back also failed.";
-            log.fatal(msg);
+            String msg = "Failed to persist the cloud controller context in 
registry.";
+            log.error(msg);
             throw new CloudControllerException(msg, e);
         }
     }
 
     public static void handleClusterReset(ClusterStatusClusterResetEvent 
event) {
-
         TopologyManager.acquireWriteLock();
 
         try {

http://git-wip-us.apache.org/repos/asf/stratos/blob/403d5a45/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/registry/RegistryManager.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/registry/RegistryManager.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/registry/RegistryManager.java
index cd05fdd..3b589ff 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/registry/RegistryManager.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/registry/RegistryManager.java
@@ -35,37 +35,30 @@ import 
org.wso2.carbon.registry.core.exceptions.RegistryException;
 import org.wso2.carbon.registry.core.exceptions.ResourceNotFoundException;
 import org.wso2.carbon.utils.multitenancy.MultitenantConstants;
 
+import java.io.Serializable;
+
 /**
- *
+ * Registry manager provides functionality for persisting resources in the 
registry and reading them back.
  */
 public class RegistryManager {
     private final static Log log = LogFactory.getLog(RegistryManager.class);
-    private static Registry registryService;
 
     private static class Holder {
-        static final RegistryManager INSTANCE = new RegistryManager();
+        static final RegistryManager instance = new RegistryManager();
     }
 
     public static RegistryManager getInstance() {
-        registryService = ServiceReferenceHolder.getInstance().getRegistry();
-        if (registryService == null) {
-            log.warn("Registry Service is null. Hence unable to fetch data 
from registry.");
-            return null;
-        }
-
-        return Holder.INSTANCE;
+        return Holder.instance;
     }
     
     private RegistryManager() {
         try {
-
-            if 
(!registryService.resourceExists(CloudControllerConstants.CLOUD_CONTROLLER_RESOURCE))
 {
-                
registryService.put(CloudControllerConstants.CLOUD_CONTROLLER_RESOURCE,
-                        registryService.newCollection());
+            Registry registry = 
ServiceReferenceHolder.getInstance().getRegistry();
+            if ((registry != null) && 
(!registry.resourceExists(CloudControllerConstants.CLOUD_CONTROLLER_RESOURCE))) 
{
+                
registry.put(CloudControllerConstants.CLOUD_CONTROLLER_RESOURCE, 
registry.newCollection());
             }
         } catch (RegistryException e) {
-            String msg =
-                    "Failed to create the registry resource " +
+            String msg = "Failed to create the registry resource " +
                             CloudControllerConstants.CLOUD_CONTROLLER_RESOURCE;
             log.error(msg, e);
             throw new CloudControllerException(msg, e);
@@ -73,108 +66,62 @@ public class RegistryManager {
     }
 
     /**
-     * Persist an object in the local registry.
+     * Persist an object in the registry.
      *
-     * @param dataObj object to be persisted.
+     * @param serializableObject object to be persisted.
      */
-    public synchronized void persist(CloudControllerContext dataObj) throws 
RegistryException {
-        try {
-               
-               PrivilegedCarbonContext ctx = 
PrivilegedCarbonContext.getThreadLocalCarbonContext();
-               ctx.setTenantId(MultitenantConstants.SUPER_TENANT_ID);
-               
ctx.setTenantDomain(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME);
-               
-            registryService.beginTransaction();
-
-            Resource nodeResource = registryService.newResource();
-
-            nodeResource.setContent(Serializer.serializeToByteArray(dataObj));
-
-            
registryService.put(CloudControllerConstants.CLOUD_CONTROLLER_RESOURCE + 
CloudControllerConstants.DATA_RESOURCE, nodeResource);
-
-            registryService.commitTransaction();
-
-        } catch (Exception e) {
-            String msg = "Failed to persist the cloud controller data in 
registry.";
-            registryService.rollbackTransaction();
-            log.error(msg, e);
-            throw new CloudControllerException(msg, e);
-
+    public synchronized void persist(String resourcePath, Serializable 
serializableObject) throws RegistryException {
+        String absoluteResourcePath = 
CloudControllerConstants.CLOUD_CONTROLLER_RESOURCE + resourcePath;
+        if(log.isDebugEnabled()) {
+            log.debug(String.format("Persisting resource in registry: 
[resource-path] %s", absoluteResourcePath));
         }
-    }
+        Registry registry = ServiceReferenceHolder.getInstance().getRegistry();
 
-    public synchronized void persistTopology(Topology topology) throws 
RegistryException {
         try {
-               
                PrivilegedCarbonContext ctx = 
PrivilegedCarbonContext.getThreadLocalCarbonContext();
                ctx.setTenantId(MultitenantConstants.SUPER_TENANT_ID);
                
ctx.setTenantDomain(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME);
-               
-            registryService.beginTransaction();
-
-            Resource nodeResource = registryService.newResource();
 
-            nodeResource.setContent(Serializer.serializeToByteArray(topology));
+            registry.beginTransaction();
 
-            
registryService.put(CloudControllerConstants.CLOUD_CONTROLLER_RESOURCE + 
CloudControllerConstants.TOPOLOGY_RESOURCE, nodeResource);
+            Resource nodeResource = registry.newResource();
+            
nodeResource.setContent(Serializer.serializeToByteArray(serializableObject));
+            registry.put(absoluteResourcePath, nodeResource);
 
-            registryService.commitTransaction();
+            registry.commitTransaction();
 
+            if(log.isDebugEnabled()) {
+                log.debug(String.format("Resource persisted successfully in 
registry: [resource-path] %s",
+                        absoluteResourcePath));
+            }
         } catch (Exception e) {
-            String msg = "Failed to persist the cloud controller data in 
registry.";
-            registryService.rollbackTransaction();
+            String msg = "Failed to persist resource in registry: " + 
absoluteResourcePath;
+            registry.rollbackTransaction();
             log.error(msg, e);
             throw new CloudControllerException(msg, e);
-
         }
     }
 
-
-    public synchronized Object retrieve() {
-
+    public synchronized Object read(String resourcePath) {
         try {
-               PrivilegedCarbonContext ctx = 
PrivilegedCarbonContext.getThreadLocalCarbonContext();
+            Registry registry = 
ServiceReferenceHolder.getInstance().getRegistry();
+            if(registry == null) {
+                return null;
+            }
+
+            PrivilegedCarbonContext ctx = 
PrivilegedCarbonContext.getThreadLocalCarbonContext();
                ctx.setTenantId(MultitenantConstants.SUPER_TENANT_ID);
                
ctx.setTenantDomain(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME);
-            Resource resource = registryService.get(
-                       CloudControllerConstants.CLOUD_CONTROLLER_RESOURCE + 
CloudControllerConstants.DATA_RESOURCE);
-
+            Resource resource = 
registry.get(CloudControllerConstants.CLOUD_CONTROLLER_RESOURCE + resourcePath);
             return resource.getContent();
-
         } catch (ResourceNotFoundException ignore) {
             // this means, we've never persisted CC info in registry
             return null;
         } catch (RegistryException e) {
-            String msg = "Failed to retrieve cloud controller data from 
registry.";
+            String msg = "Failed to read resource from registry: " +
+                    CloudControllerConstants.CLOUD_CONTROLLER_RESOURCE + 
resourcePath;
             log.error(msg, e);
             throw new CloudControllerException(msg, e);
         }
-
     }
-
-    public synchronized Object retrieveTopology() {
-
-        try {
-                       PrivilegedCarbonContext ctx = PrivilegedCarbonContext
-                                       .getThreadLocalCarbonContext();
-                       ctx.setTenantId(MultitenantConstants.SUPER_TENANT_ID);
-                       
ctx.setTenantDomain(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME);
-
-                       Resource resource = registryService
-                                       
.get(CloudControllerConstants.CLOUD_CONTROLLER_RESOURCE
-                                                       + 
CloudControllerConstants.TOPOLOGY_RESOURCE);
-
-            return resource.getContent();
-
-        } catch (ResourceNotFoundException ignore) {
-            // this means, we've never persisted CC info in registry
-            return null;
-        } catch (RegistryException e) {
-            String msg = "Failed to retrieve cloud controller data from 
registry.";
-            log.error(msg, e);
-            throw new CloudControllerException(msg, e);
-        } 
-
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/403d5a45/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/registry/Serializer.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/registry/Serializer.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/registry/Serializer.java
index bf0122a..1759902 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/registry/Serializer.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/registry/Serializer.java
@@ -64,17 +64,17 @@ public class Serializer {
     
     /**
      * Serialize a {@link 
org.apache.stratos.cloud.controller.context.CloudControllerContext} to a byte 
array.
-     * @param serializableObj
+     * @param serializableObject
      * @return byte[] 
      * @throws IOException
      */
-    public static byte[] serializeToByteArray(CloudControllerContext 
serializableObj) throws IOException {
+    public static byte[]  serializeToByteArray(Serializable 
serializableObject) throws IOException {
 
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutput out = null;
        try {
          out = new ObjectOutputStream(bos);   
-         out.writeObject(serializableObj);
+         out.writeObject(serializableObject);
          
          return bos.toByteArray();
          

http://git-wip-us.apache.org/repos/asf/stratos/blob/403d5a45/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
index d1ef730..ae3a7ae 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
@@ -38,9 +38,7 @@ import 
org.apache.stratos.cloud.controller.functions.ContainerClusterContextToKu
 import 
org.apache.stratos.cloud.controller.functions.ContainerClusterContextToReplicationController;
 import org.apache.stratos.cloud.controller.functions.PodToMemberContext;
 import org.apache.stratos.cloud.controller.iaas.Iaas;
-import org.apache.stratos.cloud.controller.registry.Deserializer;
 import 
org.apache.stratos.cloud.controller.messaging.publisher.CartridgeInstanceDataPublisher;
-import org.apache.stratos.cloud.controller.registry.RegistryManager;
 import org.apache.stratos.cloud.controller.services.CloudControllerService;
 import org.apache.stratos.cloud.controller.messaging.topology.TopologyBuilder;
 import 
org.apache.stratos.cloud.controller.messaging.topology.TopologyEventPublisher;
@@ -82,51 +80,10 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
        private static final Log LOG = 
LogFactory.getLog(CloudControllerServiceImpl.class);
        public static final String IS_LOAD_BALANCER = "load.balancer";
 
-    private CloudControllerContext dataHolder = CloudControllerContext
+    private CloudControllerContext cloudControllerContext = 
CloudControllerContext
             .getInstance();
 
     public CloudControllerServiceImpl() {
-        // acquire serialized data from registry
-        acquireData();
-    }
-
-    private void acquireData() {
-
-        Object obj = RegistryManager.getInstance().retrieve();
-        if (obj != null) {
-            try {
-                Object dataObj = Deserializer
-                        .deserializeFromByteArray((byte[]) obj);
-                if (dataObj instanceof CloudControllerContext) {
-                    CloudControllerContext serializedObj = 
(CloudControllerContext) dataObj;
-                    CloudControllerContext currentData = CloudControllerContext
-                            .getInstance();
-
-                    // assign necessary data
-                    
currentData.setClusterIdToContext(serializedObj.getClusterIdToContext());
-                    
currentData.setMemberIdToContext(serializedObj.getMemberIdToContext());
-                    
currentData.setClusterIdToMemberContext(serializedObj.getClusterIdToMemberContext());
-                    currentData.setCartridges(serializedObj.getCartridges());
-                    
currentData.setKubClusterIdToKubClusterContext(serializedObj.getKubClusterIdToKubClusterContext());
-                    
currentData.setServiceGroups(serializedObj.getServiceGroups());
-
-                    if (LOG.isDebugEnabled()) {
-
-                        LOG.debug("Cloud Controller Data is retrieved from 
registry.");
-                    }
-                } else {
-                    if (LOG.isDebugEnabled()) {
-
-                        LOG.debug("Cloud Controller Data cannot be found in 
registry.");
-                    }
-                }
-            } catch (Exception e) {
-
-                String msg = "Unable to acquire data from Registry. Hence, any 
historical data will not get reflected.";
-                LOG.warn(msg, e);
-            }
-
-        }
     }
 
     public void deployCartridgeDefinition(CartridgeConfig cartridgeConfig) throws InvalidCartridgeDefinitionException,
@@ -177,8 +134,8 @@ public class CloudControllerServiceImpl implements CloudControllerService {
 
         // TODO transaction begins
         String cartridgeType = cartridge.getType();
-        if (dataHolder.getCartridge(cartridgeType) != null) {
-            Cartridge cartridgeToBeRemoved = 
dataHolder.getCartridge(cartridgeType);
+        if (cloudControllerContext.getCartridge(cartridgeType) != null) {
+            Cartridge cartridgeToBeRemoved = 
cloudControllerContext.getCartridge(cartridgeType);
             // undeploy
             try {
                 undeployCartridgeDefinition(cartridgeToBeRemoved.getType());
@@ -188,7 +145,7 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
             populateNewCartridge(cartridge, cartridgeToBeRemoved);
         }
 
-        dataHolder.addCartridge(cartridge);
+        cloudControllerContext.addCartridge(cartridge);
 
         // persist
         persist();
@@ -228,10 +185,10 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
     public void undeployCartridgeDefinition(String cartridgeType) throws 
InvalidCartridgeTypeException {
 
         Cartridge cartridge = null;
-        if ((cartridge = dataHolder.getCartridge(cartridgeType)) != null) {
-            if (dataHolder.getCartridges().remove(cartridge)) {
+        if ((cartridge = cloudControllerContext.getCartridge(cartridgeType)) 
!= null) {
+            if (cloudControllerContext.getCartridges().remove(cartridge)) {
                 // invalidate partition validation cache
-                
dataHolder.removeFromCartridgeTypeToPartitionIds(cartridgeType);
+                
cloudControllerContext.removeFromCartridgeTypeToPartitionIds(cartridgeType);
 
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Partition cache invalidated for cartridge " + 
cartridgeType);
@@ -301,7 +258,7 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
             }
         }
 
-        dataHolder.addServiceGroup(servicegroup);
+        cloudControllerContext.addServiceGroup(servicegroup);
 
         this.persist();
 
@@ -314,10 +271,10 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
 
         ServiceGroup serviceGroup = null;
 
-        serviceGroup = dataHolder.getServiceGroup(name);
+        serviceGroup = cloudControllerContext.getServiceGroup(name);
 
         if (serviceGroup != null) {
-            if (dataHolder.getServiceGroups().remove(serviceGroup)) {
+            if 
(cloudControllerContext.getServiceGroups().remove(serviceGroup)) {
                 persist();
                 if (LOG.isInfoEnabled()) {
                     LOG.info("Successfully undeployed the Service Group 
definition: " + serviceGroup);
@@ -339,7 +296,7 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
             LOG.debug("getServiceGroupDefinition:" + name);
         }
 
-        ServiceGroup serviceGroup = this.dataHolder.getServiceGroup(name);
+        ServiceGroup serviceGroup = 
this.cloudControllerContext.getServiceGroup(name);
 
         if (serviceGroup == null) {
             if (LOG.isDebugEnabled()) {
@@ -405,13 +362,13 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
                 memberContext);
 
         String partitionId = partition.getId();
-        ClusterContext ctxt = dataHolder.getClusterContext(clusterId);
+        ClusterContext ctxt = 
cloudControllerContext.getClusterContext(clusterId);
 
         handleNullObject(ctxt, "Instance start-up failed. Invalid cluster id. 
" + memberContext);
 
         String cartridgeType = ctxt.getCartridgeType();
 
-        Cartridge cartridge = dataHolder.getCartridge(cartridgeType);
+        Cartridge cartridge = 
cloudControllerContext.getCartridge(cartridgeType);
 
         if (cartridge == null) {
             String msg =
@@ -591,11 +548,9 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
      */
     private void persist() {
         try {
-            RegistryManager.getInstance().persist(
-                    dataHolder);
+            cloudControllerContext.persist();
         } catch (RegistryException e) {
-
-            String msg = "Failed to persist the Cloud Controller data in registry. Further, transaction roll back also failed.";
+            String msg = "Failed to persist the cloud controller context in registry.";
             LOG.fatal(msg);
             throw new CloudControllerException(msg, e);
         }
@@ -611,7 +566,7 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
 
         handleNullObject(memberId, "Termination failed. Null member id.");
 
-        MemberContext ctxt = dataHolder.getMemberContextOfMemberId(memberId);
+        MemberContext ctxt = 
cloudControllerContext.getMemberContextOfMemberId(memberId);
 
         if (ctxt == null) {
             String msg = "Termination failed. Invalid Member Id: " + memberId;
@@ -767,7 +722,7 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
 
             try {
                 // these will never be null, since we do not add null values 
for these.
-                Cartridge cartridge = dataHolder.getCartridge(cartridgeType);
+                Cartridge cartridge = 
cloudControllerContext.getCartridge(cartridgeType);
 
                 LOG.info("Starting to terminate an instance with member id : " 
+ memberId +
                         " in partition id: " + partitionId + " of cluster id: 
" + clusterId +
@@ -832,7 +787,7 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
 
             String clusterId = memberContext.getClusterId();
             Partition partition = memberContext.getPartition();
-            ClusterContext ctxt = dataHolder.getClusterContext(clusterId);
+            ClusterContext ctxt = 
cloudControllerContext.getClusterContext(clusterId);
             Iaas iaas = iaasProvider.getIaas();
             String publicIp = null;
 
@@ -1015,7 +970,7 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
                     }
                 }
 
-                dataHolder.addMemberContext(memberContext);
+                cloudControllerContext.addMemberContext(memberContext);
 
                 // persist in registry
                 persist();
@@ -1067,7 +1022,7 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
 
         handleNullObject(clusterId, "Instance termination failed. Cluster id 
is null.");
 
-        List<MemberContext> ctxts = 
dataHolder.getMemberContextsOfClusterId(clusterId);
+        List<MemberContext> ctxts = 
cloudControllerContext.getMemberContextsOfClusterId(clusterId);
 
         if (ctxts == null) {
             String msg = "Instance termination failed. No members found for 
cluster id: " + clusterId;
@@ -1129,7 +1084,7 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
 
     private void detachVolume(IaasProvider iaasProvider, MemberContext ctxt) {
         String clusterId = ctxt.getClusterId();
-        ClusterContext clusterCtxt = dataHolder.getClusterContext(clusterId);
+        ClusterContext clusterCtxt = 
cloudControllerContext.getClusterContext(clusterId);
         if (clusterCtxt.getVolumes() != null) {
             for (Volume volume : clusterCtxt.getVolumes()) {
                 try {
@@ -1171,7 +1126,7 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
                 null);
 
         // update data holders
-        dataHolder.removeMemberContext(memberContext.getMemberId(), 
memberContext.getClusterId());
+        
cloudControllerContext.removeMemberContext(memberContext.getMemberId(), 
memberContext.getClusterId());
 
         // persist
         persist();
@@ -1195,7 +1150,7 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
         handleNullObject(hostName, "Service registration failed. Hostname is 
null.");
 
         Cartridge cartridge = null;
-        if ((cartridge = dataHolder.getCartridge(cartridgeType)) == null) {
+        if ((cartridge = cloudControllerContext.getCartridge(cartridgeType)) 
== null) {
 
             String msg = "Registration of cluster: " + clusterId +
                     " failed. - Unregistered Cartridge type: " + cartridgeType;
@@ -1212,7 +1167,7 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
         payload, hostName, props, isLb, registrant.getPersistence());
 
 
-        dataHolder.addClusterContext(ctxt);*/
+        cloudControllerContext.addClusterContext(ctxt);*/
         TopologyBuilder.handleClusterCreated(registrant, isLb);
 
         persist();
@@ -1254,7 +1209,7 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
     @Override
     public String[] getRegisteredCartridges() {
         // get the list of cartridges registered
-        List<Cartridge> cartridges = dataHolder
+        List<Cartridge> cartridges = cloudControllerContext
                 .getCartridges();
 
         if (cartridges == null) {
@@ -1282,7 +1237,7 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
     @Override
     public CartridgeInfo getCartridgeInfo(String cartridgeType)
             throws UnregisteredCartridgeException {
-        Cartridge cartridge = dataHolder
+        Cartridge cartridge = cloudControllerContext
                 .getCartridge(cartridgeType);
 
         if (cartridge != null) {
@@ -1301,13 +1256,13 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
     public void unregisterService(String clusterId) throws 
UnregisteredClusterException {
         final String clusterId_ = clusterId;
 
-        ClusterContext ctxt = dataHolder.getClusterContext(clusterId_);
+        ClusterContext ctxt = 
cloudControllerContext.getClusterContext(clusterId_);
 
         handleNullObject(ctxt, "Service unregistration failed. Invalid cluster 
id: " + clusterId);
 
         String cartridgeType = ctxt.getCartridgeType();
 
-        Cartridge cartridge = dataHolder.getCartridge(cartridgeType);
+        Cartridge cartridge = 
cloudControllerContext.getCartridge(cartridgeType);
 
         if (cartridge == null) {
             String msg =
@@ -1322,12 +1277,12 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
 
         } else {
 
-//             
TopologyBuilder.handleClusterMaintenanceMode(dataHolder.getClusterContext(clusterId_));
+//             
TopologyBuilder.handleClusterMaintenanceMode(cloudControllerContext.getClusterContext(clusterId_));
 
             Runnable terminateInTimeout = new Runnable() {
                 @Override
                 public void run() {
-                    ClusterContext ctxt = 
dataHolder.getClusterContext(clusterId_);
+                    ClusterContext ctxt = 
cloudControllerContext.getClusterContext(clusterId_);
                     if (ctxt == null) {
                         String msg = "Service unregistration failed. Cluster 
not found: " + clusterId_;
                         LOG.error(msg);
@@ -1366,7 +1321,7 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
             };
             Runnable unregister = new Runnable() {
                 public void run() {
-                    ClusterContext ctxt = 
dataHolder.getClusterContext(clusterId_);
+                    ClusterContext ctxt = 
cloudControllerContext.getClusterContext(clusterId_);
                     if (ctxt == null) {
                         String msg = "Service unregistration failed. Cluster 
not found: " + clusterId_;
                         LOG.error(msg);
@@ -1389,12 +1344,12 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
 
                 private void deleteVolumes(ClusterContext ctxt) {
                     if (ctxt.isVolumeRequired()) {
-                        Cartridge cartridge = 
dataHolder.getCartridge(ctxt.getCartridgeType());
+                        Cartridge cartridge = 
cloudControllerContext.getCartridge(ctxt.getCartridgeType());
                         if (cartridge != null && cartridge.getIaases() != null 
&& ctxt.getVolumes() != null) {
                             for (Volume volume : ctxt.getVolumes()) {
                                 if (volume.getId() != null) {
                                     String iaasType = volume.getIaasType();
-                                    //Iaas iaas = 
dataHolder.getIaasProvider(iaasType).getIaas();
+                                    //Iaas iaas = 
cloudControllerContext.getIaasProvider(iaasType).getIaas();
                                     Iaas iaas = 
cartridge.getIaasProvider(iaasType).getIaas();
                                     if (iaas != null) {
                                         try {
@@ -1442,17 +1397,13 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
     public boolean validateDeploymentPolicy(String cartridgeType, Partition[] 
partitions)
             throws InvalidPartitionException, InvalidCartridgeTypeException {
 
-        Map<String, List<String>> validatedCache = 
dataHolder.getCartridgeTypeToPartitionIds();
-        List<String> validatedPartitions = new ArrayList<String>();
-
-        if (validatedCache.containsKey(cartridgeType)) {
+        List<String> validatedPartitions = 
CloudControllerContext.getInstance().getPartitionIds(cartridgeType);
+        if (validatedPartitions != null) {
             // cache hit for this cartridge
             // get list of partitions
-            validatedPartitions = validatedCache.get(cartridgeType);
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Partition validation cache hit for cartridge type: 
" + cartridgeType);
             }
-
         }
 
         Map<String, IaasProvider> partitionToIaasProviders =
@@ -1462,7 +1413,7 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
             LOG.debug("Deployment policy validation started for cartridge 
type: " + cartridgeType);
         }
 
-        Cartridge cartridge = dataHolder.getCartridge(cartridgeType);
+        Cartridge cartridge = 
cloudControllerContext.getCartridge(cartridgeType);
 
         if (cartridge == null) {
             String msg = "Invalid Cartridge Type: " + cartridgeType;
@@ -1482,7 +1433,7 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
             Callable<IaasProvider> worker = new PartitionValidatorCallable(
                     partition, cartridge);
             Future<IaasProvider> job = CloudControllerContext.getInstance()
-                    .getExecutor().submit(worker);
+                    .getExecutorService().submit(worker);
             jobList.put(partition.getId(), job);
         }
 
@@ -1498,7 +1449,7 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
                 partitionToIaasProviders.put(partitionId, job.get());
 
                 // add to cache
-                
this.dataHolder.addToCartridgeTypeToPartitionIdMap(cartridgeType, partitionId);
+                
this.cloudControllerContext.addToCartridgeTypeToPartitionIdMap(cartridgeType, 
partitionId);
 
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Partition " + partitionId + " added to the 
cache against cartridge type: " + cartridgeType);
@@ -1522,10 +1473,10 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
     }
 
     private void onClusterRemoval(final String clusterId) {
-        ClusterContext ctxt = dataHolder.getClusterContext(clusterId);
+        ClusterContext ctxt = 
cloudControllerContext.getClusterContext(clusterId);
         TopologyBuilder.handleClusterRemoved(ctxt);
-        dataHolder.removeClusterContext(clusterId);
-        dataHolder.removeMemberContextsOfCluster(clusterId);
+        cloudControllerContext.removeClusterContext(clusterId);
+        cloudControllerContext.removeMemberContextsOfCluster(clusterId);
         persist();
     }
 
@@ -1570,7 +1521,7 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
 
     public ClusterContext getClusterContext(String clusterId) {
 
-        return dataHolder.getClusterContext(clusterId);
+        return cloudControllerContext.getClusterContext(clusterId);
     }
 
     @Override
@@ -1590,12 +1541,12 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
             LOG.debug("Received a container spawn request : " + 
containerClusterContext.toString());
         }
 
-        ClusterContext ctxt = dataHolder.getClusterContext(clusterId);
+        ClusterContext ctxt = 
cloudControllerContext.getClusterContext(clusterId);
         handleNullObject(ctxt, "Container start-up failed. Invalid cluster id. 
" + containerClusterContext.toString());
 
         String cartridgeType = ctxt.getCartridgeType();
 
-        Cartridge cartridge = dataHolder.getCartridge(cartridgeType);
+        Cartridge cartridge = 
cloudControllerContext.getCartridge(cartridgeType);
 
         if (cartridge == null) {
             String msg =
@@ -1647,7 +1598,7 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
             
allocatedServiceHostPortProp.setName(StratosConstants.ALLOCATED_SERVICE_HOST_PORT);
             
allocatedServiceHostPortProp.setValue(String.valueOf(service.getPort()));
             ctxt.getProperties().addProperty(allocatedServiceHostPortProp);
-            dataHolder.addClusterContext(ctxt);
+            cloudControllerContext.addClusterContext(ctxt);
 
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Cloud Controller successfully started the service "
@@ -1700,14 +1651,14 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
                                 .getProperties(), 
StratosConstants.ALLOCATED_SERVICE_HOST_PORT,
                         String.valueOf(service.getPort())));
 
-                dataHolder.addMemberContext(context);
+                cloudControllerContext.addMemberContext(context);
 
                 // wait till Pod status turns to running and send member 
spawned.
                 ScheduledThreadExecutor exec = 
ScheduledThreadExecutor.getInstance();
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Cloud Controller is starting the instance start 
up thread.");
                 }
-                dataHolder.addScheduledFutureJob(context.getMemberId(), 
exec.schedule(new PodActivationWatcher(pod.getId(), context, kubApi), 5000));
+                
cloudControllerContext.addScheduledFutureJob(context.getMemberId(), 
exec.schedule(new PodActivationWatcher(pod.getId(), context, kubApi), 5000));
 
                 memberContexts.add(context);
             }
@@ -1745,18 +1696,18 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
             String kubernetesClusterId, String kubernetesMasterIp,
             String kubernetesPortRange) {
 
-        KubernetesClusterContext origCtxt = 
dataHolder.getKubernetesClusterContext(kubernetesClusterId);
+        KubernetesClusterContext origCtxt = 
cloudControllerContext.getKubernetesClusterContext(kubernetesClusterId);
         KubernetesClusterContext newCtxt = new 
KubernetesClusterContext(kubernetesClusterId, kubernetesPortRange, 
kubernetesMasterIp);
 
         if (origCtxt == null) {
-            dataHolder.addKubernetesClusterContext(newCtxt);
+            cloudControllerContext.addKubernetesClusterContext(newCtxt);
             return newCtxt;
         }
 
         if (!origCtxt.equals(newCtxt)) {
             // if for some reason master IP etc. have changed
             newCtxt.setAvailableHostPorts(origCtxt.getAvailableHostPorts());
-            dataHolder.addKubernetesClusterContext(newCtxt);
+            cloudControllerContext.addKubernetesClusterContext(newCtxt);
             return newCtxt;
         } else {
             return origCtxt;
@@ -1767,7 +1718,7 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
     public MemberContext[] terminateAllContainers(String clusterId)
             throws InvalidClusterException {
 
-        ClusterContext ctxt = dataHolder.getClusterContext(clusterId);
+        ClusterContext ctxt = 
cloudControllerContext.getClusterContext(clusterId);
         handleNullObject(ctxt, "Kubernetes units temrination failed. Invalid 
cluster id. " + clusterId);
 
         String kubernetesClusterId = 
CloudControllerUtil.getProperty(ctxt.getProperties(),
@@ -1775,7 +1726,7 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
         handleNullObject(kubernetesClusterId, "Kubernetes units termination 
failed. Cannot find '" +
                 StratosConstants.KUBERNETES_CLUSTER_ID + "'. " + ctxt);
 
-        KubernetesClusterContext kubClusterContext = 
dataHolder.getKubernetesClusterContext(kubernetesClusterId);
+        KubernetesClusterContext kubClusterContext = 
cloudControllerContext.getKubernetesClusterContext(kubernetesClusterId);
         handleNullObject(kubClusterContext, "Kubernetes units termination 
failed. Cannot find a matching Kubernetes Cluster for cluster id: "
                 + kubernetesClusterId);
 
@@ -1838,7 +1789,7 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
                     + StratosConstants.ALLOCATED_SERVICE_HOST_PORT);
         }
 
-        List<MemberContext> membersToBeRemoved = 
dataHolder.getMemberContextsOfClusterId(clusterId);
+        List<MemberContext> membersToBeRemoved = 
cloudControllerContext.getMemberContextsOfClusterId(clusterId);
 
         for (MemberContext memberContext : membersToBeRemoved) {
             logTermination(memberContext);
@@ -1858,12 +1809,12 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
             LOG.debug("CloudControllerServiceImpl:updateContainers for cluster 
: " + clusterId);
         }
 
-        ClusterContext ctxt = dataHolder.getClusterContext(clusterId);
+        ClusterContext ctxt = 
cloudControllerContext.getClusterContext(clusterId);
         handleNullObject(ctxt, "Container update failed. Invalid cluster id. " 
+ clusterId);
 
         String cartridgeType = ctxt.getCartridgeType();
 
-        Cartridge cartridge = dataHolder.getCartridge(cartridgeType);
+        Cartridge cartridge = 
cloudControllerContext.getCartridge(cartridgeType);
 
         if (cartridge == null) {
             String msg =
@@ -1876,7 +1827,7 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
         try {
             String kubernetesClusterId = 
validateProperty(StratosConstants.KUBERNETES_CLUSTER_ID, ctxt);
 
-            KubernetesClusterContext kubClusterContext = 
dataHolder.getKubernetesClusterContext(kubernetesClusterId);
+            KubernetesClusterContext kubClusterContext = 
cloudControllerContext.getKubernetesClusterContext(kubernetesClusterId);
 
             if (kubClusterContext == null) {
                 String msg =
@@ -1936,7 +1887,7 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
             for (Pod pod : allPods) {
                 MemberContext context;
                 // if member context does not exist -> a new member (scale up)
-                if ((context = 
dataHolder.getMemberContextOfMemberId(pod.getId())) == null) {
+                if ((context = 
cloudControllerContext.getMemberContextOfMemberId(pod.getId())) == null) {
 
                     context = podToMemberContextFunc.apply(pod);
                     context.setCartridgeType(cartridgeType);
@@ -1952,7 +1903,7 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Cloud Controller is starting the instance 
start up thread.");
                     }
-                    dataHolder.addScheduledFutureJob(context.getMemberId(), 
exec.schedule(new PodActivationWatcher(pod.getId(), context, kubApi), 5000));
+                    
cloudControllerContext.addScheduledFutureJob(context.getMemberId(), 
exec.schedule(new PodActivationWatcher(pod.getId(), context, kubApi), 5000));
 
                     memberContexts.add(context);
 
@@ -1969,7 +1920,7 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
                 List<Pod> difference = 
ListUtils.subtract(Arrays.asList(previousStatePods), Arrays.asList(allPods));
                 for (Pod pod : difference) {
                     if (pod != null) {
-                        MemberContext context = 
dataHolder.getMemberContextOfMemberId(pod.getId());
+                        MemberContext context = 
cloudControllerContext.getMemberContextOfMemberId(pod.getId());
                         logTermination(context);
                         memberContexts.add(context);
                     }
@@ -2001,7 +1952,7 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
 
         handleNullObject(memberId, "Failed to terminate member. Invalid Member 
id. [Member id] " + memberId);
 
-        MemberContext memberContext = 
dataHolder.getMemberContextOfMemberId(memberId);
+        MemberContext memberContext = 
cloudControllerContext.getMemberContextOfMemberId(memberId);
 
         handleNullObject(memberContext, "Failed to terminate member. Member id 
not found. [Member id] " + memberId);
 
@@ -2009,7 +1960,7 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
 
         handleNullObject(clusterId, "Failed to terminate member. Cluster id is 
null. [Member id] " + memberId);
 
-        ClusterContext ctxt = dataHolder.getClusterContext(clusterId);
+        ClusterContext ctxt = 
cloudControllerContext.getClusterContext(clusterId);
 
         handleNullObject(ctxt,
                 String.format("Failed to terminate member [Member id] %s. 
Invalid cluster id %s ", memberId, clusterId));
@@ -2020,7 +1971,7 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
         handleNullObject(kubernetesClusterId, String.format("Failed to 
terminate member [Member id] %s. Cannot find '" +
                 StratosConstants.KUBERNETES_CLUSTER_ID + "' in [cluster 
context] %s ", memberId, ctxt));
 
-        KubernetesClusterContext kubClusterContext = 
dataHolder.getKubernetesClusterContext(kubernetesClusterId);
+        KubernetesClusterContext kubClusterContext = 
cloudControllerContext.getKubernetesClusterContext(kubernetesClusterId);
 
         handleNullObject(kubClusterContext, String.format("Failed to terminate 
member [Member id] %s. Cannot find a matching Kubernetes Cluster in [cluster 
context] %s ", memberId, ctxt));
 
@@ -2030,7 +1981,7 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
             // member id = pod id
             kubApi.deletePod(memberId);
 
-            MemberContext memberToBeRemoved = 
dataHolder.getMemberContextOfMemberId(memberId);
+            MemberContext memberToBeRemoved = 
cloudControllerContext.getMemberContextOfMemberId(memberId);
 
             logTermination(memberToBeRemoved);
 
@@ -2064,7 +2015,7 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
 
 
         for (ApplicationClusterContext appClusterCtxt : appClustersContexts) {
-            dataHolder.addClusterContext(new 
ClusterContext(appClusterCtxt.getClusterId(),
+            cloudControllerContext.addClusterContext(new 
ClusterContext(appClusterCtxt.getClusterId(),
                     appClusterCtxt.getCartridgeType(), 
appClusterCtxt.getTextPayload(),
                     appClusterCtxt.getHostName(), 
appClusterCtxt.isLbCluster(), appClusterCtxt.getProperties()));
             // create Cluster objects
@@ -2074,7 +2025,7 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
             newCluster.setTenantRange(appClusterCtxt.getTenantRange());
             //newCluster.setStatus(ClusterStatus.Created, null);
             
newCluster.setHostNames(Arrays.asList(appClusterCtxt.getHostName()));
-            Cartridge cartridge = 
dataHolder.getCartridge(appClusterCtxt.getCartridgeType());
+            Cartridge cartridge = 
cloudControllerContext.getCartridge(appClusterCtxt.getCartridgeType());
             if(cartridge.getDeployerType() != null &&
                     
cartridge.getDeployerType().equals(StratosConstants.KUBERNETES_DEPLOYER_TYPE)) {
                 newCluster.setKubernetesCluster(true);
@@ -2107,7 +2058,7 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
 //
 //        // Create a Cluster Context obj. for each of the Clusters in the 
Application
 //        for (ApplicationClusterContext applicationClusterContext : 
applicationParser.getApplicationClusterContexts()) {
-//            dataHolder.addClusterContext(new 
ClusterContext(applicationClusterContext.getClusterId(),
+//            cloudControllerContext.addClusterContext(new 
ClusterContext(applicationClusterContext.getClusterId(),
 //                    applicationClusterContext.getCartridgeType(), 
applicationClusterContext.getTextPayload(),
 //                    applicationClusterContext.getHostName(), 
applicationClusterContext.isLbCluster()));
 //        }
@@ -2129,7 +2080,7 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
 //
 //        // Create a Cluster Context obj. for each of the Clusters in the 
Application
 //        for (ApplicationClusterContext applicationClusterContext : 
applicationParser.getApplicationClusterContexts()) {
-//            dataHolder.addClusterContext(new 
ClusterContext(applicationClusterContext.getClusterId(),
+//            cloudControllerContext.addClusterContext(new 
ClusterContext(applicationClusterContext.getClusterId(),
 //                    applicationClusterContext.getCartridgeType(), 
applicationClusterContext.getTextPayload(),
 //                    applicationClusterContext.getHostName(), 
applicationClusterContext.isLbCluster()));
 //        }

http://git-wip-us.apache.org/repos/asf/stratos/blob/403d5a45/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerUtil.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerUtil.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerUtil.java
index 8b88301..40b2459 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerUtil.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerUtil.java
@@ -343,16 +343,15 @@ public class CloudControllerUtil {
     
     public static void persistTopology(Topology topology) {
       try {
-          RegistryManager.getInstance().persistTopology(topology);
+          RegistryManager.getInstance().persist(CloudControllerConstants.TOPOLOGY_RESOURCE, topology);
       } catch (RegistryException e) {
-
           String msg = "Failed to persist the Topology in registry. ";
           log.fatal(msg, e);
       }
     }
     
     public static Topology retrieveTopology() {        
-          Object obj = RegistryManager.getInstance().retrieveTopology();
+          Object obj = RegistryManager.getInstance().read(CloudControllerConstants.TOPOLOGY_RESOURCE);
           if (obj != null) {
               try {
                   Object dataObj = Deserializer
@@ -367,7 +366,6 @@ public class CloudControllerUtil {
                 log.warn(msg, e);
             }
           }
-          
           return null;
     }
 

Reply via email to