Topology model initial implementation at SM backend
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/cbc482a2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/cbc482a2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/cbc482a2 Branch: refs/heads/master Commit: cbc482a249e8a6cae2175639dd2f2f9a15289814 Parents: a0323f5 Author: Isuru <[email protected]> Authored: Tue Dec 10 20:48:09 2013 +0530 Committer: Isuru <[email protected]> Committed: Tue Dec 10 20:48:09 2013 +0530 ---------------------------------------------------------------------- .../internal/ADCManagementServerComponent.java | 7 + .../adc/mgt/listener/TopologyEventListner.java | 40 ++ .../processor/InstanceStatusProcessor.java | 409 +++++++++++++++++++ .../event/processor/TopologyEventProcessor.java | 33 ++ .../processor/TopologyEventProcessorChain.java | 59 +++ .../topology/model/TopologyClusterModel.java | 203 +++++++++ 6 files changed, 751 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/cbc482a2/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/internal/ADCManagementServerComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/internal/ADCManagementServerComponent.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/internal/ADCManagementServerComponent.java index a77cd48..0b9c7b8 100644 --- a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/internal/ADCManagementServerComponent.java +++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/internal/ADCManagementServerComponent.java @@ -21,6 +21,7 @@ package org.apache.stratos.adc.mgt.internal; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.adc.mgt.listener.InstanceStatusListener; +import org.apache.stratos.adc.mgt.listener.TopologyEventListner; import org.apache.stratos.adc.mgt.publisher.TenantEventPublisher; import org.apache.stratos.adc.mgt.publisher.TenantSynchronizerTaskScheduler; import org.apache.stratos.adc.mgt.utils.CartridgeConfigFileReader; @@ -97,6 +98,12 @@ public class ADCManagementServerComponent { Thread tsubscriber = new Thread(subscriber); tsubscriber.start(); + //initializing the topology event subscriber + TopicSubscriber topologyTopicSubscriber = new TopicSubscriber(Constants.TOPOLOGY_TOPIC); + topologyTopicSubscriber.setMessageListener(new TopologyEventListner()); + Thread topologyTopicSubscriberThread = new Thread(topologyTopicSubscriber); + topologyTopicSubscriberThread.start(); + //Starting Topology Receiver TopologyReceiver topologyReceiver = new TopologyReceiver(); Thread topologyReceiverThread = new Thread(topologyReceiver); http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/cbc482a2/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/listener/TopologyEventListner.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/listener/TopologyEventListner.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/listener/TopologyEventListner.java new file mode 100644 index 0000000..0ec88f2 --- /dev/null +++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/listener/TopologyEventListner.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.adc.mgt.listener; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.adc.mgt.topology.event.processor.TopologyEventProcessorChain; + +import javax.jms.Message; +import javax.jms.MessageListener; + +public class TopologyEventListner implements MessageListener { + + private static final Log log = LogFactory.getLog(TopologyEventListner.class); + + public TopologyEventListner() { + } + + public void onMessage(Message message) { + + TopologyEventProcessorChain.getInstance().startProcessing(message); + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/cbc482a2/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/InstanceStatusProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/InstanceStatusProcessor.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/InstanceStatusProcessor.java new file mode 100644 index 0000000..0f274de --- /dev/null +++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/InstanceStatusProcessor.java @@ -0,0 +1,409 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.adc.mgt.topology.event.processor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.adc.mgt.dao.CartridgeSubscriptionInfo; +import org.apache.stratos.adc.mgt.topology.model.TopologyClusterModel; +import org.apache.stratos.adc.mgt.utils.PersistenceManager; +import org.apache.stratos.messaging.domain.topology.Cluster; +import org.apache.stratos.messaging.event.topology.MemberActivatedEvent; +import org.apache.stratos.messaging.event.topology.MemberStartedEvent; +import org.apache.stratos.messaging.event.topology.MemberSuspendedEvent; +import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; +import org.apache.stratos.messaging.util.Constants; +import org.apache.stratos.messaging.util.Util; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.TextMessage; +import java.util.HashMap; +import java.util.Map; + +public class InstanceStatusProcessor extends TopologyEventProcessor { + + private static final Log log = LogFactory.getLog(InstanceStatusProcessor.class); + + private Map<String, Integer> clusterIdToActiveInstanceCountMap; + + public InstanceStatusProcessor () { + clusterIdToActiveInstanceCountMap = new HashMap<String, Integer>(); + } + + @Override + public void process(Message message) { + + //new InstanceStatusListenerThread(message).start(); + //go to next processor in the chain + if(nextTopologyEventProcessor != null) { + nextTopologyEventProcessor.process(message); + } + } + + private void doProcessing (Message message) { + + String messageType = null; + + try { + messageType = message.getStringProperty(Constants.EVENT_CLASS_NAME); + + } catch (JMSException e) { + log.error("Error in getting message type from received Message " + message.getClass().toString(), e); + return; + } + + if (MemberStartedEvent.class.getName().equals(messageType)) { + + log.info("Received message: " + messageType); + + MemberStartedEvent event = getMemberStartedEvent(message); + String clusterDomain = event.getClusterId(); + CartridgeSubscriptionInfo cartridgeSubscriptionInfo = getCartridgeSubscriptionInfo(clusterDomain); + + if(cartridgeSubscriptionInfo != null) { + Cluster cluster = TopologyManager.getTopology(). + getService(cartridgeSubscriptionInfo.getCartridge()).getCluster(clusterDomain); + TopologyClusterModel.getInstance().addCluster(cartridgeSubscriptionInfo.getTenantId(), + cartridgeSubscriptionInfo.getCartridge(), + cartridgeSubscriptionInfo.getAlias(), cluster); + } + + } + else if (MemberActivatedEvent.class.getName().equals(messageType)) { + + log.info("Received message: " + messageType); + + MemberActivatedEvent event = getMemberActivetedEvent(message); + String clusterDomain = event.getClusterId(); + CartridgeSubscriptionInfo cartridgeSubscriptionInfo = getCartridgeSubscriptionInfo(clusterDomain); + + if(cartridgeSubscriptionInfo != null) { + Cluster cluster = TopologyManager.getTopology(). + getService(cartridgeSubscriptionInfo.getCartridge()).getCluster(clusterDomain); + TopologyClusterModel.getInstance().addCluster(cartridgeSubscriptionInfo.getTenantId(), + cartridgeSubscriptionInfo.getCartridge(), + cartridgeSubscriptionInfo.getAlias(), cluster); + } + + + } else if (MemberSuspendedEvent.class.getName().equals(messageType)) { + + log.info("Received message: " + messageType); + + MemberStartedEvent event = getMemberStartedEvent(message); + String clusterDomain = event.getClusterId(); + CartridgeSubscriptionInfo cartridgeSubscriptionInfo = getCartridgeSubscriptionInfo(clusterDomain); + + if(cartridgeSubscriptionInfo != null) { + Cluster cluster = TopologyManager.getTopology(). + getService(cartridgeSubscriptionInfo.getCartridge()).getCluster(clusterDomain); + TopologyClusterModel.getInstance().addCluster(cartridgeSubscriptionInfo.getTenantId(), + cartridgeSubscriptionInfo.getCartridge(), + cartridgeSubscriptionInfo.getAlias(), cluster); + } + + } else if (MemberTerminatedEvent.class.getName().equals(messageType)) { + + log.info("Received message: " + messageType); + + MemberStartedEvent event = getMemberStartedEvent(message); + String clusterDomain = event.getClusterId(); + CartridgeSubscriptionInfo cartridgeSubscriptionInfo = getCartridgeSubscriptionInfo(clusterDomain); + + if(cartridgeSubscriptionInfo != null) { + Cluster cluster = TopologyManager.getTopology(). + getService(cartridgeSubscriptionInfo.getCartridge()).getCluster(clusterDomain); + TopologyClusterModel.getInstance().addCluster(cartridgeSubscriptionInfo.getTenantId(), + cartridgeSubscriptionInfo.getCartridge(), + cartridgeSubscriptionInfo.getAlias(), cluster); + } + + } else { + //cannot happen + } + } + + private MemberStartedEvent getMemberStartedEvent (Message message) { + + String json = null; + try { + json = ((TextMessage)message).getText(); + + } catch (JMSException e) { + log.error("Error in getting Json message type from received Message ", e); + return null; + } + MemberStartedEvent event = (MemberStartedEvent) Util.jsonToObject(json, MemberStartedEvent.class); + + if(log.isDebugEnabled()) { + log.debug("Received message details: [ " + + "Cluster Id: " + event.getClusterId() + + "\nMember Id: " + event.getMemberId() + + "\nService name: " + event.getServiceName() + + "\nStatus: " + event.getStatus().name() + " ]"); + } + + return event; + } + + private MemberActivatedEvent getMemberActivetedEvent (Message message) { + + String json = null; + try { + json = ((TextMessage)message).getText(); + + } catch (JMSException e) { + log.error("Error in getting Json message type from received Message ", e); + return null; + } + MemberActivatedEvent event = (MemberActivatedEvent) Util.jsonToObject(json, MemberStartedEvent.class); + + if(log.isDebugEnabled()) { + log.debug("Received message details: [ " + + "Cluster Id: " + event.getClusterId() + + "\nMember Id: " + event.getMemberId() + + "\nService name: " + event.getServiceName() + + "\nIp: " + event.getMemberIp() + " ]"); + } + + return event; + } + + private MemberSuspendedEvent getMemberSuspendedEvent (Message message) { + + String json = null; + try { + json = ((TextMessage)message).getText(); + + } catch (JMSException e) { + log.error("Error in getting Json message type from received Message ", e); + return null; + } + MemberSuspendedEvent event = (MemberSuspendedEvent) Util.jsonToObject(json, MemberStartedEvent.class); + + if(log.isDebugEnabled()) { + log.debug("Received message details: [ " + + "Cluster Id: " + event.getClusterId() + + "\nMember Id: " + event.getMemberId() + + "\nService name: " + event.getServiceName() + " ]"); + } + + return event; + } + + private MemberTerminatedEvent getMemberTerminatedEvebt (Message message) { + + String json = null; + try { + json = ((TextMessage)message).getText(); + + } catch (JMSException e) { + log.error("Error in getting Json message type from received Message ", e); + return null; + } + MemberTerminatedEvent event = (MemberTerminatedEvent) Util.jsonToObject(json, MemberStartedEvent.class); + + if(log.isDebugEnabled()) { + log.debug("Received message details: [ " + + "Cluster Id: " + event.getClusterId() + + "\nMember Id: " + event.getMemberId() + + "\nService name: " + event.getServiceName() + " ]"); + } + + return event; + } + + private CartridgeSubscriptionInfo getCartridgeSubscriptionInfo (String clusterDomain) { + + try { + return PersistenceManager.getSubscriptionFromClusterId(clusterDomain); + + } catch (Exception e) { + log.error("Error getting subscription information for cluster " + clusterDomain, e); + return null; + } + } + + /** + * Message Processing Thread class for InstanceStatusProcessor + */ + /*private class InstanceStatusListenerThread extends Thread { + + Message message; + + public InstanceStatusListenerThread (Message message) { + this.message = message; + } + + public void run () { + + String messageType = null; + + try { + messageType = message.getStringProperty(Constants.EVENT_CLASS_NAME); + + } catch (JMSException e) { + log.error("Error in getting message type from received Message " + message.getClass().toString(), e); + return; + } + + if (MemberStartedEvent.class.getName().equals(messageType)) { + + log.info("Received message: " + messageType); + + MemberStartedEvent event = getMemberStartedEvent(); + String clusterDomain = event.getClusterId(); + CartridgeSubscriptionInfo cartridgeSubscriptionInfo = getCartridgeSubscriptionInfo(clusterDomain); + if(cartridgeSubscriptionInfo != null) { + + } + + } + else if (MemberActivatedEvent.class.getName().equals(messageType)) { + + log.info("Received message: " + messageType); + + MemberActivatedEvent event = getMemberActivetedEvent(); + String clusterDomain = event.getClusterId(); + + + } else if (MemberSuspendedEvent.class.getName().equals(messageType)) { + + log.info("Received message: " + messageType); + + MemberStartedEvent event = getMemberStartedEvent(); + String clusterDomain = event.getClusterId(); + + } else if (MemberTerminatedEvent.class.getName().equals(messageType)) { + + log.info("Received message: " + messageType); + + MemberStartedEvent event = getMemberStartedEvent(); + String clusterDomain = event.getClusterId(); + + } else { + //cannot happen + } + } + + private MemberStartedEvent getMemberStartedEvent () { + + String json = null; + try { + json = ((TextMessage)message).getText(); + + } catch (JMSException e) { + log.error("Error in getting Json message type from received Message ", e); + return null; + } + MemberStartedEvent event = (MemberStartedEvent) Util.jsonToObject(json, MemberStartedEvent.class); + + if(log.isDebugEnabled()) { + log.debug("Received message details: [ " + + "Cluster Id: " + event.getClusterId() + + "\nMember Id: " + event.getMemberId() + + "\nService name: " + event.getServiceName() + + "\nStatus: " + event.getStatus().name() + " ]"); + } + + return event; + } + + private MemberActivatedEvent getMemberActivetedEvent () { + + String json = null; + try { + json = ((TextMessage)message).getText(); + + } catch (JMSException e) { + log.error("Error in getting Json message type from received Message ", e); + return null; + } + MemberActivatedEvent event = (MemberActivatedEvent) Util.jsonToObject(json, MemberStartedEvent.class); + + if(log.isDebugEnabled()) { + log.debug("Received message details: [ " + + "Cluster Id: " + event.getClusterId() + + "\nMember Id: " + event.getMemberId() + + "\nService name: " + event.getServiceName() + + "\nIp: " + event.getMemberIp() + " ]"); + } + + return event; + } + + private MemberSuspendedEvent getMemberSuspendedEvent () { + + String json = null; + try { + json = ((TextMessage)message).getText(); + + } catch (JMSException e) { + log.error("Error in getting Json message type from received Message ", e); + return null; + } + MemberSuspendedEvent event = (MemberSuspendedEvent) Util.jsonToObject(json, MemberStartedEvent.class); + + if(log.isDebugEnabled()) { + log.debug("Received message details: [ " + + "Cluster Id: " + event.getClusterId() + + "\nMember Id: " + event.getMemberId() + + "\nService name: " + event.getServiceName() + " ]"); + } + + return event; + } + + private MemberTerminatedEvent getMemberTerminatedEvebt () { + + String json = null; + try { + json = ((TextMessage)message).getText(); + + } catch (JMSException e) { + log.error("Error in getting Json message type from received Message ", e); + return null; + } + MemberTerminatedEvent event = (MemberTerminatedEvent) Util.jsonToObject(json, MemberStartedEvent.class); + + if(log.isDebugEnabled()) { + log.debug("Received message details: [ " + + "Cluster Id: " + event.getClusterId() + + "\nMember Id: " + event.getMemberId() + + "\nService name: " + event.getServiceName() + " ]"); + } + + return event; + } + + private CartridgeSubscriptionInfo getCartridgeSubscriptionInfo (String clusterDomain) { + + try { + return PersistenceManager.getSubscriptionFromClusterId(clusterDomain); + + } catch (Exception e) { + log.error("Error getting subscription information for cluster " + clusterDomain, e); + return null; + } + } + }*/ +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/cbc482a2/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/TopologyEventProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/TopologyEventProcessor.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/TopologyEventProcessor.java new file mode 100644 index 0000000..f582d57 --- /dev/null +++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/TopologyEventProcessor.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.adc.mgt.topology.event.processor; + +import javax.jms.Message; + +public abstract class TopologyEventProcessor { + + protected TopologyEventProcessor nextTopologyEventProcessor = null; + + public void setNext (TopologyEventProcessor nextTopologyEventProcessor) { + this.nextTopologyEventProcessor = nextTopologyEventProcessor; + } + + public abstract void process (Message message); +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/cbc482a2/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/TopologyEventProcessorChain.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/TopologyEventProcessorChain.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/TopologyEventProcessorChain.java new file mode 100644 index 0000000..5c25c59 --- /dev/null +++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/TopologyEventProcessorChain.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.adc.mgt.topology.event.processor; + +import javax.jms.Message; + +public class TopologyEventProcessorChain { + + private TopologyEventProcessor firstTopologyEventProcessor = null; + private static TopologyEventProcessorChain topologyEventProcessorChain; + + private TopologyEventProcessorChain () { + firstTopologyEventProcessor = new InstanceStatusProcessor(); + } + + public static TopologyEventProcessorChain getInstance () { + + if(topologyEventProcessorChain == null) { + synchronized (TopologyEventProcessorChain.class) { + if(topologyEventProcessorChain == null) { + topologyEventProcessorChain = new TopologyEventProcessorChain(); + } + } + } + + return topologyEventProcessorChain; + } + + public void initProcessorChain () { + + //if any other topology event processors are added, link them as follows + //firstTopologyEventProcessor.setNext(secondTopologyeventProcessor); + //secondTopologyeventProcessor.setNext(null); + firstTopologyEventProcessor.setNext(null); + } + + public void startProcessing (Message message) { + firstTopologyEventProcessor.process(message); + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/cbc482a2/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/model/TopologyClusterModel.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/model/TopologyClusterModel.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/model/TopologyClusterModel.java new file mode 100644 index 0000000..f70c0d2 --- /dev/null +++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/model/TopologyClusterModel.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.adc.mgt.topology.model; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.domain.topology.Cluster; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class TopologyClusterModel { + + private static final Log log = LogFactory.getLog(TopologyClusterModel.class); + private Map<TenantIdAndAliasTopologyKey, Cluster> tenantIdAndAliasTopologyKeyToClusterMap; + private Map<Integer, List<Cluster>> tenantIdToClusterMap; + private Map<TenantIdAndTypeTopologyKey , List<Cluster>> tenantIdAndTypeTopologyKeyToClusterMap; + private static TopologyClusterModel topologyClusterModel; + + //locks + private static volatile ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private static volatile ReentrantReadWriteLock.ReadLock readLock = lock.readLock(); + private static volatile ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock(); + + private TopologyClusterModel() { + tenantIdAndAliasTopologyKeyToClusterMap = new HashMap<TenantIdAndAliasTopologyKey, Cluster>(); + tenantIdAndTypeTopologyKeyToClusterMap = new HashMap<TenantIdAndTypeTopologyKey, List<Cluster>>(); + tenantIdToClusterMap = new HashMap<Integer, List<Cluster>>(); + } + + public static TopologyClusterModel getInstance () { + if(topologyClusterModel == null) { + synchronized (TopologyClusterModel.class) { + if (topologyClusterModel == null) { + topologyClusterModel = new TopologyClusterModel(); + } + } + } + + return topologyClusterModel; + } + + public void addCluster (int tenantId, String cartridgeType, String subscriptionAlias, Cluster cluster) { + + List<Cluster> clusters; + writeLock.lock(); + + try { + //[Tenant Id + Subscription Alias] -> Cluster map + tenantIdAndAliasTopologyKeyToClusterMap.put(new TenantIdAndAliasTopologyKey(tenantId, subscriptionAlias), cluster); + + //Tenant Id -> Cluster map + clusters = tenantIdToClusterMap.get(tenantId); + if(clusters == null) { + clusters = new ArrayList<Cluster>(); + clusters.add(cluster); + tenantIdToClusterMap.put(tenantId, clusters); + } else { + clusters.add(cluster); + } + + //[Tenant Id + Cartridge Type] -> Cluster map + clusters = tenantIdAndTypeTopologyKeyToClusterMap.get(new TenantIdAndTypeTopologyKey(tenantId, cartridgeType)); + if(clusters == null) { + clusters = new ArrayList<Cluster>(); + clusters.add(cluster); + tenantIdAndTypeTopologyKeyToClusterMap.put(new TenantIdAndTypeTopologyKey(tenantId, cartridgeType), clusters); + } else { + clusters.add(cluster); + } + + } finally { + writeLock.unlock(); + } + } + + public Cluster getCluster (int tenantId, String subscriptionAlias) { + + readLock.lock(); + try { + return tenantIdAndAliasTopologyKeyToClusterMap.get(new TenantIdAndAliasTopologyKey(tenantId, subscriptionAlias)); + + } finally { + readLock.unlock(); + } + } + + public List<Cluster> getClusters (int tenantId, String cartridgeType) { + + readLock.lock(); + try { + return tenantIdAndTypeTopologyKeyToClusterMap.get(new TenantIdAndTypeTopologyKey(tenantId, cartridgeType)); + + } finally { + readLock.unlock(); + } + } + + public List<Cluster> getClusters (int tenantId) { + + readLock.lock(); + try { + return tenantIdToClusterMap.get(tenantId); + + } finally { + readLock.unlock(); + } + } + + public void removeCluster (int tenantId, String subscriptionAlias) { + tenantIdAndAliasTopologyKeyToClusterMap.remove(new TenantIdAndAliasTopologyKey(tenantId, subscriptionAlias)); + } + + private class TenantIdAndAliasTopologyKey { + + private int tenantId; + private String subscriptionAlias; + + public TenantIdAndAliasTopologyKey (int tenantId, String subscriptionAlias) { + + this.tenantId = tenantId; + this.subscriptionAlias = subscriptionAlias; + } + + public boolean equals(Object other) { + + if(this == other) { + return true; + } + if(!(other instanceof TenantIdAndAliasTopologyKey)) { + return false; + } + + TenantIdAndAliasTopologyKey that = (TenantIdAndAliasTopologyKey)other; + return ((this.tenantId == that.tenantId) && (this.subscriptionAlias == that.subscriptionAlias)); + } + + public int hashCode () { + + int subscriptionAliasHashCode = 0; + if(subscriptionAlias != null) { + subscriptionAliasHashCode = subscriptionAlias.hashCode(); + } + + return (tenantId * 3 + subscriptionAliasHashCode * 5); + } + } + + public class TenantIdAndTypeTopologyKey { + + private int tenantId; + private String subscriptionAlias; + + public TenantIdAndTypeTopologyKey (int tenantId, String subscriptionAlias) { + + this.tenantId = tenantId; + this.subscriptionAlias = subscriptionAlias; + } + + public boolean equals(Object other) { + + if(this == other) { + return true; + } + if(!(other instanceof TenantIdAndTypeTopologyKey)) { + return false; + } + + TenantIdAndTypeTopologyKey that = (TenantIdAndTypeTopologyKey)other; + return ((this.tenantId == that.tenantId) && (this.subscriptionAlias == that.subscriptionAlias)); + } + + public int hashCode () { + + int subscriptionAliasHashCode = 0; + if(subscriptionAlias != null) { + subscriptionAliasHashCode = subscriptionAlias.hashCode(); + } + + return (tenantId * 3 + subscriptionAliasHashCode * 5); + } + } +}
