Updated Branches: refs/heads/master a6f454914 -> d4a6119f9
adding file based topology persistence Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/7299b9d0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/7299b9d0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/7299b9d0 Branch: refs/heads/master Commit: 7299b9d031fa66845170ad1050daaff5753e995d Parents: 96db316 Author: Nirmal Fernando <[email protected]> Authored: Mon Dec 2 22:31:40 2013 +0530 Committer: Nirmal Fernando <[email protected]> Committed: Mon Dec 2 22:31:40 2013 +0530 ---------------------------------------------------------------------- .../impl/CloudControllerServiceImpl.java | 6 +- .../internal/CloudControllerDSComponent.java | 69 +++++++++++++----- .../runtime/FasterLookUpDataHolder.java | 4 +- .../controller/topology/TopologyManager.java | 75 ++++++++++++-------- .../controller/util/CloudControllerUtil.java | 37 ++++++++++ .../controller/util/ServiceReferenceHolder.java | 10 +++ 6 files changed, 151 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7299b9d0/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java index 373c5df..e934c3d 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java @@ -42,8 +42,12 @@ import org.apache.stratos.cloud.controller.runtime.FasterLookUpDataHolder; import org.apache.stratos.cloud.controller.topic.TopologySynchronizerTask; import org.apache.stratos.cloud.controller.topology.TopologyBuilder; import org.apache.stratos.cloud.controller.topology.TopologyEventMessageDelegator; +import org.apache.stratos.cloud.controller.topology.TopologyListener; import org.apache.stratos.cloud.controller.util.*; import org.apache.stratos.cloud.controller.validate.interfaces.PartitionValidator; +import org.apache.stratos.messaging.broker.publish.EventPublisher; +import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber; +import org.apache.stratos.messaging.util.Constants; import org.jclouds.compute.ComputeService; import org.jclouds.compute.domain.NodeMetadata; import org.jclouds.compute.domain.Template; @@ -74,7 +78,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { // acquire serialized data from registry acquireData(); - + // gets the task service TaskService taskService = ServiceReferenceHolder .getInstance().getTaskService(); http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7299b9d0/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java index ada91f0..ad1c5d5 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java @@ -38,13 +38,14 @@ import org.osgi.service.component.ComponentContext; import org.wso2.carbon.ntask.core.service.TaskService; import org.wso2.carbon.registry.core.exceptions.RegistryException; import org.wso2.carbon.registry.core.service.RegistryService; +import org.wso2.carbon.utils.ConfigurationContextService; import java.util.List; /** * Registering Cloud Controller Service. * - * @scr.component name="org.wso2.carbon.stratos.cloud.controller" immediate="true" + * @scr.component name="org.apache.stratos.cloud.controller" immediate="true" * @scr.reference name="ntask.component" interface="org.wso2.carbon.ntask.core.service.TaskService" * cardinality="1..1" policy="dynamic" bind="setTaskService" * unbind="unsetTaskService" @@ -52,8 +53,17 @@ import java.util.List; * interface= * "org.wso2.carbon.registry.core.service.RegistryService" * cardinality="1..1" policy="dynamic" bind="setRegistryService" - * unbind="unsetRegistryService" + * unbind="unsetRegistryService" + * @scr.reference name="config.context.service" + * interface="org.wso2.carbon.utils.ConfigurationContextService" + * cardinality="1..1" policy="dynamic" + * bind="setConfigurationContextService" + * unbind="unsetConfigurationContextService" */ +//* @scr.reference name="org.apache.stratos.cloud.controller.deployers" +//* interface="org.apache.stratos.cloud.controller.interfaces.CloudControllerDeployerService" +//* cardinality="1..1" policy="dynamic" bind="setCloudControllerDeployerService" +//* unbind="unsetCloudControllerDeployerService" public class CloudControllerDSComponent { private static final Log log = LogFactory.getLog(CloudControllerDSComponent.class); @@ -61,27 +71,42 @@ public class CloudControllerDSComponent { protected void activate(ComponentContext context) { try { - // get all the topics - comma separated list - String topicsString = dataHolder.getTopologyConfig().getProperty(CloudControllerConstants.TOPICS_PROPERTY); - - if(topicsString == null || topicsString.isEmpty()) { - topicsString = Constants.TOPOLOGY_TOPIC; - } - - String[] topics = topicsString.split(","); + + // register deployers of CC +// AxisConfiguration axisConfig = ServiceReferenceHolder.getInstance().getAxisConfiguration(); +// +// if(axisConfig == null) { +// String msg = "Axis Configuration is null. Cannot register deployers."; +// log.error(msg); +// throw new CloudControllerException(msg); +// } +// +// DeploymentEngine deploymentEngine = (DeploymentEngine) axisConfig.getConfigurator(); +// Deployer cloudControllerDeployer = new CloudControllerDeployer(); +// Deployer cartridgeDeployer = new CartridgeDeployer(); +// deploymentEngine.addDeployer(cloudControllerDeployer, "../../conf", "xml"); +// deploymentEngine.addDeployer(cartridgeDeployer, "cartridges", "xml"); - // initialize the topic publishers - for (String topic : topics) { - - dataHolder.addEventPublisher(new EventPublisher(topic), topic); - } + // get all the topics - comma separated list + String topicsString = dataHolder.getTopologyConfig().getProperty(CloudControllerConstants.TOPICS_PROPERTY); + + if(topicsString == null || topicsString.isEmpty()) { + topicsString = Constants.TOPOLOGY_TOPIC; + } + + String[] topics = topicsString.split(","); + for (String topic : topics) { + + dataHolder.addEventPublisher(new EventPublisher(topic), topic); + } //initialting the subscriber TopicSubscriber subscriber = new TopicSubscriber(CloudControllerConstants.INSTANCE_TOPIC); subscriber.setMessageListener(new TopologyListener()); Thread tsubscriber = new Thread(subscriber); - tsubscriber.start(); - + tsubscriber.start(); + + // initialize the topic publishers BundleContext bundleContext = context.getBundleContext(); bundleContext.registerService(CloudControllerService.class.getName(), new CloudControllerServiceImpl(), null); @@ -128,6 +153,15 @@ public class CloudControllerDSComponent { ServiceReferenceHolder.getInstance().setRegistry(null); } + protected void setConfigurationContextService(ConfigurationContextService cfgCtxService) { + ServiceReferenceHolder.getInstance().setAxisConfiguration( + cfgCtxService.getServerConfigContext().getAxisConfiguration()); + } + + protected void unsetConfigurationContextService(ConfigurationContextService cfgCtxService) { + ServiceReferenceHolder.getInstance().setAxisConfiguration(null); + } + protected void deactivate(ComponentContext ctx) { List<EventPublisher> publishers = dataHolder.getAllEventPublishers(); @@ -135,4 +169,5 @@ public class CloudControllerDSComponent { topicPublisher.close(); } } + } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7299b9d0/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/runtime/FasterLookUpDataHolder.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/runtime/FasterLookUpDataHolder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/runtime/FasterLookUpDataHolder.java index fda88a0..558a382 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/runtime/FasterLookUpDataHolder.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/runtime/FasterLookUpDataHolder.java @@ -100,9 +100,9 @@ public class FasterLookUpDataHolder implements Serializable{ private String serializationDir; private boolean enableBAMDataPublisher; - private DataPublisherConfig dataPubConfig; + private transient DataPublisherConfig dataPubConfig; private boolean enableTopologySync; - private TopologyConfig topologyConfig; + private transient TopologyConfig topologyConfig; /** * Key - node id http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7299b9d0/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyManager.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyManager.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyManager.java index 7610dbb..098ad49 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyManager.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyManager.java @@ -19,14 +19,17 @@ package org.apache.stratos.cloud.controller.topology; import com.google.gson.Gson; + import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.cloud.controller.exception.CloudControllerException; import org.apache.stratos.cloud.controller.util.CloudControllerConstants; +import org.apache.stratos.cloud.controller.util.CloudControllerUtil; import org.apache.stratos.messaging.domain.topology.Topology; import javax.jms.TextMessage; + import java.io.File; import java.io.IOException; import java.util.concurrent.BlockingQueue; @@ -83,24 +86,29 @@ public class TopologyManager { synchronized (TopologyManager.class) { if(this.topology == null) { //need to initialize the topology -// if(this.topologyFile.exists()) { -// try { -// currentContent = FileUtils.readFileToString(this.topologyFile); -// Gson gson = new Gson(); -// this.topology = gson.fromJson(currentContent, Topology.class); -// if(log.isDebugEnabled()) { -// log.debug("The current topology is: " + currentContent); -// } -// } catch (IOException e) { -// log.error(e.getMessage()); -// throw new CloudControllerException(e.getMessage(), e); -// } -// } else { +// this.topology = CloudControllerUtil.retrieve(); +// if (this.topology == null) { + +// } + if(this.topologyFile.exists()) { + try { + currentContent = FileUtils.readFileToString(this.topologyFile); + Gson gson = new Gson(); + this.topology = gson.fromJson(currentContent, Topology.class); + if(log.isDebugEnabled()) { + log.debug("The current topology is: " + currentContent); + } + } catch (IOException e) { + log.error(e.getMessage()); + throw new CloudControllerException(e.getMessage(), e); + } + + } else { if(log.isDebugEnabled()) { log.debug("Creating new topology"); } this.topology = new Topology(); -// } + } } } if(log.isDebugEnabled()) { @@ -112,22 +120,29 @@ public class TopologyManager { public synchronized void updateTopology(Topology topology) { synchronized (TopologyManager.class) { this.topology = topology; -// if (this.topologyFile.exists()) { -// this.backup.delete(); -// this.topologyFile.renameTo(backup); -// } -// Gson gson = new Gson(); -// String message = gson.toJson(topology); -// // overwrite the topology file -// try { -// FileUtils.writeStringToFile(this.topologyFile, message); -// if(log.isDebugEnabled()) { -// log.debug("The updated topology is: " + message); -// } -// } catch (IOException e) { -// log.error(e.getMessage()); -// throw new CloudControllerException(e.getMessage(), e); -// } + CloudControllerUtil.persist(this.topology); + if (log.isDebugEnabled()) { + Gson gson = new Gson(); + String message = gson.toJson(topology); + log.debug("Topology got updated. Full Topology: "+message); + } + + if (this.topologyFile.exists()) { + this.backup.delete(); + this.topologyFile.renameTo(backup); + } + Gson gson = new Gson(); + String message = gson.toJson(topology); + // overwrite the topology file + try { + FileUtils.writeStringToFile(this.topologyFile, message); + if(log.isDebugEnabled()) { + log.debug("The updated topology is: " + message); + } + } catch (IOException e) { + log.error(e.getMessage()); + throw new CloudControllerException(e.getMessage(), e); + } } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7299b9d0/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerUtil.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerUtil.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerUtil.java index 4b77fc5..22e5903 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerUtil.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerUtil.java @@ -21,11 +21,15 @@ package org.apache.stratos.cloud.controller.util; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.cloud.controller.exception.CloudControllerException; +import org.apache.stratos.cloud.controller.persist.Deserializer; import org.apache.stratos.cloud.controller.pojo.AppType; import org.apache.stratos.cloud.controller.pojo.Cartridge; import org.apache.stratos.cloud.controller.pojo.CartridgeInfo; import org.apache.stratos.cloud.controller.pojo.PortMapping; import org.apache.stratos.cloud.controller.pojo.Property; +import org.apache.stratos.cloud.controller.registry.RegistryManager; +import org.apache.stratos.messaging.domain.topology.Topology; +import org.wso2.carbon.registry.core.exceptions.RegistryException; import java.util.ArrayList; import java.util.Iterator; @@ -108,6 +112,39 @@ public class CloudControllerUtil { return javaProps; } + + public static void persist(Topology topology) { + try { + RegistryManager.getInstance().persistTopology(topology); + } catch (RegistryException e) { + + String msg = "Failed to persist the Topology in registry. "; + log.fatal(msg, e); +// throw new CloudControllerException(msg, e); + } + } + + public static Topology retrieve() { + + Object obj = RegistryManager.getInstance().retrieveTopology(); + if (obj != null) { + try { + Object dataObj = Deserializer + .deserializeFromByteArray((byte[]) obj); + if(dataObj instanceof Topology) { + return (Topology) dataObj; + } else { + return null; + } + } catch (Exception e) { + String msg = "Unable to retrieve data from Registry. Hence, any historical data will not get reflected."; + log.warn(msg, e); + } + } + + return null; + } + public static void handleException(String msg, Exception e){ log.error(msg, e); http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/7299b9d0/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/ServiceReferenceHolder.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/ServiceReferenceHolder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/ServiceReferenceHolder.java index 8bcebb1..cc0718d 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/ServiceReferenceHolder.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/ServiceReferenceHolder.java @@ -18,6 +18,7 @@ */ package org.apache.stratos.cloud.controller.util; +import org.apache.axis2.engine.AxisConfiguration; import org.wso2.carbon.ntask.core.service.TaskService; import org.wso2.carbon.registry.core.Registry; import org.wso2.carbon.registry.core.session.UserRegistry; @@ -30,6 +31,7 @@ public class ServiceReferenceHolder { private static ServiceReferenceHolder instance; private TaskService taskService; private Registry registry; + private AxisConfiguration axisConfiguration; private ServiceReferenceHolder() { } @@ -41,6 +43,14 @@ public class ServiceReferenceHolder { return instance; } + public void setAxisConfiguration(AxisConfiguration axisConfiguration) { + this.axisConfiguration = axisConfiguration; + } + + public AxisConfiguration getAxisConfiguration() { + return axisConfiguration; + } + public TaskService getTaskService() { return taskService; }
