Updated Branches: refs/heads/master 4cd05fe38 -> c03a42a6e
Fixed cloud controller topology manager singleton model Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/8c72f145 Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/8c72f145 Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/8c72f145 Branch: refs/heads/master Commit: 8c72f1452c666b31e4f44b55a4bea4f27fe7b8d2 Parents: dfc250e Author: Imesh Gunaratne <[email protected]> Authored: Wed Dec 25 15:24:33 2013 +0530 Committer: Imesh Gunaratne <[email protected]> Committed: Wed Dec 25 15:24:33 2013 +0530 ---------------------------------------------------------------------- .../impl/CloudControllerServiceImpl.java | 4 +- .../internal/CloudControllerDSComponent.java | 2 +- .../topic/TopologySynchronizerTask.java | 64 ----------- .../InstanceStatusEventMessageDelegator.java | 68 ++++++++++++ .../InstanceStatusEventMessageListener.java | 43 ++++++++ .../status/InstanceStatusEventMessageQueue.java | 44 ++++++++ .../InstanceStatusEventMessageDelegator.java | 66 ----------- .../InstanceStatusEventMessageListener.java | 42 ------- .../controller/topology/TopologyBuilder.java | 68 ++++++------ .../controller/topology/TopologyManager.java | 109 +++++++++---------- .../topology/TopologySynchronizerTask.java | 62 +++++++++++ 11 files changed, 306 insertions(+), 266 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/8c72f145/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java index 2f8e278..2340abe 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java @@ -31,8 +31,8 @@ import org.apache.stratos.cloud.controller.pojo.*; import org.apache.stratos.cloud.controller.publisher.CartridgeInstanceDataPublisherTask; import org.apache.stratos.cloud.controller.registry.RegistryManager; import org.apache.stratos.cloud.controller.runtime.FasterLookUpDataHolder; -import org.apache.stratos.cloud.controller.topic.TopologySynchronizerTask; -import org.apache.stratos.cloud.controller.topology.InstanceStatusEventMessageDelegator; +import org.apache.stratos.cloud.controller.topology.TopologySynchronizerTask; +import org.apache.stratos.cloud.controller.topic.instance.status.InstanceStatusEventMessageDelegator; import org.apache.stratos.cloud.controller.topology.TopologyBuilder; import org.apache.stratos.cloud.controller.util.CloudControllerConstants; import org.apache.stratos.cloud.controller.util.CloudControllerUtil; http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/8c72f145/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java index 23d07b5..c3fe066 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java @@ -27,7 +27,7 @@ import org.apache.stratos.cloud.controller.exception.CloudControllerException; import org.apache.stratos.cloud.controller.impl.CloudControllerServiceImpl; import org.apache.stratos.cloud.controller.interfaces.CloudControllerService; import org.apache.stratos.cloud.controller.runtime.FasterLookUpDataHolder; -import org.apache.stratos.cloud.controller.topology.InstanceStatusEventMessageListener; +import org.apache.stratos.cloud.controller.topic.instance.status.InstanceStatusEventMessageListener; import org.apache.stratos.cloud.controller.util.CloudControllerConstants; import org.apache.stratos.cloud.controller.util.ServiceReferenceHolder; import org.apache.stratos.messaging.broker.publish.EventPublisher; http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/8c72f145/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/TopologySynchronizerTask.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/TopologySynchronizerTask.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/TopologySynchronizerTask.java deleted file mode 100644 index 6f3dc45..0000000 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/TopologySynchronizerTask.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.cloud.controller.topic; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.cloud.controller.runtime.FasterLookUpDataHolder; -import org.apache.stratos.cloud.controller.topology.TopologyEventSender; -import org.apache.stratos.cloud.controller.topology.TopologyManager; -import org.wso2.carbon.ntask.core.Task; - -import java.util.Map; - -public class TopologySynchronizerTask implements Task{ - private static final Log log = LogFactory.getLog(TopologySynchronizerTask.class); - - @Override - public void execute() { - if(FasterLookUpDataHolder.getInstance().isTopologySyncRunning()|| - // this is a temporary fix to avoid task execution - limitation with ntask - !FasterLookUpDataHolder.getInstance().getEnableTopologySync()){ - return; - } - - if (log.isDebugEnabled()) { - log.debug("TopologySynchronizerTask ..."); - } - - // publish to the topic - if (TopologyManager.getInstance().getTopology() != null) { - TopologyEventSender.sendCompleteTopologyEvent(TopologyManager.getInstance().getTopology()); - } - } - - @Override - public void init() { - - // this is a temporary fix to avoid task execution - limitation with ntask - if(!FasterLookUpDataHolder.getInstance().getEnableTopologySync()){ - log.debug("Topology Sync is disabled."); - return; - } - } - - @Override - public void setProperties(Map<String, String> arg0) {} - -} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/8c72f145/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageDelegator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageDelegator.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageDelegator.java new file mode 100644 index 0000000..1b94bbd --- /dev/null +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageDelegator.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.cloud.controller.topic.instance.status; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.cloud.controller.topology.TopologyBuilder; +import org.apache.stratos.cloud.controller.topology.TopologyManager; +import org.apache.stratos.messaging.event.instance.status.InstanceActivatedEvent; +import org.apache.stratos.messaging.event.instance.status.InstanceStartedEvent; +import org.apache.stratos.messaging.util.Constants; +import org.apache.stratos.messaging.util.Util; + +import javax.jms.TextMessage; + +public class InstanceStatusEventMessageDelegator implements Runnable { + private static final Log log = LogFactory.getLog(InstanceStatusEventMessageDelegator.class); + + @Override + public void run() { + log.info("Instance status event message delegator started"); + + while (true) { + try { + TextMessage message = InstanceStatusEventMessageQueue.getInstance().take(); + + // retrieve the header + String type = message.getStringProperty(Constants.EVENT_CLASS_NAME); + log.info(String.format("Instance status event message received from queue: %s", type)); + + if (InstanceStartedEvent.class.getName().equals(type)) { + // retrieve the actual message + String json = message.getText(); + TopologyBuilder.handleMemberStarted((InstanceStartedEvent) Util. + jsonToObject(json, InstanceStartedEvent.class)); + } else if (InstanceActivatedEvent.class.getName().equals(type)) { + // retrieve the actual message + String json = message.getText(); + TopologyBuilder.handleMemberActivated((InstanceActivatedEvent) Util. + jsonToObject(json, InstanceActivatedEvent.class)); + } else { + log.warn("Event message received is not InstanceStartedEvent or InstanceActivatedEvent"); + } + } catch (Exception e) { + String error = "Failed to retrieve the instance status event message"; + log.error(error, e); + // Commenting throwing the error. Otherwise thread will not execute if an exception is thrown. + //throw new RuntimeException(error, e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/8c72f145/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageListener.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageListener.java new file mode 100644 index 0000000..7f40036 --- /dev/null +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageListener.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.cloud.controller.topic.instance.status; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.cloud.controller.topology.TopologyManager; + +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.TextMessage; + +/** + * this is to handle the topology subscription + */ +public class InstanceStatusEventMessageListener implements MessageListener{ + private static final Log log = LogFactory.getLog(InstanceStatusEventMessageListener.class); + + @Override + public void onMessage(Message message) { + TextMessage receivedMessage = (TextMessage) message; + InstanceStatusEventMessageQueue.getInstance().add(receivedMessage); + if(log.isDebugEnabled()) { + log.debug(String.format("Instance status message added to queue: %s", message)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/8c72f145/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageQueue.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageQueue.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageQueue.java new file mode 100644 index 0000000..5c767ce --- /dev/null +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageQueue.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.cloud.controller.topic.instance.status; + +import javax.jms.TextMessage; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * Implements a blocking queue for managing instance status event messages. + */ +public class InstanceStatusEventMessageQueue extends LinkedBlockingQueue<TextMessage>{ + private static volatile InstanceStatusEventMessageQueue instance; + + private InstanceStatusEventMessageQueue(){ + } + + public static synchronized InstanceStatusEventMessageQueue getInstance() { + if (instance == null) { + synchronized (InstanceStatusEventMessageQueue.class){ + if (instance == null) { + instance = new InstanceStatusEventMessageQueue(); + } + } + } + return instance; + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/8c72f145/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/InstanceStatusEventMessageDelegator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/InstanceStatusEventMessageDelegator.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/InstanceStatusEventMessageDelegator.java deleted file mode 100644 index af94659..0000000 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/InstanceStatusEventMessageDelegator.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.cloud.controller.topology; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.messaging.event.instance.status.InstanceActivatedEvent; -import org.apache.stratos.messaging.event.instance.status.InstanceStartedEvent; -import org.apache.stratos.messaging.util.Constants; -import org.apache.stratos.messaging.util.Util; - -import javax.jms.TextMessage; - -public class InstanceStatusEventMessageDelegator implements Runnable { - private static final Log log = LogFactory.getLog(InstanceStatusEventMessageDelegator.class); - - @Override - public void run() { - log.info("Instance status event message delegator started"); - - while (true) { - try { - TextMessage message = TopologyManager.getInstance().getInstanceStatusMessageQueue().take(); - - // retrieve the header - String type = message.getStringProperty(Constants.EVENT_CLASS_NAME); - log.info(String.format("Instance status event message received from queue: %s", type)); - - if (InstanceStartedEvent.class.getName().equals(type)) { - // retrieve the actual message - String json = message.getText(); - TopologyBuilder.handleMemberStarted((InstanceStartedEvent) Util. - jsonToObject(json, InstanceStartedEvent.class)); - } else if (InstanceActivatedEvent.class.getName().equals(type)) { - // retrieve the actual message - String json = message.getText(); - TopologyBuilder.handleMemberActivated((InstanceActivatedEvent) Util. - jsonToObject(json, InstanceActivatedEvent.class)); - } else { - log.warn("Event message received is not InstanceStartedEvent or InstanceActivatedEvent"); - } - } catch (Exception e) { - String error = "Failed to retrieve the instance status event message"; - log.error(error, e); - // Commenting throwing the error. Otherwise thread will not execute if an exception is thrown. - //throw new RuntimeException(error, e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/8c72f145/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/InstanceStatusEventMessageListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/InstanceStatusEventMessageListener.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/InstanceStatusEventMessageListener.java deleted file mode 100644 index f363997..0000000 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/InstanceStatusEventMessageListener.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.cloud.controller.topology; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.TextMessage; - -/** - * this is to handle the topology subscription - */ -public class InstanceStatusEventMessageListener implements MessageListener{ - private static final Log log = LogFactory.getLog(InstanceStatusEventMessageListener.class); - - @Override - public void onMessage(Message message) { - TextMessage receivedMessage = (TextMessage) message; - TopologyManager.getInstance().getInstanceStatusMessageQueue().add(receivedMessage); - if(log.isDebugEnabled()) { - log.debug(String.format("Instance status message added to queue: %s", message)); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/8c72f145/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java index ba6a49a..bc4da0f 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java @@ -46,13 +46,13 @@ public class TopologyBuilder { public static void handleServiceCreated(List<Cartridge> cartridgeList) { Service service; - Topology topology = TopologyManager.getInstance().getTopology(); + Topology topology = TopologyManager.getTopology(); if (cartridgeList == null) { throw new RuntimeException(String.format("Cartridge list is empty")); } try { - TopologyManager.getInstance().acquireWriteLock(); + TopologyManager.acquireWriteLock(); for (Cartridge cartridge : cartridgeList) { if (!topology.serviceExists(cartridge.getType())) { service = new Service(cartridge.getType(), cartridge.isMultiTenant() ? ServiceType.MultiTenant : ServiceType.SingleTenant); @@ -66,28 +66,28 @@ public class TopologyBuilder { service.addPort(port); } topology.addService(service); - TopologyManager.getInstance().updateTopology(topology); + TopologyManager.updateTopology(topology); } } } finally { - TopologyManager.getInstance().releaseWriteLock(); + TopologyManager.releaseWriteLock(); } TopologyEventSender.sendServiceCreateEvent(cartridgeList); } public static void handleServiceRemoved(List<Cartridge> cartridgeList) { - Topology topology = TopologyManager.getInstance().getTopology(); + Topology topology = TopologyManager.getTopology(); for (Cartridge cartridge : cartridgeList) { if (topology.getService(cartridge.getType()).getClusters().size() == 0) { if (topology.serviceExists(cartridge.getType())) { try { - TopologyManager.getInstance().acquireWriteLock(); + TopologyManager.acquireWriteLock(); topology.removeService(cartridge.getType()); - TopologyManager.getInstance().updateTopology(topology); + TopologyManager.updateTopology(topology); } finally { - TopologyManager.getInstance().releaseWriteLock(); + TopologyManager.releaseWriteLock(); } TopologyEventSender.sendServiceRemovedEvent(cartridgeList); } else { @@ -101,10 +101,10 @@ public class TopologyBuilder { } public static void handleClusterCreated(Registrant registrant) { - Topology topology = TopologyManager.getInstance().getTopology(); + Topology topology = TopologyManager.getTopology(); Service service; try { - TopologyManager.getInstance().acquireWriteLock(); + TopologyManager.acquireWriteLock(); String cartridgeType = registrant.getCartridgeType(); service = topology.getService(cartridgeType); Properties props = CloudControllerUtil.toJavaUtilProperties(registrant.getProperties()); @@ -134,16 +134,16 @@ public class TopologyBuilder { cluster.setLbCluster(isLb); service.addCluster(cluster); } - TopologyManager.getInstance().updateTopology(topology); + TopologyManager.updateTopology(topology); TopologyEventSender.sendClusterCreatedEvent(cartridgeType, clusterId, cluster); } finally { - TopologyManager.getInstance().releaseWriteLock(); + TopologyManager.releaseWriteLock(); } } public static void handleClusterRemoved(ClusterContext ctxt) { - Topology topology = TopologyManager.getInstance().getTopology(); + Topology topology = TopologyManager.getTopology(); Service service = topology.getService(ctxt.getCartridgeType()); if (service == null) { throw new RuntimeException(String.format("Service %s does not exist", @@ -157,11 +157,11 @@ public class TopologyBuilder { } try { - TopologyManager.getInstance().acquireWriteLock(); + TopologyManager.acquireWriteLock(); service.removeCluster(ctxt.getClusterId()); - TopologyManager.getInstance().updateTopology(topology); + TopologyManager.updateTopology(topology); } finally { - TopologyManager.getInstance().releaseWriteLock(); + TopologyManager.releaseWriteLock(); } TopologyEventSender.sendClusterRemovedEvent(ctxt); } @@ -169,7 +169,7 @@ public class TopologyBuilder { public static void handleMemberSpawned(String memberId, String serviceName, String clusterId, String networkPartitionId, String partitionId, String privateIp, String lbClusterId) { //adding the new member to the cluster after it is successfully started in IaaS. - Topology topology = TopologyManager.getInstance().getTopology(); + Topology topology = TopologyManager.getTopology(); Service service = topology.getService(serviceName); Cluster cluster = service.getCluster(clusterId); @@ -178,21 +178,21 @@ public class TopologyBuilder { } try { - TopologyManager.getInstance().acquireWriteLock(); + TopologyManager.acquireWriteLock(); Member member = new Member(serviceName, clusterId, networkPartitionId, partitionId, memberId); member.setStatus(MemberStatus.Created); member.setMemberIp(privateIp); member.setLbClusterId(lbClusterId); cluster.addMember(member); - TopologyManager.getInstance().updateTopology(topology); + TopologyManager.updateTopology(topology); } finally { - TopologyManager.getInstance().releaseWriteLock(); + TopologyManager.releaseWriteLock(); } TopologyEventSender.sendInstanceSpawnedEvent(serviceName, clusterId, networkPartitionId, partitionId, memberId, lbClusterId); } public static void handleMemberStarted(InstanceStartedEvent instanceStartedEvent) { - Topology topology = TopologyManager.getInstance().getTopology(); + Topology topology = TopologyManager.getTopology(); Service service = topology.getService(instanceStartedEvent.getServiceName()); if (service == null) { throw new RuntimeException(String.format("Service %s does not exist", @@ -211,20 +211,20 @@ public class TopologyBuilder { instanceStartedEvent.getMemberId())); } try { - TopologyManager.getInstance().acquireWriteLock(); + TopologyManager.acquireWriteLock(); member.setStatus(MemberStatus.Starting); log.info("member started event adding status started"); - TopologyManager.getInstance().updateTopology(topology); + TopologyManager.updateTopology(topology); } finally { - TopologyManager.getInstance().releaseWriteLock(); + TopologyManager.releaseWriteLock(); } //memberStartedEvent. TopologyEventSender.sendMemberStartedEvent(instanceStartedEvent); } public static void handleMemberActivated(InstanceActivatedEvent instanceActivatedEvent) { - Topology topology = TopologyManager.getInstance().getTopology(); + Topology topology = TopologyManager.getTopology(); Service service = topology.getService(instanceActivatedEvent.getServiceName()); if (service == null) { throw new RuntimeException(String.format("Service %s does not exist", @@ -247,7 +247,7 @@ public class TopologyBuilder { instanceActivatedEvent.getClusterId(), instanceActivatedEvent.getNetworkPartitionId(), instanceActivatedEvent.getPartitionId(), instanceActivatedEvent.getMemberId()); try { - TopologyManager.getInstance().acquireWriteLock(); + TopologyManager.acquireWriteLock(); member.setStatus(MemberStatus.Activated); log.info("member started event adding status activated"); Cartridge cartridge = FasterLookUpDataHolder.getInstance(). @@ -265,16 +265,16 @@ public class TopologyBuilder { } memberActivatedEvent.setMemberIp(member.getMemberIp()); - TopologyManager.getInstance().updateTopology(topology); + TopologyManager.updateTopology(topology); } finally { - TopologyManager.getInstance().releaseWriteLock(); + TopologyManager.releaseWriteLock(); } TopologyEventSender.sendMemberActivatedEvent(memberActivatedEvent); } public static void handleMemberTerminated(String serviceName, String clusterId, String networkPartitionId, String partitionId, String memberId) { - Topology topology = TopologyManager.getInstance().getTopology(); + Topology topology = TopologyManager.getTopology(); Service service = topology.getService(serviceName); Cluster cluster = service.getCluster(clusterId); Member member = cluster.getMember(memberId); @@ -285,11 +285,11 @@ public class TopologyBuilder { } try { - TopologyManager.getInstance().acquireWriteLock(); + TopologyManager.acquireWriteLock(); cluster.removeMember(member); - TopologyManager.getInstance().updateTopology(topology); + TopologyManager.updateTopology(topology); } finally { - TopologyManager.getInstance().releaseWriteLock(); + TopologyManager.releaseWriteLock(); } TopologyEventSender.sendMemberTerminatedEvent(serviceName, clusterId, networkPartitionId, partitionId, memberId); } @@ -297,9 +297,9 @@ public class TopologyBuilder { public static void handleMemberSuspended() { //TODO try { - TopologyManager.getInstance().acquireWriteLock(); + TopologyManager.acquireWriteLock(); } finally { - TopologyManager.getInstance().releaseWriteLock(); + TopologyManager.releaseWriteLock(); } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/8c72f145/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyManager.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyManager.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyManager.java index cdc63d1..9862b9a 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyManager.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyManager.java @@ -18,17 +18,12 @@ */ package org.apache.stratos.cloud.controller.topology; +import com.google.gson.Gson; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.cloud.controller.util.CloudControllerUtil; import org.apache.stratos.messaging.domain.topology.Topology; -import com.google.gson.Gson; - -import javax.jms.TextMessage; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.locks.ReentrantReadWriteLock; /** @@ -37,86 +32,86 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; public class TopologyManager { private static final Log log = LogFactory.getLog(TopologyManager.class); - private volatile ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); - private volatile ReentrantReadWriteLock.ReadLock readLock = lock.readLock(); - private volatile ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock(); - private volatile Topology topology; - private static TopologyManager instance; - private BlockingQueue<TextMessage> instanceStatusMessageQueue = new LinkedBlockingQueue<TextMessage>(); - + private static volatile ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); + private static volatile ReentrantReadWriteLock.ReadLock readLock = lock.readLock(); + private static volatile ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock(); + private static volatile Topology topology; private TopologyManager() { } - public static TopologyManager getInstance() { - synchronized (TopologyManager.class) { - if (instance == null) { - instance = new TopologyManager(); - } - return instance; - + public static void acquireReadLock() { + if(log.isDebugEnabled()) { + log.debug("Read lock acquired"); } - } - - public void acquireReadLock() { readLock.lock(); } - public void releaseReadLock() { + public static void releaseReadLock() { + if(log.isDebugEnabled()) { + log.debug("Read lock released"); + } readLock.unlock(); } - public void acquireWriteLock() { + public static void acquireWriteLock() { + if(log.isDebugEnabled()) { + log.debug("Write lock acquired"); + } writeLock.lock(); } - public void releaseWriteLock() { + public static void releaseWriteLock() { + if(log.isDebugEnabled()) { + log.debug("Write lock released"); + } writeLock.unlock(); } - public synchronized Topology getTopology() { - synchronized (TopologyManager.class) { - if(this.topology == null) { - //need to initialize the topology - this.topology = CloudControllerUtil.retrieveTopology(); - if (this.topology == null) { - if(log.isDebugEnabled()) { - log.debug("Creating new topology"); + public static Topology getTopology() { + if (topology == null) { + synchronized (TopologyManager.class) { + if (topology == null) { + if (log.isDebugEnabled()) { + log.debug("Trying to retrieve topology from registry"); + } + topology = CloudControllerUtil.retrieveTopology(); + if (topology == null) { + if (log.isDebugEnabled()) { + log.debug("Topology not found in registry, creating new"); + } + topology = new Topology(); + } + if (log.isDebugEnabled()) { + log.debug("Topology initialized"); } - this.topology = new Topology(); } } } - return this.topology; + return topology; } - public synchronized void updateTopology(Topology topology) { + /** + * Update in-memory topology and persist it in registry. + * @param topology_ + */ + public static void updateTopology(Topology topology_) { synchronized (TopologyManager.class) { - this.topology = topology; - CloudControllerUtil.persistTopology(this.topology); - if (log.isDebugEnabled()) { - log.debug(String.format("Topology updated: %s", toJson())); - } + if (log.isDebugEnabled()) { + log.debug("Updating topology"); + } + topology = topology_; + CloudControllerUtil.persistTopology(topology); + if (log.isDebugEnabled()) { + log.debug(String.format("Topology updated: %s", toJson(topology))); + } } } - public String toJson() { + private static String toJson(Object object) { Gson gson = new Gson(); - return gson.toJson(topology); - - } - - public void setTopology(Topology topology) { - this.topology = topology; - } - - public BlockingQueue<TextMessage> getInstanceStatusMessageQueue() { - return instanceStatusMessageQueue; - } - - public void setInstanceStatusMessageQueue(BlockingQueue<TextMessage> instanceStatusMessageQueue) { - this.instanceStatusMessageQueue = instanceStatusMessageQueue; + return gson.toJson(object); } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/8c72f145/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologySynchronizerTask.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologySynchronizerTask.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologySynchronizerTask.java new file mode 100644 index 0000000..6c5a1c7 --- /dev/null +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologySynchronizerTask.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.cloud.controller.topology; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.cloud.controller.runtime.FasterLookUpDataHolder; +import org.wso2.carbon.ntask.core.Task; + +import java.util.Map; + +public class TopologySynchronizerTask implements Task{ + private static final Log log = LogFactory.getLog(TopologySynchronizerTask.class); + + @Override + public void execute() { + if(FasterLookUpDataHolder.getInstance().isTopologySyncRunning()|| + // this is a temporary fix to avoid task execution - limitation with ntask + !FasterLookUpDataHolder.getInstance().getEnableTopologySync()){ + return; + } + + if (log.isDebugEnabled()) { + log.debug("TopologySynchronizerTask ..."); + } + + // publish to the topic + if (TopologyManager.getTopology() != null) { + TopologyEventSender.sendCompleteTopologyEvent(TopologyManager.getTopology()); + } + } + + @Override + public void init() { + + // this is a temporary fix to avoid task execution - limitation with ntask + if(!FasterLookUpDataHolder.getInstance().getEnableTopologySync()){ + log.debug("Topology Sync is disabled."); + return; + } + } + + @Override + public void setProperties(Map<String, String> arg0) {} + +}
