Topology model initial implementation at SM backend

Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/cbc482a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/cbc482a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/cbc482a2

Branch: refs/heads/master
Commit: cbc482a249e8a6cae2175639dd2f2f9a15289814
Parents: a0323f5
Author: Isuru <[email protected]>
Authored: Tue Dec 10 20:48:09 2013 +0530
Committer: Isuru <[email protected]>
Committed: Tue Dec 10 20:48:09 2013 +0530

----------------------------------------------------------------------
 .../internal/ADCManagementServerComponent.java  |   7 +
 .../adc/mgt/listener/TopologyEventListner.java  |  40 ++
 .../processor/InstanceStatusProcessor.java      | 409 +++++++++++++++++++
 .../event/processor/TopologyEventProcessor.java |  33 ++
 .../processor/TopologyEventProcessorChain.java  |  59 +++
 .../topology/model/TopologyClusterModel.java    | 203 +++++++++
 6 files changed, 751 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/cbc482a2/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/internal/ADCManagementServerComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/internal/ADCManagementServerComponent.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/internal/ADCManagementServerComponent.java
index a77cd48..0b9c7b8 100644
--- a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/internal/ADCManagementServerComponent.java
+++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/internal/ADCManagementServerComponent.java
@@ -21,6 +21,7 @@ package org.apache.stratos.adc.mgt.internal;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.adc.mgt.listener.InstanceStatusListener;
+import org.apache.stratos.adc.mgt.listener.TopologyEventListner;
 import org.apache.stratos.adc.mgt.publisher.TenantEventPublisher;
 import org.apache.stratos.adc.mgt.publisher.TenantSynchronizerTaskScheduler;
 import org.apache.stratos.adc.mgt.utils.CartridgeConfigFileReader;
@@ -97,6 +98,12 @@ public class ADCManagementServerComponent {
             Thread tsubscriber = new Thread(subscriber);
                        tsubscriber.start();
 
+            //initializing the topology event subscriber
+            TopicSubscriber topologyTopicSubscriber = new 
TopicSubscriber(Constants.TOPOLOGY_TOPIC);
+            topologyTopicSubscriber.setMessageListener(new 
TopologyEventListner());
+            Thread topologyTopicSubscriberThread = new 
Thread(topologyTopicSubscriber);
+            topologyTopicSubscriberThread.start();
+
             //Starting Topology Receiver
             TopologyReceiver topologyReceiver = new TopologyReceiver();
             Thread topologyReceiverThread = new Thread(topologyReceiver);

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/cbc482a2/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/listener/TopologyEventListner.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/listener/TopologyEventListner.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/listener/TopologyEventListner.java
new file mode 100644
index 0000000..0ec88f2
--- /dev/null
+++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/listener/TopologyEventListner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.adc.mgt.listener;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import 
org.apache.stratos.adc.mgt.topology.event.processor.TopologyEventProcessorChain;
+
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
+/**
+ * JMS {@link MessageListener} that forwards every message it receives to the
+ * {@link TopologyEventProcessorChain} singleton.
+ */
+public class TopologyEventListner implements MessageListener {
+
+    private static final Log log = LogFactory.getLog(TopologyEventListner.class);
+
+    /**
+     * Hands the received message to the topology event processor chain.
+     *
+     * @param message the JMS message delivered to this listener
+     */
+    public void onMessage(Message message) {
+        final TopologyEventProcessorChain processorChain = TopologyEventProcessorChain.getInstance();
+        processorChain.startProcessing(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/cbc482a2/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/InstanceStatusProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/InstanceStatusProcessor.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/InstanceStatusProcessor.java
new file mode 100644
index 0000000..0f274de
--- /dev/null
+++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/InstanceStatusProcessor.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.adc.mgt.topology.event.processor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.adc.mgt.dao.CartridgeSubscriptionInfo;
+import org.apache.stratos.adc.mgt.topology.model.TopologyClusterModel;
+import org.apache.stratos.adc.mgt.utils.PersistenceManager;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
+import org.apache.stratos.messaging.event.topology.MemberSuspendedEvent;
+import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+import org.apache.stratos.messaging.util.Constants;
+import org.apache.stratos.messaging.util.Util;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+import java.util.HashMap;
+import java.util.Map;
+
+public class InstanceStatusProcessor extends TopologyEventProcessor {
+
+    private static final Log log = 
LogFactory.getLog(InstanceStatusProcessor.class);
+
+    private Map<String, Integer> clusterIdToActiveInstanceCountMap;
+
+    public InstanceStatusProcessor () {
+        clusterIdToActiveInstanceCountMap = new HashMap<String, Integer>();
+    }
+
+    @Override
+    public void process(Message message) {
+
+        //new InstanceStatusListenerThread(message).start();
+        //go to next processor in the chain
+        if(nextTopologyEventProcessor != null) {
+            nextTopologyEventProcessor.process(message);
+        }
+    }
+
+    private void doProcessing (Message message) {
+
+        String messageType = null;
+
+        try {
+            messageType = 
message.getStringProperty(Constants.EVENT_CLASS_NAME);
+
+        } catch (JMSException e) {
+            log.error("Error in getting message type from received Message " + 
message.getClass().toString(), e);
+            return;
+        }
+
+        if (MemberStartedEvent.class.getName().equals(messageType)) {
+
+            log.info("Received message: " + messageType);
+
+            MemberStartedEvent event = getMemberStartedEvent(message);
+            String clusterDomain = event.getClusterId();
+            CartridgeSubscriptionInfo cartridgeSubscriptionInfo = 
getCartridgeSubscriptionInfo(clusterDomain);
+
+            if(cartridgeSubscriptionInfo != null) {
+                Cluster cluster = TopologyManager.getTopology().
+                        
getService(cartridgeSubscriptionInfo.getCartridge()).getCluster(clusterDomain);
+                
TopologyClusterModel.getInstance().addCluster(cartridgeSubscriptionInfo.getTenantId(),
+                        cartridgeSubscriptionInfo.getCartridge(),
+                        cartridgeSubscriptionInfo.getAlias(), cluster);
+            }
+
+        }
+        else if (MemberActivatedEvent.class.getName().equals(messageType)) {
+
+            log.info("Received message: " + messageType);
+
+            MemberActivatedEvent event = getMemberActivetedEvent(message);
+            String clusterDomain = event.getClusterId();
+            CartridgeSubscriptionInfo cartridgeSubscriptionInfo = 
getCartridgeSubscriptionInfo(clusterDomain);
+
+            if(cartridgeSubscriptionInfo != null) {
+                Cluster cluster = TopologyManager.getTopology().
+                        
getService(cartridgeSubscriptionInfo.getCartridge()).getCluster(clusterDomain);
+                
TopologyClusterModel.getInstance().addCluster(cartridgeSubscriptionInfo.getTenantId(),
+                        cartridgeSubscriptionInfo.getCartridge(),
+                        cartridgeSubscriptionInfo.getAlias(), cluster);
+            }
+
+
+        } else if (MemberSuspendedEvent.class.getName().equals(messageType)) {
+
+            log.info("Received message: " + messageType);
+
+            MemberStartedEvent event = getMemberStartedEvent(message);
+            String clusterDomain = event.getClusterId();
+            CartridgeSubscriptionInfo cartridgeSubscriptionInfo = 
getCartridgeSubscriptionInfo(clusterDomain);
+
+            if(cartridgeSubscriptionInfo != null) {
+                Cluster cluster = TopologyManager.getTopology().
+                        
getService(cartridgeSubscriptionInfo.getCartridge()).getCluster(clusterDomain);
+                
TopologyClusterModel.getInstance().addCluster(cartridgeSubscriptionInfo.getTenantId(),
+                        cartridgeSubscriptionInfo.getCartridge(),
+                        cartridgeSubscriptionInfo.getAlias(), cluster);
+            }
+
+        } else if (MemberTerminatedEvent.class.getName().equals(messageType)) {
+
+            log.info("Received message: " + messageType);
+
+            MemberStartedEvent event = getMemberStartedEvent(message);
+            String clusterDomain = event.getClusterId();
+            CartridgeSubscriptionInfo cartridgeSubscriptionInfo = 
getCartridgeSubscriptionInfo(clusterDomain);
+
+            if(cartridgeSubscriptionInfo != null) {
+                Cluster cluster = TopologyManager.getTopology().
+                        
getService(cartridgeSubscriptionInfo.getCartridge()).getCluster(clusterDomain);
+                
TopologyClusterModel.getInstance().addCluster(cartridgeSubscriptionInfo.getTenantId(),
+                        cartridgeSubscriptionInfo.getCartridge(),
+                        cartridgeSubscriptionInfo.getAlias(), cluster);
+            }
+
+        } else {
+            //cannot happen
+        }
+    }
+
+    private MemberStartedEvent getMemberStartedEvent (Message message) {
+
+        String json = null;
+        try {
+            json = ((TextMessage)message).getText();
+
+        } catch (JMSException e) {
+            log.error("Error in getting Json message type from received 
Message ", e);
+            return null;
+        }
+        MemberStartedEvent event = (MemberStartedEvent) 
Util.jsonToObject(json, MemberStartedEvent.class);
+
+        if(log.isDebugEnabled()) {
+            log.debug("Received message details: [ " +
+                    "Cluster Id: " + event.getClusterId() +
+                    "\nMember Id: " + event.getMemberId() +
+                    "\nService name: " + event.getServiceName() +
+                    "\nStatus: " + event.getStatus().name() + " ]");
+        }
+
+        return event;
+    }
+
+    private MemberActivatedEvent getMemberActivetedEvent (Message message) {
+
+        String json = null;
+        try {
+            json = ((TextMessage)message).getText();
+
+        } catch (JMSException e) {
+            log.error("Error in getting Json message type from received 
Message ", e);
+            return null;
+        }
+        MemberActivatedEvent event = (MemberActivatedEvent) 
Util.jsonToObject(json, MemberStartedEvent.class);
+
+        if(log.isDebugEnabled()) {
+            log.debug("Received message details: [ " +
+                    "Cluster Id: " + event.getClusterId() +
+                    "\nMember Id: " + event.getMemberId() +
+                    "\nService name: " + event.getServiceName() +
+                    "\nIp: " + event.getMemberIp() + " ]");
+        }
+
+        return event;
+    }
+
+    private MemberSuspendedEvent getMemberSuspendedEvent (Message message) {
+
+        String json = null;
+        try {
+            json = ((TextMessage)message).getText();
+
+        } catch (JMSException e) {
+            log.error("Error in getting Json message type from received 
Message ", e);
+            return null;
+        }
+        MemberSuspendedEvent event = (MemberSuspendedEvent) 
Util.jsonToObject(json, MemberStartedEvent.class);
+
+        if(log.isDebugEnabled()) {
+            log.debug("Received message details: [ " +
+                    "Cluster Id: " + event.getClusterId() +
+                    "\nMember Id: " + event.getMemberId() +
+                    "\nService name: " + event.getServiceName() + " ]");
+        }
+
+        return event;
+    }
+
+    private MemberTerminatedEvent getMemberTerminatedEvebt (Message message) {
+
+        String json = null;
+        try {
+            json = ((TextMessage)message).getText();
+
+        } catch (JMSException e) {
+            log.error("Error in getting Json message type from received 
Message ", e);
+            return null;
+        }
+        MemberTerminatedEvent event = (MemberTerminatedEvent) 
Util.jsonToObject(json, MemberStartedEvent.class);
+
+        if(log.isDebugEnabled()) {
+            log.debug("Received message details: [ " +
+                    "Cluster Id: " + event.getClusterId() +
+                    "\nMember Id: " + event.getMemberId() +
+                    "\nService name: " + event.getServiceName() + " ]");
+        }
+
+        return event;
+    }
+
+    private CartridgeSubscriptionInfo getCartridgeSubscriptionInfo (String 
clusterDomain) {
+
+        try {
+            return 
PersistenceManager.getSubscriptionFromClusterId(clusterDomain);
+
+        } catch (Exception e) {
+            log.error("Error getting subscription information for cluster " + 
clusterDomain, e);
+            return null;
+        }
+    }
+
+    /**
+     * Message Processing Thread class for InstanceStatusProcessor
+     */
+    /*private class InstanceStatusListenerThread extends Thread {
+
+        Message message;
+
+        public InstanceStatusListenerThread (Message message) {
+            this.message = message;
+        }
+
+        public void run () {
+
+            String messageType = null;
+
+            try {
+                messageType = 
message.getStringProperty(Constants.EVENT_CLASS_NAME);
+
+            } catch (JMSException e) {
+                log.error("Error in getting message type from received Message 
" + message.getClass().toString(), e);
+                return;
+            }
+
+            if (MemberStartedEvent.class.getName().equals(messageType)) {
+
+                log.info("Received message: " + messageType);
+
+                MemberStartedEvent event = getMemberStartedEvent();
+                String clusterDomain = event.getClusterId();
+                CartridgeSubscriptionInfo cartridgeSubscriptionInfo = 
getCartridgeSubscriptionInfo(clusterDomain);
+                if(cartridgeSubscriptionInfo != null) {
+
+                }
+
+            }
+            else if (MemberActivatedEvent.class.getName().equals(messageType)) 
{
+
+                log.info("Received message: " + messageType);
+
+                MemberActivatedEvent event = getMemberActivetedEvent();
+                String clusterDomain = event.getClusterId();
+
+
+            } else if 
(MemberSuspendedEvent.class.getName().equals(messageType)) {
+
+                log.info("Received message: " + messageType);
+
+                MemberStartedEvent event = getMemberStartedEvent();
+                String clusterDomain = event.getClusterId();
+
+            } else if 
(MemberTerminatedEvent.class.getName().equals(messageType)) {
+
+                log.info("Received message: " + messageType);
+
+                MemberStartedEvent event = getMemberStartedEvent();
+                String clusterDomain = event.getClusterId();
+
+            } else {
+                //cannot happen
+            }
+        }
+
+        private MemberStartedEvent getMemberStartedEvent () {
+
+            String json = null;
+            try {
+                json = ((TextMessage)message).getText();
+
+            } catch (JMSException e) {
+                log.error("Error in getting Json message type from received 
Message ", e);
+                return null;
+            }
+            MemberStartedEvent event = (MemberStartedEvent) 
Util.jsonToObject(json, MemberStartedEvent.class);
+
+            if(log.isDebugEnabled()) {
+                log.debug("Received message details: [ " +
+                        "Cluster Id: " + event.getClusterId() +
+                        "\nMember Id: " + event.getMemberId() +
+                        "\nService name: " + event.getServiceName() +
+                        "\nStatus: " + event.getStatus().name() + " ]");
+            }
+
+            return event;
+        }
+
+        private MemberActivatedEvent getMemberActivetedEvent () {
+
+            String json = null;
+            try {
+                json = ((TextMessage)message).getText();
+
+            } catch (JMSException e) {
+                log.error("Error in getting Json message type from received 
Message ", e);
+                return null;
+            }
+            MemberActivatedEvent event = (MemberActivatedEvent) 
Util.jsonToObject(json, MemberStartedEvent.class);
+
+            if(log.isDebugEnabled()) {
+                log.debug("Received message details: [ " +
+                        "Cluster Id: " + event.getClusterId() +
+                        "\nMember Id: " + event.getMemberId() +
+                        "\nService name: " + event.getServiceName() +
+                        "\nIp: " + event.getMemberIp() + " ]");
+            }
+
+            return event;
+        }
+
+        private MemberSuspendedEvent getMemberSuspendedEvent () {
+
+            String json = null;
+            try {
+                json = ((TextMessage)message).getText();
+
+            } catch (JMSException e) {
+                log.error("Error in getting Json message type from received 
Message ", e);
+                return null;
+            }
+            MemberSuspendedEvent event = (MemberSuspendedEvent) 
Util.jsonToObject(json, MemberStartedEvent.class);
+
+            if(log.isDebugEnabled()) {
+                log.debug("Received message details: [ " +
+                        "Cluster Id: " + event.getClusterId() +
+                        "\nMember Id: " + event.getMemberId() +
+                        "\nService name: " + event.getServiceName() + " ]");
+            }
+
+            return event;
+        }
+
+        private MemberTerminatedEvent getMemberTerminatedEvebt () {
+
+            String json = null;
+            try {
+                json = ((TextMessage)message).getText();
+
+            } catch (JMSException e) {
+                log.error("Error in getting Json message type from received 
Message ", e);
+                return null;
+            }
+            MemberTerminatedEvent event = (MemberTerminatedEvent) 
Util.jsonToObject(json, MemberStartedEvent.class);
+
+            if(log.isDebugEnabled()) {
+                log.debug("Received message details: [ " +
+                        "Cluster Id: " + event.getClusterId() +
+                        "\nMember Id: " + event.getMemberId() +
+                        "\nService name: " + event.getServiceName() + " ]");
+            }
+
+            return event;
+        }
+
+        private CartridgeSubscriptionInfo getCartridgeSubscriptionInfo (String 
clusterDomain) {
+
+            try {
+                return 
PersistenceManager.getSubscriptionFromClusterId(clusterDomain);
+
+            } catch (Exception e) {
+                log.error("Error getting subscription information for cluster 
" + clusterDomain, e);
+                return null;
+            }
+        }
+    }*/
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/cbc482a2/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/TopologyEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/TopologyEventProcessor.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/TopologyEventProcessor.java
new file mode 100644
index 0000000..f582d57
--- /dev/null
+++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/TopologyEventProcessor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.adc.mgt.topology.event.processor;
+
+import javax.jms.Message;
+
+/**
+ * Base class for one link in a chain of topology event processors. Each
+ * processor holds a reference to its successor; forwarding the message to that
+ * successor is left to the {@link #process(Message)} implementation.
+ */
+public abstract class TopologyEventProcessor {
+
+    /** Successor in the chain; {@code null} when there is none. */
+    protected TopologyEventProcessor nextTopologyEventProcessor;
+
+    /**
+     * Handles the given message.
+     *
+     * @param message the JMS message to process
+     */
+    public abstract void process (Message message);
+
+    /**
+     * Sets the processor that follows this one in the chain.
+     *
+     * @param nextTopologyEventProcessor the successor, or {@code null} for none
+     */
+    public void setNext (TopologyEventProcessor nextTopologyEventProcessor) {
+        this.nextTopologyEventProcessor = nextTopologyEventProcessor;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/cbc482a2/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/TopologyEventProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/TopologyEventProcessorChain.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/TopologyEventProcessorChain.java
new file mode 100644
index 0000000..5c25c59
--- /dev/null
+++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/TopologyEventProcessorChain.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.adc.mgt.topology.event.processor;
+
+import javax.jms.Message;
+
+/**
+ * Singleton entry point to the chain of {@link TopologyEventProcessor}s.
+ * The chain currently consists of a single {@link InstanceStatusProcessor};
+ * {@link #startProcessing(Message)} hands a message to the first processor.
+ */
+public class TopologyEventProcessorChain {
+
+    private final TopologyEventProcessor firstTopologyEventProcessor;
+
+    // volatile so that the unsynchronized first check in getInstance() can never
+    // observe a partially constructed instance
+    private static volatile TopologyEventProcessorChain topologyEventProcessorChain;
+
+    private TopologyEventProcessorChain () {
+        firstTopologyEventProcessor = new InstanceStatusProcessor();
+    }
+
+    /**
+     * Returns the single chain instance, creating it on first use.
+     *
+     * @return the shared TopologyEventProcessorChain
+     */
+    public static TopologyEventProcessorChain getInstance () {
+
+        if(topologyEventProcessorChain == null) {
+            synchronized (TopologyEventProcessorChain.class) {
+                if(topologyEventProcessorChain == null) {
+                    topologyEventProcessorChain = new TopologyEventProcessorChain();
+                }
+            }
+        }
+
+        return topologyEventProcessorChain;
+    }
+
+    /**
+     * Links the processors of the chain together. With a single processor this
+     * only marks the first processor as having no successor.
+     */
+    public void initProcessorChain () {
+
+        //if any other topology event processors are added, link them as follows
+        //firstTopologyEventProcessor.setNext(secondTopologyeventProcessor);
+        //secondTopologyeventProcessor.setNext(null);
+        firstTopologyEventProcessor.setNext(null);
+    }
+
+    /**
+     * Passes the message to the first processor in the chain.
+     *
+     * @param message the JMS message to process
+     */
+    public void startProcessing (Message message) {
+        firstTopologyEventProcessor.process(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/cbc482a2/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/model/TopologyClusterModel.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/model/TopologyClusterModel.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/model/TopologyClusterModel.java
new file mode 100644
index 0000000..f70c0d2
--- /dev/null
+++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/model/TopologyClusterModel.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.adc.mgt.topology.model;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class TopologyClusterModel {
+
+    private static final Log log = 
LogFactory.getLog(TopologyClusterModel.class);
+    private Map<TenantIdAndAliasTopologyKey, Cluster> 
tenantIdAndAliasTopologyKeyToClusterMap;
+    private Map<Integer, List<Cluster>> tenantIdToClusterMap;
+    private Map<TenantIdAndTypeTopologyKey , List<Cluster>> 
tenantIdAndTypeTopologyKeyToClusterMap;
+    private static TopologyClusterModel topologyClusterModel;
+
+    //locks
+    private static volatile ReentrantReadWriteLock lock = new 
ReentrantReadWriteLock();
+    private static volatile ReentrantReadWriteLock.ReadLock readLock = 
lock.readLock();
+    private static volatile ReentrantReadWriteLock.WriteLock writeLock = 
lock.writeLock();
+
+    private TopologyClusterModel() {
+        tenantIdAndAliasTopologyKeyToClusterMap = new 
HashMap<TenantIdAndAliasTopologyKey, Cluster>();
+        tenantIdAndTypeTopologyKeyToClusterMap = new 
HashMap<TenantIdAndTypeTopologyKey, List<Cluster>>();
+        tenantIdToClusterMap = new HashMap<Integer, List<Cluster>>();
+    }
+
+    public static TopologyClusterModel getInstance () {
+        if(topologyClusterModel == null) {
+            synchronized (TopologyClusterModel.class) {
+                if (topologyClusterModel == null) {
+                    topologyClusterModel = new TopologyClusterModel();
+                }
+            }
+        }
+
+        return topologyClusterModel;
+    }
+
+    public void addCluster (int tenantId, String cartridgeType, String 
subscriptionAlias, Cluster cluster) {
+
+        List<Cluster> clusters;
+        writeLock.lock();
+
+        try {
+            //[Tenant Id + Subscription Alias] -> Cluster map
+            tenantIdAndAliasTopologyKeyToClusterMap.put(new 
TenantIdAndAliasTopologyKey(tenantId, subscriptionAlias), cluster);
+
+            //Tenant Id -> Cluster map
+            clusters = tenantIdToClusterMap.get(tenantId);
+            if(clusters == null) {
+                clusters = new ArrayList<Cluster>();
+                clusters.add(cluster);
+                tenantIdToClusterMap.put(tenantId, clusters);
+            } else {
+                clusters.add(cluster);
+            }
+
+            //[Tenant Id + Cartridge Type] -> Cluster map
+            clusters = tenantIdAndTypeTopologyKeyToClusterMap.get(new 
TenantIdAndTypeTopologyKey(tenantId, cartridgeType));
+            if(clusters == null) {
+                clusters = new ArrayList<Cluster>();
+                clusters.add(cluster);
+                tenantIdAndTypeTopologyKeyToClusterMap.put(new 
TenantIdAndTypeTopologyKey(tenantId, cartridgeType), clusters);
+            } else {
+                clusters.add(cluster);
+            }
+
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    public Cluster getCluster (int tenantId, String subscriptionAlias) {
+
+        readLock.lock();
+        try {
+            return tenantIdAndAliasTopologyKeyToClusterMap.get(new 
TenantIdAndAliasTopologyKey(tenantId, subscriptionAlias));
+
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    public List<Cluster> getClusters (int tenantId, String cartridgeType) {
+
+        readLock.lock();
+        try {
+            return tenantIdAndTypeTopologyKeyToClusterMap.get(new 
TenantIdAndTypeTopologyKey(tenantId, cartridgeType));
+
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    public List<Cluster> getClusters (int tenantId) {
+
+        readLock.lock();
+        try {
+            return tenantIdToClusterMap.get(tenantId);
+
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    public void removeCluster (int tenantId, String subscriptionAlias) {
+        tenantIdAndAliasTopologyKeyToClusterMap.remove(new 
TenantIdAndAliasTopologyKey(tenantId, subscriptionAlias));
+    }
+
+    private class TenantIdAndAliasTopologyKey {
+
+        private int tenantId;
+        private String subscriptionAlias;
+
+        public TenantIdAndAliasTopologyKey (int tenantId, String 
subscriptionAlias) {
+
+            this.tenantId = tenantId;
+            this.subscriptionAlias = subscriptionAlias;
+        }
+
+        public boolean equals(Object other) {
+
+            if(this == other) {
+                return true;
+            }
+            if(!(other instanceof TenantIdAndAliasTopologyKey)) {
+                return false;
+            }
+
+            TenantIdAndAliasTopologyKey that = 
(TenantIdAndAliasTopologyKey)other;
+            return ((this.tenantId == that.tenantId) && 
(this.subscriptionAlias == that.subscriptionAlias));
+        }
+
+        public int hashCode () {
+
+            int subscriptionAliasHashCode = 0;
+            if(subscriptionAlias != null) {
+                subscriptionAliasHashCode = subscriptionAlias.hashCode();
+            }
+
+            return (tenantId * 3 + subscriptionAliasHashCode * 5);
+        }
+    }
+
+    public class TenantIdAndTypeTopologyKey {
+
+        private int tenantId;
+        private String subscriptionAlias;
+
+        public TenantIdAndTypeTopologyKey (int tenantId, String 
subscriptionAlias) {
+
+            this.tenantId = tenantId;
+            this.subscriptionAlias = subscriptionAlias;
+        }
+
+        public boolean equals(Object other) {
+
+            if(this == other) {
+                return true;
+            }
+            if(!(other instanceof TenantIdAndTypeTopologyKey)) {
+                return false;
+            }
+
+            TenantIdAndTypeTopologyKey that = 
(TenantIdAndTypeTopologyKey)other;
+            return ((this.tenantId == that.tenantId) && 
(this.subscriptionAlias == that.subscriptionAlias));
+        }
+
+        public int hashCode () {
+
+            int subscriptionAliasHashCode = 0;
+            if(subscriptionAlias != null) {
+                subscriptionAliasHashCode = subscriptionAlias.hashCode();
+            }
+
+            return (tenantId * 3 + subscriptionAliasHashCode * 5);
+        }
+    }
+}

Reply via email to