Cloud controller clustering support initial implementation
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/403d5a45 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/403d5a45 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/403d5a45 Branch: refs/heads/master Commit: 403d5a45dab5ba2bacab51ecf417313972c39c1b Parents: 830785f Author: Imesh Gunaratne <[email protected]> Authored: Sun Nov 30 23:25:16 2014 +0530 Committer: Imesh Gunaratne <[email protected]> Committed: Sun Nov 30 23:25:16 2014 +0530 ---------------------------------------------------------------------- .../clustering/DistributedObjectHandler.java | 140 ++++ .../context/CloudControllerContext.java | 638 ++++++++++--------- .../deployers/CloudControllerDeployer.java | 8 +- .../CloudControllerServiceComponent.java | 20 +- .../messaging/topology/TopologyBuilder.java | 9 +- .../controller/registry/RegistryManager.java | 127 ++-- .../cloud/controller/registry/Serializer.java | 6 +- .../impl/CloudControllerServiceImpl.java | 183 ++---- .../controller/util/CloudControllerUtil.java | 6 +- .../controller/util/PodActivationWatcher.java | 7 +- .../axiom/CloudControllerContextTest.java | 20 +- 11 files changed, 604 insertions(+), 560 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/403d5a45/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/clustering/DistributedObjectHandler.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/clustering/DistributedObjectHandler.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/clustering/DistributedObjectHandler.java new file mode 100644 index 0000000..2b4437e --- /dev/null +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/clustering/DistributedObjectHandler.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.cloud.controller.clustering; + +import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.core.IList; +import com.hazelcast.core.ILock; +import com.hazelcast.core.IMap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * An object handler for managing objects in distributed and non-distributed environments. + */ +public class DistributedObjectHandler { + private static final Log log = LogFactory.getLog(DistributedObjectHandler.class); + + private final boolean clustered; + private final HazelcastInstance hazelcastInstance; + + public DistributedObjectHandler(boolean clustered, HazelcastInstance hazelcastInstance) { + this.clustered = clustered; + this.hazelcastInstance = hazelcastInstance; + } + + private com.hazelcast.core.ILock acquireDistributedLock(Object object) { + if (log.isDebugEnabled()) { + log.debug(String.format("Acquiring distributed lock for %s...", object.getClass().getSimpleName())); + } + ILock lock = hazelcastInstance.getLock(object); + if (log.isDebugEnabled()) { + log.debug(String.format("Distributed lock acquired for %s", object.getClass().getSimpleName())); + } + return lock; + } + + private void releaseDistributedLock(ILock lock) { + if (log.isDebugEnabled()) { + log.debug(String.format("Releasing distributed lock for %s...", lock.getKey())); + } + lock.forceUnlock(); + if (log.isDebugEnabled()) { + log.debug(String.format("Distributed lock released for %s", lock.getKey())); + } + } + + public Map getMap(String key) { + if(clustered) { + return hazelcastInstance.getMap(key); + } else { + return new ConcurrentHashMap<Object, Object>(); + } + } + + public List getList(String name) { + if(clustered) { + return hazelcastInstance.getList(name); + } else { + return new ArrayList(); + } + } + + public void putToMap(Map map, Object key, Object value) { + if(clustered) { + ILock lock = null; + try { + lock = acquireDistributedLock(map); + ((IMap)map).set(key, value); + } finally { + releaseDistributedLock(lock); + } + } else { + map.put(key, value); + } + } + + public void removeFromMap(Map map, Object key) { + if(clustered) { + ILock lock = null; + try { + lock = acquireDistributedLock(map); + ((IMap)map).delete(key); + } finally { + releaseDistributedLock(lock); + } + } else { + map.remove(key); + } + } + + public void addToList(List list, Object value) { + if(clustered) { + ILock lock = null; + try { + lock = acquireDistributedLock(list); + ((IList)list).add(value); + } finally { + releaseDistributedLock(lock); + } + } else { + list.add(value); + } + } + + public void removeFromList(List list, Object value) { + if(clustered) { + ILock lock = null; + try { + lock = acquireDistributedLock(list); + ((IList)list).remove(value); + } finally { + releaseDistributedLock(lock); + } + } else { + list.remove(value); + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/403d5a45/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java index 9a1cdc2..b589ecc 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java @@ -18,11 +18,18 @@ */ package org.apache.stratos.cloud.controller.context; +import org.apache.axis2.clustering.ClusteringAgent; +import org.apache.axis2.engine.AxisConfiguration; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.cloud.controller.clustering.DistributedObjectHandler; import org.apache.stratos.cloud.controller.domain.*; +import org.apache.stratos.cloud.controller.internal.ServiceReferenceHolder; +import org.apache.stratos.cloud.controller.registry.Deserializer; import org.apache.stratos.cloud.controller.registry.RegistryManager; +import org.apache.stratos.cloud.controller.util.CloudControllerConstants; import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher; +import org.wso2.carbon.registry.core.exceptions.RegistryException; import java.io.Serializable; import java.util.ArrayList; @@ -37,384 +44,389 @@ import java.util.concurrent.ScheduledFuture; /** * This object holds all runtime data and provides faster access. This is a Singleton class. */ -public class CloudControllerContext implements Serializable{ +public class CloudControllerContext implements Serializable { private static final long serialVersionUID = -2662307358852779897L; private static final Log log = LogFactory.getLog(CloudControllerContext.class); + public static final String CC_CLUSTER_ID_TO_MEMBER_CTX = "CC_CLUSTER_ID_TO_MEMBER_CTX"; + public static final String CC_MEMBER_ID_TO_MEMBER_CTX = "CC_MEMBER_ID_TO_MEMBER_CTX"; + public static final String CC_MEMBER_ID_TO_SCH_TASK = "CC_MEMBER_ID_TO_SCH_TASK"; + public static final String CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX = "CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX"; + public static final String CC_CLUSTER_ID_TO_CLUSTER_CTX = "CC_CLUSTER_ID_TO_CLUSTER_CTX"; + public static final String CC_CARTRIDGE_TYPE_TO_PARTITION_IDS = "CC_CARTRIDGE_TYPE_TO_PARTITION_IDS"; + public static final String CC_CARTRIDGES = "CC_CARTRIDGES"; + public static final String CC_SERVICE_GROUPS = "CC_SERVICE_GROUPS"; + private static volatile CloudControllerContext instance; + private final DistributedObjectHandler distributedObjectHandler; + /* We keep following maps in order to make the look up time, small. */ - - /** + + /** * Key - cluster id * Value - list of {@link MemberContext} */ - private Map<String, List<MemberContext>> clusterIdToMemberContext; + private Map<String, List<MemberContext>> clusterIdToMemberContextListMap; + + /** + * Key - member id + * Value - {@link MemberContext} + */ + private Map<String, MemberContext> memberIdToMemberContextMap; /** - * Key - member id - * Value - {@link MemberContext} - */ - private Map<String, MemberContext> memberIdToContext; - - /** * Key - member id * Value - ScheduledFuture task */ - private transient Map<String, ScheduledFuture<?>> memberIdToScheduledTask; - - /** - * Key - Kubernetes cluster id - * Value - {@link org.apache.stratos.cloud.controller.domain.KubernetesClusterContext} - */ - private Map<String, KubernetesClusterContext> kubClusterIdToKubClusterContext; - - /** - * Key - cluster id - * Value - {@link org.apache.stratos.cloud.controller.domain.ClusterContext} - */ - private Map<String, ClusterContext> clusterIdToContext; - - /** - * This works as a cache to hold already validated partitions against a cartridge type. - * Key - cartridge type - * Value - list of partition ids - */ - private Map<String, List<String>> cartridgeTypeToPartitionIds = new ConcurrentHashMap<String, List<String>>(); - - /** + private transient Map<String, ScheduledFuture<?>> memberIdToScheduledTaskMap; + + /** + * Key - Kubernetes cluster id + * Value - {@link org.apache.stratos.cloud.controller.domain.KubernetesClusterContext} + */ + private Map<String, KubernetesClusterContext> kubClusterIdToKubClusterContextMap; + + /** + * Key - cluster id + * Value - {@link org.apache.stratos.cloud.controller.domain.ClusterContext} + */ + private Map<String, ClusterContext> clusterIdToContextMap; + + /** + * This works as a cache to hold already validated partitions against a cartridge type. + * Key - cartridge type + * Value - list of partition ids + */ + private Map<String, List<String>> cartridgeTypeToPartitionIdsMap = new ConcurrentHashMap<String, List<String>>(); + + /** * Thread pool used in this task to execute parallel tasks. */ - private transient ExecutorService executor = Executors.newFixedThreadPool(20); - - /** - * List of registered {@link org.apache.stratos.cloud.controller.domain.Cartridge}s - */ - private List<Cartridge> cartridges; - - /** - * List of deployed service groups - */ - private List<ServiceGroup> serviceGroups; - - private String serializationDir; - private String streamId; - private boolean isPublisherRunning; - private boolean isTopologySyncRunning; + private transient ExecutorService executorService = Executors.newFixedThreadPool(20); + + /** + * List of registered {@link org.apache.stratos.cloud.controller.domain.Cartridge}s + */ + private List<Cartridge> cartridges; + + /** + * List of deployed service groups + */ + private List<ServiceGroup> serviceGroups; + + private String streamId; + private boolean isPublisherRunning; + private boolean isTopologySyncRunning; private boolean clustered; private transient AsyncDataPublisher dataPublisher; private CloudControllerContext() { - // Initialize cloud controller context - clusterIdToMemberContext = new ConcurrentHashMap<String, List<MemberContext>>(); - memberIdToContext = new ConcurrentHashMap<String, MemberContext>(); - memberIdToScheduledTask = new ConcurrentHashMap<String, ScheduledFuture<?>>(); - kubClusterIdToKubClusterContext = new ConcurrentHashMap<String, KubernetesClusterContext>(); - clusterIdToContext = new ConcurrentHashMap<String, ClusterContext>(); - cartridgeTypeToPartitionIds = new ConcurrentHashMap<String, List<String>>(); - cartridges = new ArrayList<Cartridge>(); - serviceGroups = new ArrayList<ServiceGroup>(); - - if (log.isInfoEnabled()) { - log.info("Cloud controller context initialized locally"); + // Check clustering status + AxisConfiguration axisConfiguration = ServiceReferenceHolder.getInstance().getAxisConfiguration(); + if ((axisConfiguration != null) && (axisConfiguration.getClusteringAgent() != null)) { + clustered = true; } + + // Initialize distributed object handler + distributedObjectHandler = new DistributedObjectHandler(isClustered(), + ServiceReferenceHolder.getInstance().getHazelcastInstance()); + + // Initialize objects + clusterIdToMemberContextListMap = distributedObjectHandler.getMap(CC_CLUSTER_ID_TO_MEMBER_CTX); + memberIdToMemberContextMap = distributedObjectHandler.getMap(CC_MEMBER_ID_TO_MEMBER_CTX); + memberIdToScheduledTaskMap = distributedObjectHandler.getMap(CC_MEMBER_ID_TO_SCH_TASK); + kubClusterIdToKubClusterContextMap = distributedObjectHandler.getMap(CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX); + clusterIdToContextMap = distributedObjectHandler.getMap(CC_CLUSTER_ID_TO_CLUSTER_CTX); + cartridgeTypeToPartitionIdsMap = distributedObjectHandler.getMap(CC_CARTRIDGE_TYPE_TO_PARTITION_IDS); + cartridges = distributedObjectHandler.getList(CC_CARTRIDGES); + serviceGroups = distributedObjectHandler.getList(CC_SERVICE_GROUPS); + + // Update context from the registry + updateContextFromRegistry(); } - public static CloudControllerContext getInstance() { - if (instance == null) { - synchronized (CloudControllerContext.class) { - if (instance == null && RegistryManager.getInstance() != null) { - Object obj = RegistryManager.getInstance().retrieve(); - if (obj != null) { - if (obj instanceof CloudControllerContext) { - instance = (CloudControllerContext) obj; - } - } - } - if(instance == null) { - instance = new CloudControllerContext(); - } - } - } - return instance; - } - - public List<Cartridge> getCartridges() { - return cartridges; - } - - public void setCartridges(List<Cartridge> cartridges) { - this.cartridges = cartridges; - } - - public void setServiceGroups(List<ServiceGroup> serviceGroups) { - this.serviceGroups = serviceGroups; - } - - public List<ServiceGroup> getServiceGroups() { - return this.serviceGroups; - } - - - public Cartridge getCartridge(String cartridgeType) { - for (Cartridge cartridge : cartridges) { - if (cartridge.getType().equals(cartridgeType)) { - return cartridge; - } - } - return null; - } - - public void addCartridge(Cartridge newCartridges) { - cartridges.add(newCartridges); - } - - public void removeCartridges(List<Cartridge> cartridges) { - if (this.cartridges != null) { - this.cartridges.removeAll(cartridges); - } - - } - - public ServiceGroup getServiceGroup(String name) { - for (ServiceGroup serviceGroup : serviceGroups) { - if (serviceGroup.getName().equals(name)) { - return serviceGroup; - } - } - - return null; - } - - public void addServiceGroup(ServiceGroup newServiceGroup) { - this.serviceGroups.add(newServiceGroup); - } - - public void removeServiceGroup(List<ServiceGroup> serviceGroup) { - if (this.serviceGroups != null) { - this.serviceGroups.removeAll(serviceGroup); - } - } - - public String getSerializationDir() { - return serializationDir; - } - - public void setSerializationDir(String serializationDir) { - this.serializationDir = serializationDir; - } - - public AsyncDataPublisher getDataPublisher() { - return dataPublisher; - } - - public void setDataPublisher(AsyncDataPublisher dataPublisher) { - this.dataPublisher = dataPublisher; - } - - public String getStreamId() { - return streamId; - } - - public void setStreamId(String streamId) { - this.streamId = streamId; - } - - public boolean isPublisherRunning() { - return isPublisherRunning; - } - - public void setPublisherRunning(boolean isPublisherRunning) { - this.isPublisherRunning = isPublisherRunning; - } - - public boolean isTopologySyncRunning() { - return isTopologySyncRunning; - } - - public void setTopologySyncRunning(boolean isTopologySyncRunning) { - this.isTopologySyncRunning = isTopologySyncRunning; - } - - public void addMemberContext(MemberContext ctxt) { - memberIdToContext.put(ctxt.getMemberId(), ctxt); - - List<MemberContext> ctxts; - - if((ctxts = clusterIdToMemberContext.get(ctxt.getClusterId())) == null) { - ctxts = new ArrayList<MemberContext>(); - } - if(ctxts.contains(ctxt)) { - ctxts.remove(ctxt); + public static CloudControllerContext getInstance() { + if (instance == null) { + synchronized (CloudControllerContext.class) { + if (instance == null) { + instance = new CloudControllerContext(); + } + } + } + return instance; + } + + public List<Cartridge> getCartridges() { + return cartridges; + } + + public void setCartridges(List<Cartridge> cartridges) { + this.cartridges = cartridges; + } + + public void setServiceGroups(List<ServiceGroup> serviceGroups) { + this.serviceGroups = serviceGroups; + } + + public List<ServiceGroup> getServiceGroups() { + return this.serviceGroups; + } + + public Cartridge getCartridge(String cartridgeType) { + for (Cartridge cartridge : cartridges) { + if (cartridge.getType().equals(cartridgeType)) { + return cartridge; + } + } + return null; + } + + public void addCartridge(Cartridge newCartridges) { + distributedObjectHandler.addToList(cartridges, newCartridges); + } + + public ServiceGroup getServiceGroup(String name) { + for (ServiceGroup serviceGroup : serviceGroups) { + if (serviceGroup.getName().equals(name)) { + return serviceGroup; + } } - ctxts.add(ctxt); - clusterIdToMemberContext.put(ctxt.getClusterId(), ctxts); - if(log.isDebugEnabled()) { - - log.debug("Added Member Context to the information model. "+ctxt); + return null; + } + + public void addServiceGroup(ServiceGroup newServiceGroup) { + distributedObjectHandler.addToList(serviceGroups, newServiceGroup); + } + + public void removeServiceGroup(List<ServiceGroup> serviceGroup) { + if (this.serviceGroups != null) { + this.serviceGroups.removeAll(serviceGroup); } } - + + public AsyncDataPublisher getDataPublisher() { + return dataPublisher; + } + + public void setDataPublisher(AsyncDataPublisher dataPublisher) { + this.dataPublisher = dataPublisher; + } + + public String getStreamId() { + return streamId; + } + + public void setStreamId(String streamId) { + this.streamId = streamId; + } + + public boolean isPublisherRunning() { + return isPublisherRunning; + } + + public void setPublisherRunning(boolean isPublisherRunning) { + this.isPublisherRunning = isPublisherRunning; + } + + public boolean isTopologySyncRunning() { + return isTopologySyncRunning; + } + + public void setTopologySyncRunning(boolean isTopologySyncRunning) { + this.isTopologySyncRunning = isTopologySyncRunning; + } + + public void addMemberContext(MemberContext memberContext) { + distributedObjectHandler.putToMap(memberIdToMemberContextMap, memberContext.getMemberId(), memberContext); + + List<MemberContext> memberContextList; + if ((memberContextList = clusterIdToMemberContextListMap.get(memberContext.getClusterId())) == null) { + memberContextList = new ArrayList<MemberContext>(); + } + if (memberContextList.contains(memberContext)) { + distributedObjectHandler.removeFromList(memberContextList,memberContext); + } + distributedObjectHandler.addToList(memberContextList, memberContext); + distributedObjectHandler.putToMap(clusterIdToMemberContextListMap, memberContext.getClusterId(), + memberContextList); + if (log.isDebugEnabled()) { + log.debug("Added member context to the cloud controller context: " + memberContext); + } + } + public void addScheduledFutureJob(String memberId, ScheduledFuture<?> job) { - memberIdToScheduledTask.put(memberId, job); + distributedObjectHandler.putToMap(memberIdToScheduledTaskMap, memberId, job); } - + public List<MemberContext> removeMemberContextsOfCluster(String clusterId) { - List<MemberContext> ctxts = clusterIdToMemberContext.remove(clusterId); - if(ctxts == null) { + List<MemberContext> memberContextList = clusterIdToMemberContextListMap.get(clusterId); + distributedObjectHandler.removeFromMap(clusterIdToMemberContextListMap, clusterId); + if (memberContextList == null) { return new ArrayList<MemberContext>(); } - for (MemberContext memberContext : ctxts) { + for (MemberContext memberContext : memberContextList) { String memberId = memberContext.getMemberId(); - memberIdToContext.remove(memberId); - stopTask(memberIdToScheduledTask.remove(memberId)); - } - if(log.isDebugEnabled()) { - - log.debug("Removed Member Context from the information model. "+ instance); + distributedObjectHandler.removeFromMap(memberIdToMemberContextMap, memberId); + ScheduledFuture<?> task = memberIdToScheduledTaskMap.get(memberId); + distributedObjectHandler.removeFromMap(memberIdToScheduledTaskMap, memberId); + stopTask(task); + + if (log.isDebugEnabled()) { + log.debug("Removed member context from cloud controller context: " + + "[member-id] " + memberId); + } } - return ctxts; + return memberContextList; } - + public MemberContext removeMemberContext(String memberId, String clusterId) { - MemberContext returnedCtxt = memberIdToContext.remove(memberId); - List<MemberContext> ctxts = clusterIdToMemberContext.get(clusterId); - - if (ctxts != null) { - - List<MemberContext> newCtxts = new ArrayList<MemberContext>(ctxts); - - for (Iterator<MemberContext> iterator = newCtxts.iterator(); iterator.hasNext();) { - MemberContext memberContext = (MemberContext) iterator.next(); - if(memberId.equals(memberContext.getMemberId())) { - if(log.isDebugEnabled()) { - - log.debug("MemberContext [id]: "+memberId+" removed from information model."); + MemberContext removedMemberContext = memberIdToMemberContextMap.get(memberId); + distributedObjectHandler.removeFromMap(memberIdToMemberContextMap, memberId); + + List<MemberContext> memberContextList = clusterIdToMemberContextListMap.get(clusterId); + if (memberContextList != null) { + List<MemberContext> newCtxts = new ArrayList<MemberContext>(memberContextList); + for (Iterator<MemberContext> iterator = newCtxts.iterator(); iterator.hasNext(); ) { + MemberContext memberContext = iterator.next(); + if (memberId.equals(memberContext.getMemberId())) { + if (log.isDebugEnabled()) { + log.debug("Member context removed from cloud controller context: [member-id] " + memberId); } iterator.remove(); } } - - clusterIdToMemberContext.put(clusterId, newCtxts); + distributedObjectHandler.putToMap(clusterIdToMemberContextListMap, clusterId, newCtxts); } - - stopTask(memberIdToScheduledTask.remove(memberId)); - - return returnedCtxt; + ScheduledFuture<?> task = memberIdToScheduledTaskMap.get(memberId); + distributedObjectHandler.removeFromMap(memberIdToScheduledTaskMap, memberId); + stopTask(task); + return removedMemberContext; } - + private void stopTask(ScheduledFuture<?> task) { if (task != null) { - task.cancel(true); - log.info("Scheduled Pod Activation Watcher task canceled."); + log.info("Scheduled pod activation watcher task canceled"); } } - + public MemberContext getMemberContextOfMemberId(String memberId) { - return memberIdToContext.get(memberId); + return memberIdToMemberContextMap.get(memberId); } - + public List<MemberContext> getMemberContextsOfClusterId(String clusterId) { - return clusterIdToMemberContext.get(clusterId); + return clusterIdToMemberContextListMap.get(clusterId); + } + + public void addClusterContext(ClusterContext ctxt) { + distributedObjectHandler.putToMap(clusterIdToContextMap, ctxt.getClusterId(), ctxt); } - public Map<String, List<MemberContext>> getClusterIdToMemberContext() { - return clusterIdToMemberContext; + public ClusterContext getClusterContext(String clusterId) { + return clusterIdToContextMap.get(clusterId); } - - public void setClusterIdToMemberContext(Map<String, List<MemberContext>> clusterIdToMemberContext) { - this.clusterIdToMemberContext = clusterIdToMemberContext; + + public ClusterContext removeClusterContext(String clusterId) { + ClusterContext removed = clusterIdToContextMap.get(clusterId); + distributedObjectHandler.removeFromMap(clusterIdToContextMap, clusterId); + return removed; } - public Map<String, MemberContext> getMemberIdToContext() { - return memberIdToContext; + public ExecutorService getExecutorService() { + return executorService; } - public void setMemberIdToContext(Map<String, MemberContext> memberIdToContext) { - this.memberIdToContext = memberIdToContext; + public List<String> getPartitionIds(String cartridgeType) { + return cartridgeTypeToPartitionIdsMap.get(cartridgeType); } - public void addClusterContext(ClusterContext ctxt) { - clusterIdToContext.put(ctxt.getClusterId(), ctxt); + public void addToCartridgeTypeToPartitionIdMap(String cartridgeType, String partitionId) { + List<String> list = this.cartridgeTypeToPartitionIdsMap.get(cartridgeType); + if (list == null) { + list = new ArrayList<String>(); + } + list.add(partitionId); + distributedObjectHandler.putToMap(cartridgeTypeToPartitionIdsMap, cartridgeType, list); } - - public ClusterContext getClusterContext(String clusterId) { - return clusterIdToContext.get(clusterId); + + public void removeFromCartridgeTypeToPartitionIds(String cartridgeType) { + distributedObjectHandler.removeFromMap(cartridgeTypeToPartitionIdsMap, cartridgeType); } - - public ClusterContext removeClusterContext(String clusterId) { - return clusterIdToContext.remove(clusterId); - } - - public Map<String, ClusterContext> getClusterIdToContext() { - return clusterIdToContext; - } - - public void setClusterIdToContext(Map<String, ClusterContext> clusterIdToContext) { - this.clusterIdToContext = clusterIdToContext; - } - - public ExecutorService getExecutor() { - return executor; - } - - public void setExecutor(ExecutorService executor) { - this.executor = executor; - } - - public Map<String, List<String>> getCartridgeTypeToPartitionIds() { - return cartridgeTypeToPartitionIds; - } - - public void setCartridgeTypeToPartitionIds( - Map<String, List<String>> cartridgeTypeToPartitionIds) { - this.cartridgeTypeToPartitionIds = cartridgeTypeToPartitionIds; - } - - public void addToCartridgeTypeToPartitionIdMap(String cartridgeType, String partitionId) { - List<String> list = this.cartridgeTypeToPartitionIds.get(cartridgeType); - - if(list == null) { - list = new ArrayList<String>(); - } - - list.add(partitionId); - this.cartridgeTypeToPartitionIds.put(cartridgeType, list); - } - - public void removeFromCartridgeTypeToPartitionIds(String cartridgeType) { - this.cartridgeTypeToPartitionIds.remove(cartridgeType); - } - - public Map<String, KubernetesClusterContext> getKubClusterIdToKubClusterContext() { - return kubClusterIdToKubClusterContext; - } - - public KubernetesClusterContext getKubernetesClusterContext(String kubClusterId) { - return kubClusterIdToKubClusterContext.get(kubClusterId); - } - - public void addKubernetesClusterContext(KubernetesClusterContext ctxt) { - this.kubClusterIdToKubClusterContext.put(ctxt.getKubernetesClusterId(), ctxt); - } - - public void setKubClusterIdToKubClusterContext( - Map<String, KubernetesClusterContext> kubClusterIdToKubClusterContext) { - this.kubClusterIdToKubClusterContext = kubClusterIdToKubClusterContext; - } - - public Map<String, ScheduledFuture<?>> getMemberIdToScheduledTask() { - return memberIdToScheduledTask; - } - - public void setMemberIdToScheduledTask(Map<String, ScheduledFuture<?>> memberIdToScheduledTask) { - this.memberIdToScheduledTask = memberIdToScheduledTask; + + public KubernetesClusterContext getKubernetesClusterContext(String kubClusterId) { + return kubClusterIdToKubClusterContextMap.get(kubClusterId); + } + + public void addKubernetesClusterContext(KubernetesClusterContext kubernetesClusterContext) { + distributedObjectHandler.putToMap(kubClusterIdToKubClusterContextMap, + kubernetesClusterContext.getKubernetesClusterId(), + kubernetesClusterContext); } public boolean isClustered() { return clustered; } + + public boolean isCoordinator() { + AxisConfiguration axisConfiguration = ServiceReferenceHolder.getInstance().getAxisConfiguration(); + ClusteringAgent clusteringAgent = axisConfiguration.getClusteringAgent(); + return ((axisConfiguration != null) && (clusteringAgent != null) && (clusteringAgent.isCoordinator())); + } + + public void persist() throws RegistryException { + if ((!isClustered()) || (isCoordinator())) { + RegistryManager.getInstance().persist(CloudControllerConstants.DATA_RESOURCE, this); + } + } + + private void updateContextFromRegistry() { + if ((!isClustered()) || (isCoordinator())) { + try { + Object obj = RegistryManager.getInstance().read(CloudControllerConstants.DATA_RESOURCE); + if (obj != null) { + Object dataObj = Deserializer.deserializeFromByteArray((byte[]) obj); + if (dataObj instanceof CloudControllerContext) { + CloudControllerContext serializedObj = (CloudControllerContext) dataObj; + + copyMap(clusterIdToMemberContextListMap, serializedObj.clusterIdToMemberContextListMap); + copyMap(memberIdToMemberContextMap, serializedObj.memberIdToMemberContextMap); + copyMap(memberIdToScheduledTaskMap, serializedObj.memberIdToScheduledTaskMap); + copyMap(kubClusterIdToKubClusterContextMap, serializedObj.kubClusterIdToKubClusterContextMap); + copyMap(clusterIdToContextMap, serializedObj.clusterIdToContextMap); + copyMap(cartridgeTypeToPartitionIdsMap, serializedObj.cartridgeTypeToPartitionIdsMap); + + copyList(cartridges, serializedObj.getCartridges()); + copyList(serviceGroups, serializedObj.getServiceGroups()); + + if (log.isDebugEnabled()) { + log.debug("Cloud controller context is read from the registry"); + } + } else { + if (log.isDebugEnabled()) { + log.debug("Cloud controller context could not be found in the registry"); + } + } + } + } catch (Exception e) { + String msg = "Unable to read cloud controller context from the registry. " + + "Hence, any historical data will not be reflected"; + log.warn(msg, e); + } + } + } + + private void copyMap(Map sourceMap, Map destinationMap) { + for(Object key : sourceMap.keySet()) { + distributedObjectHandler.putToMap(destinationMap, key, sourceMap.get(key)); + } + } + + private void copyList(List sourceList, List destinationList) { + for(Object item : sourceList) { + distributedObjectHandler.addToList(destinationList, item); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/403d5a45/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/deployers/CloudControllerDeployer.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/deployers/CloudControllerDeployer.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/deployers/CloudControllerDeployer.java index 6c299bb..8af29ad 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/deployers/CloudControllerDeployer.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/deployers/CloudControllerDeployer.java @@ -85,18 +85,12 @@ public class CloudControllerDeployer extends AbstractDeployer { } public void undeploy(String file) throws DeploymentException { - if (file.contains(FILE_NAME)) { - // reset - CloudControllerContext context = CloudControllerContext.getInstance(); - context.setSerializationDir(""); - // grab the entry from Map if (fileToIaasProviderListMap.containsKey(file)) { // remove 'em CloudControllerConfig.getInstance().getIaasProviders().removeAll(fileToIaasProviderListMap.get(file)); - - log.info("Successfully undeployed the cloud-controller XML file specified at " + + log.info("Successfully un-deployed the cloud-controller XML file specified at " + file); } http://git-wip-us.apache.org/repos/asf/stratos/blob/403d5a45/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java index 9d8a7c5..7337813 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java @@ -23,7 +23,6 @@ package org.apache.stratos.cloud.controller.internal; import com.hazelcast.core.HazelcastInstance; -import org.apache.axis2.clustering.ClusteringAgent; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.cloud.controller.context.CloudControllerContext; @@ -74,7 +73,7 @@ public class CloudControllerServiceComponent { tApplicationTopicReceiver.start(); if (log.isInfoEnabled()) { - log.info("Application Receiver thread started"); + log.info("Application event receiver thread started"); } clusterStatusTopicReceiver = new ClusterStatusTopicReceiver(); @@ -82,7 +81,7 @@ public class CloudControllerServiceComponent { tClusterStatusTopicReceiver.start(); if (log.isInfoEnabled()) { - log.info("Cluster status Receiver thread started"); + log.info("Cluster status receiver thread started"); } instanceStatusTopicReceiver = new InstanceStatusTopicReceiver(); @@ -100,13 +99,16 @@ public class CloudControllerServiceComponent { if(log.isInfoEnabled()) { log.info("Scheduling tasks"); } - - TopologySynchronizerTaskScheduler - .schedule(ServiceReferenceHolder.getInstance() - .getTaskService()); - + + if ((!CloudControllerContext.getInstance().isClustered()) || + (CloudControllerContext.getInstance().isCoordinator())) { + TopologySynchronizerTaskScheduler.schedule(ServiceReferenceHolder.getInstance().getTaskService()); + if(log.isInfoEnabled()) { + log.info("Topology synchronizer task scheduled"); + } + } } catch (Throwable e) { - log.error("******* Cloud Controller Service bundle is failed to activate ****", e); + log.error("**** Cloud controller service bundle is failed to activate ****", e); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/403d5a45/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java index 739469b..a195708 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java @@ -222,18 +222,15 @@ public class TopologyBuilder { */ private static void persist(CloudControllerContext context) { try { - RegistryManager.getInstance().persist( - context); + context.persist(); } catch (RegistryException e) { - - String msg = "Failed to persist the Cloud Controller data in registry. Further, transaction roll back also failed."; - log.fatal(msg); + String msg = "Failed to persist the cloud controller context in registry."; + log.error(msg); throw new CloudControllerException(msg, e); } } public static void handleClusterReset(ClusterStatusClusterResetEvent event) { - TopologyManager.acquireWriteLock(); try { http://git-wip-us.apache.org/repos/asf/stratos/blob/403d5a45/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/registry/RegistryManager.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/registry/RegistryManager.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/registry/RegistryManager.java index cd05fdd..3b589ff 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/registry/RegistryManager.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/registry/RegistryManager.java @@ -35,37 +35,30 @@ import org.wso2.carbon.registry.core.exceptions.RegistryException; import org.wso2.carbon.registry.core.exceptions.ResourceNotFoundException; import org.wso2.carbon.utils.multitenancy.MultitenantConstants; +import java.io.Serializable; + /** - * + * Registry manager provides functionality for persisting resources in the registry and reading them back. */ public class RegistryManager { private final static Log log = LogFactory.getLog(RegistryManager.class); - private static Registry registryService; private static class Holder { - static final RegistryManager INSTANCE = new RegistryManager(); + static final RegistryManager instance = new RegistryManager(); } public static RegistryManager getInstance() { - registryService = ServiceReferenceHolder.getInstance().getRegistry(); - if (registryService == null) { - log.warn("Registry Service is null. Hence unable to fetch data from registry."); - return null; - } - - return Holder.INSTANCE; + return Holder.instance; } private RegistryManager() { try { - - if (!registryService.resourceExists(CloudControllerConstants.CLOUD_CONTROLLER_RESOURCE)) { - registryService.put(CloudControllerConstants.CLOUD_CONTROLLER_RESOURCE, - registryService.newCollection()); + Registry registry = ServiceReferenceHolder.getInstance().getRegistry(); + if ((registry != null) && (!registry.resourceExists(CloudControllerConstants.CLOUD_CONTROLLER_RESOURCE))) { + registry.put(CloudControllerConstants.CLOUD_CONTROLLER_RESOURCE, registry.newCollection()); } } catch (RegistryException e) { - String msg = - "Failed to create the registry resource " + + String msg = "Failed to create the registry resource " + CloudControllerConstants.CLOUD_CONTROLLER_RESOURCE; log.error(msg, e); throw new CloudControllerException(msg, e); @@ -73,108 +66,62 @@ public class RegistryManager { } /** - * Persist an object in the local registry. + * Persist an object in the registry. * - * @param dataObj object to be persisted. + * @param serializableObject object to be persisted. */ - public synchronized void persist(CloudControllerContext dataObj) throws RegistryException { - try { - - PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext(); - ctx.setTenantId(MultitenantConstants.SUPER_TENANT_ID); - ctx.setTenantDomain(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME); - - registryService.beginTransaction(); - - Resource nodeResource = registryService.newResource(); - - nodeResource.setContent(Serializer.serializeToByteArray(dataObj)); - - registryService.put(CloudControllerConstants.CLOUD_CONTROLLER_RESOURCE + CloudControllerConstants.DATA_RESOURCE, nodeResource); - - registryService.commitTransaction(); - - } catch (Exception e) { - String msg = "Failed to persist the cloud controller data in registry."; - registryService.rollbackTransaction(); - log.error(msg, e); - throw new CloudControllerException(msg, e); - + public synchronized void persist(String resourcePath, Serializable serializableObject) throws RegistryException { + String absoluteResourcePath = CloudControllerConstants.CLOUD_CONTROLLER_RESOURCE + resourcePath; + if(log.isDebugEnabled()) { + log.debug(String.format("Persisting resource in registry: [resource-path] %s", absoluteResourcePath)); } - } + Registry registry = ServiceReferenceHolder.getInstance().getRegistry(); - public synchronized void persistTopology(Topology topology) throws RegistryException { try { - PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext(); ctx.setTenantId(MultitenantConstants.SUPER_TENANT_ID); ctx.setTenantDomain(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME); - - registryService.beginTransaction(); - - Resource nodeResource = registryService.newResource(); - nodeResource.setContent(Serializer.serializeToByteArray(topology)); + registry.beginTransaction(); - registryService.put(CloudControllerConstants.CLOUD_CONTROLLER_RESOURCE + CloudControllerConstants.TOPOLOGY_RESOURCE, nodeResource); + Resource nodeResource = registry.newResource(); + nodeResource.setContent(Serializer.serializeToByteArray(serializableObject)); + registry.put(absoluteResourcePath, nodeResource); - registryService.commitTransaction(); + registry.commitTransaction(); + if(log.isDebugEnabled()) { + log.debug(String.format("Resource persisted successfully in registry: [resource-path] %s", + absoluteResourcePath)); + } } catch (Exception e) { - String msg = "Failed to persist the cloud controller data in registry."; - registryService.rollbackTransaction(); + String msg = "Failed to persist resource in registry: " + absoluteResourcePath; + registry.rollbackTransaction(); log.error(msg, e); throw new CloudControllerException(msg, e); - } } - - public synchronized Object retrieve() { - + public synchronized Object read(String resourcePath) { try { - PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext(); + Registry registry = ServiceReferenceHolder.getInstance().getRegistry(); + if(registry == null) { + return null; + } + + PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext(); ctx.setTenantId(MultitenantConstants.SUPER_TENANT_ID); ctx.setTenantDomain(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME); - Resource resource = registryService.get( - CloudControllerConstants.CLOUD_CONTROLLER_RESOURCE + CloudControllerConstants.DATA_RESOURCE); - + Resource resource = registry.get(CloudControllerConstants.CLOUD_CONTROLLER_RESOURCE + resourcePath); return resource.getContent(); - } catch (ResourceNotFoundException ignore) { // this means, we've never persisted CC info in registry return null; } catch (RegistryException e) { - String msg = "Failed to retrieve cloud controller data from registry."; + String msg = "Failed to read resource from registry: " + + CloudControllerConstants.CLOUD_CONTROLLER_RESOURCE + resourcePath; log.error(msg, e); throw new CloudControllerException(msg, e); } - } - - public synchronized Object retrieveTopology() { - - try { - PrivilegedCarbonContext ctx = PrivilegedCarbonContext - .getThreadLocalCarbonContext(); - ctx.setTenantId(MultitenantConstants.SUPER_TENANT_ID); - ctx.setTenantDomain(MultitenantConstants.SUPER_TENANT_DOMAIN_NAME); - - Resource resource = registryService - .get(CloudControllerConstants.CLOUD_CONTROLLER_RESOURCE - + CloudControllerConstants.TOPOLOGY_RESOURCE); - - return resource.getContent(); - - } catch (ResourceNotFoundException ignore) { - // this means, we've never persisted CC info in registry - return null; - } catch (RegistryException e) { - String msg = "Failed to retrieve cloud controller data from registry."; - log.error(msg, e); - throw new CloudControllerException(msg, e); - } - - } - } http://git-wip-us.apache.org/repos/asf/stratos/blob/403d5a45/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/registry/Serializer.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/registry/Serializer.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/registry/Serializer.java index bf0122a..1759902 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/registry/Serializer.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/registry/Serializer.java @@ -64,17 +64,17 @@ public class Serializer { /** * Serialize a {@link org.apache.stratos.cloud.controller.context.CloudControllerContext} to a byte array. - * @param serializableObj + * @param serializableObject * @return byte[] * @throws IOException */ - public static byte[] serializeToByteArray(CloudControllerContext serializableObj) throws IOException { + public static byte[] serializeToByteArray(Serializable serializableObject) throws IOException { ByteArrayOutputStream bos = new ByteArrayOutputStream(); ObjectOutput out = null; try { out = new ObjectOutputStream(bos); - out.writeObject(serializableObj); + out.writeObject(serializableObject); return bos.toByteArray(); http://git-wip-us.apache.org/repos/asf/stratos/blob/403d5a45/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java index d1ef730..ae3a7ae 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java @@ -38,9 +38,7 @@ import org.apache.stratos.cloud.controller.functions.ContainerClusterContextToKu import org.apache.stratos.cloud.controller.functions.ContainerClusterContextToReplicationController; import org.apache.stratos.cloud.controller.functions.PodToMemberContext; import org.apache.stratos.cloud.controller.iaas.Iaas; -import org.apache.stratos.cloud.controller.registry.Deserializer; import org.apache.stratos.cloud.controller.messaging.publisher.CartridgeInstanceDataPublisher; -import org.apache.stratos.cloud.controller.registry.RegistryManager; import org.apache.stratos.cloud.controller.services.CloudControllerService; import org.apache.stratos.cloud.controller.messaging.topology.TopologyBuilder; import org.apache.stratos.cloud.controller.messaging.topology.TopologyEventPublisher; @@ -82,51 +80,10 @@ public class CloudControllerServiceImpl implements CloudControllerService { private static final Log LOG = LogFactory.getLog(CloudControllerServiceImpl.class); public static final String IS_LOAD_BALANCER = "load.balancer"; - private CloudControllerContext dataHolder = CloudControllerContext + private CloudControllerContext cloudControllerContext = CloudControllerContext .getInstance(); public CloudControllerServiceImpl() { - // acquire serialized data from registry - acquireData(); - } - - private void acquireData() { - - Object obj = RegistryManager.getInstance().retrieve(); - if (obj != null) { - try { - Object dataObj = Deserializer - .deserializeFromByteArray((byte[]) obj); - if (dataObj instanceof CloudControllerContext) { - CloudControllerContext serializedObj = (CloudControllerContext) dataObj; - CloudControllerContext currentData = CloudControllerContext - .getInstance(); - - // assign necessary data - currentData.setClusterIdToContext(serializedObj.getClusterIdToContext()); - currentData.setMemberIdToContext(serializedObj.getMemberIdToContext()); - currentData.setClusterIdToMemberContext(serializedObj.getClusterIdToMemberContext()); - currentData.setCartridges(serializedObj.getCartridges()); - currentData.setKubClusterIdToKubClusterContext(serializedObj.getKubClusterIdToKubClusterContext()); - currentData.setServiceGroups(serializedObj.getServiceGroups()); - - if (LOG.isDebugEnabled()) { - - LOG.debug("Cloud Controller Data is retrieved from registry."); - } - } else { - if (LOG.isDebugEnabled()) { - - LOG.debug("Cloud Controller Data cannot be found in registry."); - } - } - } catch (Exception e) { - - String msg = "Unable to acquire data from Registry. Hence, any historical data will not get reflected."; - LOG.warn(msg, e); - } - - } } public void deployCartridgeDefinition(CartridgeConfig cartridgeConfig) throws InvalidCartridgeDefinitionException, @@ -177,8 +134,8 @@ public class CloudControllerServiceImpl implements CloudControllerService { // TODO transaction begins String cartridgeType = cartridge.getType(); - if (dataHolder.getCartridge(cartridgeType) != null) { - Cartridge cartridgeToBeRemoved = dataHolder.getCartridge(cartridgeType); + if (cloudControllerContext.getCartridge(cartridgeType) != null) { + Cartridge cartridgeToBeRemoved = cloudControllerContext.getCartridge(cartridgeType); // undeploy try { undeployCartridgeDefinition(cartridgeToBeRemoved.getType()); @@ -188,7 +145,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { populateNewCartridge(cartridge, cartridgeToBeRemoved); } - dataHolder.addCartridge(cartridge); + cloudControllerContext.addCartridge(cartridge); // persist persist(); @@ -228,10 +185,10 @@ public class CloudControllerServiceImpl implements CloudControllerService { public void undeployCartridgeDefinition(String cartridgeType) throws InvalidCartridgeTypeException { Cartridge cartridge = null; - if ((cartridge = dataHolder.getCartridge(cartridgeType)) != null) { - if (dataHolder.getCartridges().remove(cartridge)) { + if ((cartridge = cloudControllerContext.getCartridge(cartridgeType)) != null) { + if (cloudControllerContext.getCartridges().remove(cartridge)) { // invalidate partition validation cache - dataHolder.removeFromCartridgeTypeToPartitionIds(cartridgeType); + cloudControllerContext.removeFromCartridgeTypeToPartitionIds(cartridgeType); if (LOG.isDebugEnabled()) { LOG.debug("Partition cache invalidated for cartridge " + cartridgeType); @@ -301,7 +258,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { } } - dataHolder.addServiceGroup(servicegroup); + cloudControllerContext.addServiceGroup(servicegroup); this.persist(); @@ -314,10 +271,10 @@ public class CloudControllerServiceImpl implements CloudControllerService { ServiceGroup serviceGroup = null; - serviceGroup = dataHolder.getServiceGroup(name); + serviceGroup = cloudControllerContext.getServiceGroup(name); if (serviceGroup != null) { - if (dataHolder.getServiceGroups().remove(serviceGroup)) { + if (cloudControllerContext.getServiceGroups().remove(serviceGroup)) { persist(); if (LOG.isInfoEnabled()) { LOG.info("Successfully undeployed the Service Group definition: " + serviceGroup); @@ -339,7 +296,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { LOG.debug("getServiceGroupDefinition:" + name); } - ServiceGroup serviceGroup = this.dataHolder.getServiceGroup(name); + ServiceGroup serviceGroup = this.cloudControllerContext.getServiceGroup(name); if (serviceGroup == null) { if (LOG.isDebugEnabled()) { @@ -405,13 +362,13 @@ public class CloudControllerServiceImpl implements CloudControllerService { memberContext); String partitionId = partition.getId(); - ClusterContext ctxt = dataHolder.getClusterContext(clusterId); + ClusterContext ctxt = cloudControllerContext.getClusterContext(clusterId); handleNullObject(ctxt, "Instance start-up failed. Invalid cluster id. " + memberContext); String cartridgeType = ctxt.getCartridgeType(); - Cartridge cartridge = dataHolder.getCartridge(cartridgeType); + Cartridge cartridge = cloudControllerContext.getCartridge(cartridgeType); if (cartridge == null) { String msg = @@ -591,11 +548,9 @@ public class CloudControllerServiceImpl implements CloudControllerService { */ private void persist() { try { - RegistryManager.getInstance().persist( - dataHolder); + cloudControllerContext.persist(); } catch (RegistryException e) { - - String msg = "Failed to persist the Cloud Controller data in registry. Further, transaction roll back also failed."; + String msg = "Failed to persist the cloud controller context in registry."; LOG.fatal(msg); throw new CloudControllerException(msg, e); } @@ -611,7 +566,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { handleNullObject(memberId, "Termination failed. Null member id."); - MemberContext ctxt = dataHolder.getMemberContextOfMemberId(memberId); + MemberContext ctxt = cloudControllerContext.getMemberContextOfMemberId(memberId); if (ctxt == null) { String msg = "Termination failed. Invalid Member Id: " + memberId; @@ -767,7 +722,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { try { // these will never be null, since we do not add null values for these. - Cartridge cartridge = dataHolder.getCartridge(cartridgeType); + Cartridge cartridge = cloudControllerContext.getCartridge(cartridgeType); LOG.info("Starting to terminate an instance with member id : " + memberId + " in partition id: " + partitionId + " of cluster id: " + clusterId + @@ -832,7 +787,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { String clusterId = memberContext.getClusterId(); Partition partition = memberContext.getPartition(); - ClusterContext ctxt = dataHolder.getClusterContext(clusterId); + ClusterContext ctxt = cloudControllerContext.getClusterContext(clusterId); Iaas iaas = iaasProvider.getIaas(); String publicIp = null; @@ -1015,7 +970,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { } } - dataHolder.addMemberContext(memberContext); + cloudControllerContext.addMemberContext(memberContext); // persist in registry persist(); @@ -1067,7 +1022,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { handleNullObject(clusterId, "Instance termination failed. Cluster id is null."); - List<MemberContext> ctxts = dataHolder.getMemberContextsOfClusterId(clusterId); + List<MemberContext> ctxts = cloudControllerContext.getMemberContextsOfClusterId(clusterId); if (ctxts == null) { String msg = "Instance termination failed. No members found for cluster id: " + clusterId; @@ -1129,7 +1084,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { private void detachVolume(IaasProvider iaasProvider, MemberContext ctxt) { String clusterId = ctxt.getClusterId(); - ClusterContext clusterCtxt = dataHolder.getClusterContext(clusterId); + ClusterContext clusterCtxt = cloudControllerContext.getClusterContext(clusterId); if (clusterCtxt.getVolumes() != null) { for (Volume volume : clusterCtxt.getVolumes()) { try { @@ -1171,7 +1126,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { null); // update data holders - dataHolder.removeMemberContext(memberContext.getMemberId(), memberContext.getClusterId()); + cloudControllerContext.removeMemberContext(memberContext.getMemberId(), memberContext.getClusterId()); // persist persist(); @@ -1195,7 +1150,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { handleNullObject(hostName, "Service registration failed. Hostname is null."); Cartridge cartridge = null; - if ((cartridge = dataHolder.getCartridge(cartridgeType)) == null) { + if ((cartridge = cloudControllerContext.getCartridge(cartridgeType)) == null) { String msg = "Registration of cluster: " + clusterId + " failed. - Unregistered Cartridge type: " + cartridgeType; @@ -1212,7 +1167,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { payload, hostName, props, isLb, registrant.getPersistence()); - dataHolder.addClusterContext(ctxt);*/ + cloudControllerContext.addClusterContext(ctxt);*/ TopologyBuilder.handleClusterCreated(registrant, isLb); persist(); @@ -1254,7 +1209,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { @Override public String[] getRegisteredCartridges() { // get the list of cartridges registered - List<Cartridge> cartridges = dataHolder + List<Cartridge> cartridges = cloudControllerContext .getCartridges(); if (cartridges == null) { @@ -1282,7 +1237,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { @Override public CartridgeInfo getCartridgeInfo(String cartridgeType) throws UnregisteredCartridgeException { - Cartridge cartridge = dataHolder + Cartridge cartridge = cloudControllerContext .getCartridge(cartridgeType); if (cartridge != null) { @@ -1301,13 +1256,13 @@ public class CloudControllerServiceImpl implements CloudControllerService { public void unregisterService(String clusterId) throws UnregisteredClusterException { final String clusterId_ = clusterId; - ClusterContext ctxt = dataHolder.getClusterContext(clusterId_); + ClusterContext ctxt = cloudControllerContext.getClusterContext(clusterId_); handleNullObject(ctxt, "Service unregistration failed. Invalid cluster id: " + clusterId); String cartridgeType = ctxt.getCartridgeType(); - Cartridge cartridge = dataHolder.getCartridge(cartridgeType); + Cartridge cartridge = cloudControllerContext.getCartridge(cartridgeType); if (cartridge == null) { String msg = @@ -1322,12 +1277,12 @@ public class CloudControllerServiceImpl implements CloudControllerService { } else { -// TopologyBuilder.handleClusterMaintenanceMode(dataHolder.getClusterContext(clusterId_)); +// TopologyBuilder.handleClusterMaintenanceMode(cloudControllerContext.getClusterContext(clusterId_)); Runnable terminateInTimeout = new Runnable() { @Override public void run() { - ClusterContext ctxt = dataHolder.getClusterContext(clusterId_); + ClusterContext ctxt = cloudControllerContext.getClusterContext(clusterId_); if (ctxt == null) { String msg = "Service unregistration failed. Cluster not found: " + clusterId_; LOG.error(msg); @@ -1366,7 +1321,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { }; Runnable unregister = new Runnable() { public void run() { - ClusterContext ctxt = dataHolder.getClusterContext(clusterId_); + ClusterContext ctxt = cloudControllerContext.getClusterContext(clusterId_); if (ctxt == null) { String msg = "Service unregistration failed. Cluster not found: " + clusterId_; LOG.error(msg); @@ -1389,12 +1344,12 @@ public class CloudControllerServiceImpl implements CloudControllerService { private void deleteVolumes(ClusterContext ctxt) { if (ctxt.isVolumeRequired()) { - Cartridge cartridge = dataHolder.getCartridge(ctxt.getCartridgeType()); + Cartridge cartridge = cloudControllerContext.getCartridge(ctxt.getCartridgeType()); if (cartridge != null && cartridge.getIaases() != null && ctxt.getVolumes() != null) { for (Volume volume : ctxt.getVolumes()) { if (volume.getId() != null) { String iaasType = volume.getIaasType(); - //Iaas iaas = dataHolder.getIaasProvider(iaasType).getIaas(); + //Iaas iaas = cloudControllerContext.getIaasProvider(iaasType).getIaas(); Iaas iaas = cartridge.getIaasProvider(iaasType).getIaas(); if (iaas != null) { try { @@ -1442,17 +1397,13 @@ public class CloudControllerServiceImpl implements CloudControllerService { public boolean validateDeploymentPolicy(String cartridgeType, Partition[] partitions) throws InvalidPartitionException, InvalidCartridgeTypeException { - Map<String, List<String>> validatedCache = dataHolder.getCartridgeTypeToPartitionIds(); - List<String> validatedPartitions = new ArrayList<String>(); - - if (validatedCache.containsKey(cartridgeType)) { + List<String> validatedPartitions = CloudControllerContext.getInstance().getPartitionIds(cartridgeType); + if (validatedPartitions != null) { // cache hit for this cartridge // get list of partitions - validatedPartitions = validatedCache.get(cartridgeType); if (LOG.isDebugEnabled()) { LOG.debug("Partition validation cache hit for cartridge type: " + cartridgeType); } - } Map<String, IaasProvider> partitionToIaasProviders = @@ -1462,7 +1413,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { LOG.debug("Deployment policy validation started for cartridge type: " + cartridgeType); } - Cartridge cartridge = dataHolder.getCartridge(cartridgeType); + Cartridge cartridge = cloudControllerContext.getCartridge(cartridgeType); if (cartridge == null) { String msg = "Invalid Cartridge Type: " + cartridgeType; @@ -1482,7 +1433,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { Callable<IaasProvider> worker = new PartitionValidatorCallable( partition, cartridge); Future<IaasProvider> job = CloudControllerContext.getInstance() - .getExecutor().submit(worker); + .getExecutorService().submit(worker); jobList.put(partition.getId(), job); } @@ -1498,7 +1449,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { partitionToIaasProviders.put(partitionId, job.get()); // add to cache - this.dataHolder.addToCartridgeTypeToPartitionIdMap(cartridgeType, partitionId); + this.cloudControllerContext.addToCartridgeTypeToPartitionIdMap(cartridgeType, partitionId); if (LOG.isDebugEnabled()) { LOG.debug("Partition " + partitionId + " added to the cache against cartridge type: " + cartridgeType); @@ -1522,10 +1473,10 @@ public class CloudControllerServiceImpl implements CloudControllerService { } private void onClusterRemoval(final String clusterId) { - ClusterContext ctxt = dataHolder.getClusterContext(clusterId); + ClusterContext ctxt = cloudControllerContext.getClusterContext(clusterId); TopologyBuilder.handleClusterRemoved(ctxt); - dataHolder.removeClusterContext(clusterId); - dataHolder.removeMemberContextsOfCluster(clusterId); + cloudControllerContext.removeClusterContext(clusterId); + cloudControllerContext.removeMemberContextsOfCluster(clusterId); persist(); } @@ -1570,7 +1521,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { public ClusterContext getClusterContext(String clusterId) { - return dataHolder.getClusterContext(clusterId); + return cloudControllerContext.getClusterContext(clusterId); } @Override @@ -1590,12 +1541,12 @@ public class CloudControllerServiceImpl implements CloudControllerService { LOG.debug("Received a container spawn request : " + containerClusterContext.toString()); } - ClusterContext ctxt = dataHolder.getClusterContext(clusterId); + ClusterContext ctxt = cloudControllerContext.getClusterContext(clusterId); handleNullObject(ctxt, "Container start-up failed. Invalid cluster id. " + containerClusterContext.toString()); String cartridgeType = ctxt.getCartridgeType(); - Cartridge cartridge = dataHolder.getCartridge(cartridgeType); + Cartridge cartridge = cloudControllerContext.getCartridge(cartridgeType); if (cartridge == null) { String msg = @@ -1647,7 +1598,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { allocatedServiceHostPortProp.setName(StratosConstants.ALLOCATED_SERVICE_HOST_PORT); allocatedServiceHostPortProp.setValue(String.valueOf(service.getPort())); ctxt.getProperties().addProperty(allocatedServiceHostPortProp); - dataHolder.addClusterContext(ctxt); + cloudControllerContext.addClusterContext(ctxt); if (LOG.isDebugEnabled()) { LOG.debug("Cloud Controller successfully started the service " @@ -1700,14 +1651,14 @@ public class CloudControllerServiceImpl implements CloudControllerService { .getProperties(), StratosConstants.ALLOCATED_SERVICE_HOST_PORT, String.valueOf(service.getPort()))); - dataHolder.addMemberContext(context); + cloudControllerContext.addMemberContext(context); // wait till Pod status turns to running and send member spawned. ScheduledThreadExecutor exec = ScheduledThreadExecutor.getInstance(); if (LOG.isDebugEnabled()) { LOG.debug("Cloud Controller is starting the instance start up thread."); } - dataHolder.addScheduledFutureJob(context.getMemberId(), exec.schedule(new PodActivationWatcher(pod.getId(), context, kubApi), 5000)); + cloudControllerContext.addScheduledFutureJob(context.getMemberId(), exec.schedule(new PodActivationWatcher(pod.getId(), context, kubApi), 5000)); memberContexts.add(context); } @@ -1745,18 +1696,18 @@ public class CloudControllerServiceImpl implements CloudControllerService { String kubernetesClusterId, String kubernetesMasterIp, String kubernetesPortRange) { - KubernetesClusterContext origCtxt = dataHolder.getKubernetesClusterContext(kubernetesClusterId); + KubernetesClusterContext origCtxt = cloudControllerContext.getKubernetesClusterContext(kubernetesClusterId); KubernetesClusterContext newCtxt = new KubernetesClusterContext(kubernetesClusterId, kubernetesPortRange, kubernetesMasterIp); if (origCtxt == null) { - dataHolder.addKubernetesClusterContext(newCtxt); + cloudControllerContext.addKubernetesClusterContext(newCtxt); return newCtxt; } if (!origCtxt.equals(newCtxt)) { // if for some reason master IP etc. have changed newCtxt.setAvailableHostPorts(origCtxt.getAvailableHostPorts()); - dataHolder.addKubernetesClusterContext(newCtxt); + cloudControllerContext.addKubernetesClusterContext(newCtxt); return newCtxt; } else { return origCtxt; @@ -1767,7 +1718,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { public MemberContext[] terminateAllContainers(String clusterId) throws InvalidClusterException { - ClusterContext ctxt = dataHolder.getClusterContext(clusterId); + ClusterContext ctxt = cloudControllerContext.getClusterContext(clusterId); handleNullObject(ctxt, "Kubernetes units temrination failed. Invalid cluster id. " + clusterId); String kubernetesClusterId = CloudControllerUtil.getProperty(ctxt.getProperties(), @@ -1775,7 +1726,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { handleNullObject(kubernetesClusterId, "Kubernetes units termination failed. Cannot find '" + StratosConstants.KUBERNETES_CLUSTER_ID + "'. " + ctxt); - KubernetesClusterContext kubClusterContext = dataHolder.getKubernetesClusterContext(kubernetesClusterId); + KubernetesClusterContext kubClusterContext = cloudControllerContext.getKubernetesClusterContext(kubernetesClusterId); handleNullObject(kubClusterContext, "Kubernetes units termination failed. Cannot find a matching Kubernetes Cluster for cluster id: " + kubernetesClusterId); @@ -1838,7 +1789,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { + StratosConstants.ALLOCATED_SERVICE_HOST_PORT); } - List<MemberContext> membersToBeRemoved = dataHolder.getMemberContextsOfClusterId(clusterId); + List<MemberContext> membersToBeRemoved = cloudControllerContext.getMemberContextsOfClusterId(clusterId); for (MemberContext memberContext : membersToBeRemoved) { logTermination(memberContext); @@ -1858,12 +1809,12 @@ public class CloudControllerServiceImpl implements CloudControllerService { LOG.debug("CloudControllerServiceImpl:updateContainers for cluster : " + clusterId); } - ClusterContext ctxt = dataHolder.getClusterContext(clusterId); + ClusterContext ctxt = cloudControllerContext.getClusterContext(clusterId); handleNullObject(ctxt, "Container update failed. Invalid cluster id. " + clusterId); String cartridgeType = ctxt.getCartridgeType(); - Cartridge cartridge = dataHolder.getCartridge(cartridgeType); + Cartridge cartridge = cloudControllerContext.getCartridge(cartridgeType); if (cartridge == null) { String msg = @@ -1876,7 +1827,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { try { String kubernetesClusterId = validateProperty(StratosConstants.KUBERNETES_CLUSTER_ID, ctxt); - KubernetesClusterContext kubClusterContext = dataHolder.getKubernetesClusterContext(kubernetesClusterId); + KubernetesClusterContext kubClusterContext = cloudControllerContext.getKubernetesClusterContext(kubernetesClusterId); if (kubClusterContext == null) { String msg = @@ -1936,7 +1887,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { for (Pod pod : allPods) { MemberContext context; // if member context does not exist -> a new member (scale up) - if ((context = dataHolder.getMemberContextOfMemberId(pod.getId())) == null) { + if ((context = cloudControllerContext.getMemberContextOfMemberId(pod.getId())) == null) { context = podToMemberContextFunc.apply(pod); context.setCartridgeType(cartridgeType); @@ -1952,7 +1903,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { if (LOG.isDebugEnabled()) { LOG.debug("Cloud Controller is starting the instance start up thread."); } - dataHolder.addScheduledFutureJob(context.getMemberId(), exec.schedule(new PodActivationWatcher(pod.getId(), context, kubApi), 5000)); + cloudControllerContext.addScheduledFutureJob(context.getMemberId(), exec.schedule(new PodActivationWatcher(pod.getId(), context, kubApi), 5000)); memberContexts.add(context); @@ -1969,7 +1920,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { List<Pod> difference = ListUtils.subtract(Arrays.asList(previousStatePods), Arrays.asList(allPods)); for (Pod pod : difference) { if (pod != null) { - MemberContext context = dataHolder.getMemberContextOfMemberId(pod.getId()); + MemberContext context = cloudControllerContext.getMemberContextOfMemberId(pod.getId()); logTermination(context); memberContexts.add(context); } @@ -2001,7 +1952,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { handleNullObject(memberId, "Failed to terminate member. Invalid Member id. [Member id] " + memberId); - MemberContext memberContext = dataHolder.getMemberContextOfMemberId(memberId); + MemberContext memberContext = cloudControllerContext.getMemberContextOfMemberId(memberId); handleNullObject(memberContext, "Failed to terminate member. Member id not found. [Member id] " + memberId); @@ -2009,7 +1960,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { handleNullObject(clusterId, "Failed to terminate member. Cluster id is null. [Member id] " + memberId); - ClusterContext ctxt = dataHolder.getClusterContext(clusterId); + ClusterContext ctxt = cloudControllerContext.getClusterContext(clusterId); handleNullObject(ctxt, String.format("Failed to terminate member [Member id] %s. Invalid cluster id %s ", memberId, clusterId)); @@ -2020,7 +1971,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { handleNullObject(kubernetesClusterId, String.format("Failed to terminate member [Member id] %s. Cannot find '" + StratosConstants.KUBERNETES_CLUSTER_ID + "' in [cluster context] %s ", memberId, ctxt)); - KubernetesClusterContext kubClusterContext = dataHolder.getKubernetesClusterContext(kubernetesClusterId); + KubernetesClusterContext kubClusterContext = cloudControllerContext.getKubernetesClusterContext(kubernetesClusterId); handleNullObject(kubClusterContext, String.format("Failed to terminate member [Member id] %s. Cannot find a matching Kubernetes Cluster in [cluster context] %s ", memberId, ctxt)); @@ -2030,7 +1981,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { // member id = pod id kubApi.deletePod(memberId); - MemberContext memberToBeRemoved = dataHolder.getMemberContextOfMemberId(memberId); + MemberContext memberToBeRemoved = cloudControllerContext.getMemberContextOfMemberId(memberId); logTermination(memberToBeRemoved); @@ -2064,7 +2015,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { for (ApplicationClusterContext appClusterCtxt : appClustersContexts) { - dataHolder.addClusterContext(new ClusterContext(appClusterCtxt.getClusterId(), + cloudControllerContext.addClusterContext(new ClusterContext(appClusterCtxt.getClusterId(), appClusterCtxt.getCartridgeType(), appClusterCtxt.getTextPayload(), appClusterCtxt.getHostName(), appClusterCtxt.isLbCluster(), appClusterCtxt.getProperties())); // create Cluster objects @@ -2074,7 +2025,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { newCluster.setTenantRange(appClusterCtxt.getTenantRange()); //newCluster.setStatus(ClusterStatus.Created, null); newCluster.setHostNames(Arrays.asList(appClusterCtxt.getHostName())); - Cartridge cartridge = dataHolder.getCartridge(appClusterCtxt.getCartridgeType()); + Cartridge cartridge = cloudControllerContext.getCartridge(appClusterCtxt.getCartridgeType()); if(cartridge.getDeployerType() != null && cartridge.getDeployerType().equals(StratosConstants.KUBERNETES_DEPLOYER_TYPE)) { newCluster.setKubernetesCluster(true); @@ -2107,7 +2058,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { // // // Create a Cluster Context obj. for each of the Clusters in the Application // for (ApplicationClusterContext applicationClusterContext : applicationParser.getApplicationClusterContexts()) { -// dataHolder.addClusterContext(new ClusterContext(applicationClusterContext.getClusterId(), +// cloudControllerContext.addClusterContext(new ClusterContext(applicationClusterContext.getClusterId(), // applicationClusterContext.getCartridgeType(), applicationClusterContext.getTextPayload(), // applicationClusterContext.getHostName(), applicationClusterContext.isLbCluster())); // } @@ -2129,7 +2080,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { // // // Create a Cluster Context obj. for each of the Clusters in the Application // for (ApplicationClusterContext applicationClusterContext : applicationParser.getApplicationClusterContexts()) { -// dataHolder.addClusterContext(new ClusterContext(applicationClusterContext.getClusterId(), +// cloudControllerContext.addClusterContext(new ClusterContext(applicationClusterContext.getClusterId(), // applicationClusterContext.getCartridgeType(), applicationClusterContext.getTextPayload(), // applicationClusterContext.getHostName(), applicationClusterContext.isLbCluster())); // } http://git-wip-us.apache.org/repos/asf/stratos/blob/403d5a45/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerUtil.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerUtil.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerUtil.java index 8b88301..40b2459 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerUtil.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerUtil.java @@ -343,16 +343,15 @@ public class CloudControllerUtil { public static void persistTopology(Topology topology) { try { - RegistryManager.getInstance().persistTopology(topology); + RegistryManager.getInstance().persist(CloudControllerConstants.TOPOLOGY_RESOURCE, topology); } catch (RegistryException e) { - String msg = "Failed to persist the Topology in registry. "; log.fatal(msg, e); } } public static Topology retrieveTopology() { - Object obj = RegistryManager.getInstance().retrieveTopology(); + Object obj = RegistryManager.getInstance().read(CloudControllerConstants.TOPOLOGY_RESOURCE); if (obj != null) { try { Object dataObj = Deserializer @@ -367,7 +366,6 @@ public class CloudControllerUtil { log.warn(msg, e); } } - return null; }
