Updated Branches:
  refs/heads/master 8a036e5f7 -> 3e2d598a5

using the event listner model without a custom event processor chain


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/04bed7e6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/04bed7e6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/04bed7e6

Branch: refs/heads/master
Commit: 04bed7e6fd2bbdf6a1defe00127cc6f1e633786d
Parents: 64bc33d
Author: Isuru <[email protected]>
Authored: Thu Dec 12 22:46:19 2013 +0530
Committer: Isuru <[email protected]>
Committed: Thu Dec 12 22:46:19 2013 +0530

----------------------------------------------------------------------
 .../internal/ADCManagementServerComponent.java  |  23 +-
 .../StratosManagerTopologyReceiver.java         | 293 +++++++++++++++++++
 2 files changed, 310 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/04bed7e6/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/internal/ADCManagementServerComponent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/internal/ADCManagementServerComponent.java
 
b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/internal/ADCManagementServerComponent.java
index 0b9c7b8..8999c9f 100644
--- 
a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/internal/ADCManagementServerComponent.java
+++ 
b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/internal/ADCManagementServerComponent.java
@@ -21,15 +21,14 @@ package org.apache.stratos.adc.mgt.internal;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.adc.mgt.listener.InstanceStatusListener;
-import org.apache.stratos.adc.mgt.listener.TopologyEventListner;
 import org.apache.stratos.adc.mgt.publisher.TenantEventPublisher;
 import org.apache.stratos.adc.mgt.publisher.TenantSynchronizerTaskScheduler;
+import 
org.apache.stratos.adc.mgt.topology.receiver.StratosManagerTopologyReceiver;
 import org.apache.stratos.adc.mgt.utils.CartridgeConfigFileReader;
 import org.apache.stratos.adc.mgt.utils.StratosDBUtils;
 import org.apache.stratos.adc.topology.mgt.service.TopologyManagementService;
 import org.apache.stratos.messaging.broker.publish.EventPublisher;
 import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyReceiver;
 import org.apache.stratos.messaging.util.Constants;
 import org.osgi.service.component.ComponentContext;
 import org.wso2.carbon.ntask.core.service.TaskService;
@@ -66,7 +65,9 @@ import org.wso2.carbon.utils.ConfigurationContextService;
  */
 
 public class ADCManagementServerComponent {
+
     private static final Log log = 
LogFactory.getLog(ADCManagementServerComponent.class);
+    private StratosManagerTopologyReceiver stratosManagerTopologyReceiver;
 
     protected void activate(ComponentContext componentContext) throws 
Exception {
                try {
@@ -99,7 +100,7 @@ public class ADCManagementServerComponent {
                        tsubscriber.start();
 
             //initializing the topology event subscriber
-            TopicSubscriber topologyTopicSubscriber = new 
TopicSubscriber(Constants.TOPOLOGY_TOPIC);
+            /*TopicSubscriber topologyTopicSubscriber = new 
TopicSubscriber(Constants.TOPOLOGY_TOPIC);
             topologyTopicSubscriber.setMessageListener(new 
TopologyEventListner());
             Thread topologyTopicSubscriberThread = new 
Thread(topologyTopicSubscriber);
             topologyTopicSubscriberThread.start();
@@ -107,11 +108,15 @@ public class ADCManagementServerComponent {
             //Starting Topology Receiver
             TopologyReceiver topologyReceiver = new TopologyReceiver();
             Thread topologyReceiverThread = new Thread(topologyReceiver);
+            topologyReceiverThread.start();*/
+
+            stratosManagerTopologyReceiver = new 
StratosManagerTopologyReceiver();
+            Thread topologyReceiverThread = new 
Thread(stratosManagerTopologyReceiver);
             topologyReceiverThread.start();
+            log.info("Topology receiver thread started");
 
-            if (log.isInfoEnabled()) {
-                log.info("ADC management server component is activated");
-            }
+            //Component activated successfully
+            log.info("ADC management server component is activated");
                        
                } catch (Exception e) {
             if(log.isFatalEnabled()) {
@@ -170,4 +175,10 @@ public class ADCManagementServerComponent {
         }
         ServiceReferenceHolder.getInstance().setTaskService(null);
     }
+
+    protected void deactivate(ComponentContext context) {
+
+        //terminate Stratos Manager Topology Receiver
+        stratosManagerTopologyReceiver.terminate();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/04bed7e6/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/receiver/StratosManagerTopologyReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/receiver/StratosManagerTopologyReceiver.java
 
b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/receiver/StratosManagerTopologyReceiver.java
new file mode 100644
index 0000000..75fdbc9
--- /dev/null
+++ 
b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/receiver/StratosManagerTopologyReceiver.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.adc.mgt.topology.receiver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.adc.mgt.dao.CartridgeSubscriptionInfo;
+import 
org.apache.stratos.adc.mgt.topology.model.TopologyClusterInformationModel;
+import org.apache.stratos.adc.mgt.utils.PersistenceManager;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.event.topology.*;
+import org.apache.stratos.messaging.listener.topology.*;
+import 
org.apache.stratos.messaging.message.processor.topology.TopologyMessageProcessorChain;
+import 
org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessageDelegator;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyReceiver;
+
+public class StratosManagerTopologyReceiver implements Runnable {
+
+    private static final Log log = 
LogFactory.getLog(StratosManagerTopologyReceiver.class);
+
+    private TopologyReceiver stratosManagerTopologyReceiver;
+    private boolean terminate;
+
+    public StratosManagerTopologyReceiver() {
+        this.terminate = false;
+        this.stratosManagerTopologyReceiver = new 
TopologyReceiver(createMessageDelegator());
+    }
+
+    private TopologyEventMessageDelegator createMessageDelegator() {
+        TopologyMessageProcessorChain processorChain = 
createEventProcessorChain();
+        return new TopologyEventMessageDelegator(processorChain);
+    }
+
+    private TopologyMessageProcessorChain createEventProcessorChain() {
+
+        TopologyMessageProcessorChain processorChain = new 
TopologyMessageProcessorChain();
+
+        //add listner to Complete Topology Event
+        processorChain.addEventListener(new CompleteTopologyEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                try {
+                    TopologyManager.acquireReadLock();
+                    for (Service service : 
TopologyManager.getTopology().getServices()) {
+                        //iterate through all clusters
+                        for (Cluster cluster : service.getClusters()) {
+                            //get subscription details
+                            CartridgeSubscriptionInfo 
cartridgeSubscriptionInfo =
+                                    
getCartridgeSubscriptionInfo(cluster.getClusterId());
+
+                            if(cartridgeSubscriptionInfo != null) {
+                                //add the information to Topology Cluster 
Info. model
+                                
TopologyClusterInformationModel.getInstance().addCluster(cartridgeSubscriptionInfo.getTenantId(),
+                                        
cartridgeSubscriptionInfo.getCartridge(),
+                                        cartridgeSubscriptionInfo.getAlias(), 
cluster);
+                            }
+                        }
+                    }
+                } finally {
+                    TopologyManager.releaseReadLock();
+                }
+            }
+        });
+
+        //Cluster Created event listner
+        processorChain.addEventListener(new ClusterCreatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+
+                ClusterCreatedEvent clustercreatedEvent = 
(ClusterCreatedEvent) event;
+                //get subscription details
+                CartridgeSubscriptionInfo cartridgeSubscriptionInfo =
+                        
getCartridgeSubscriptionInfo(clustercreatedEvent.getClusterId());
+
+                if(cartridgeSubscriptionInfo != null) {
+
+                    Cluster cluster;
+                    //acquire read lock
+                    TopologyManager.acquireReadLock();
+                    try {
+                        cluster = TopologyManager.getTopology().
+                                
getService(cartridgeSubscriptionInfo.getCartridge()).getCluster(cartridgeSubscriptionInfo.getClusterDomain());
+                    } finally {
+                        //release read lock
+                        TopologyManager.releaseReadLock();
+                    }
+
+                    //add the information to Topology Cluster Info. model
+                    
TopologyClusterInformationModel.getInstance().addCluster(cartridgeSubscriptionInfo.getTenantId(),
+                            cartridgeSubscriptionInfo.getCartridge(),
+                            cartridgeSubscriptionInfo.getAlias(), cluster);
+                }
+
+            }
+        });
+
+        //Cluster Removed event listner
+        processorChain.addEventListener(new ClusterRemovedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+
+                ClusterRemovedEvent clusterRemovedEvent = 
(ClusterRemovedEvent) event;
+
+                CartridgeSubscriptionInfo cartridgeSubscriptionInfo =
+                        
getCartridgeSubscriptionInfo(clusterRemovedEvent.getClusterId());
+
+                if (cartridgeSubscriptionInfo != null) {
+                    //remove the information from Topology Cluster Info. model
+                    
TopologyClusterInformationModel.getInstance().removeCluster(cartridgeSubscriptionInfo.getTenantId(),
+                            cartridgeSubscriptionInfo.getCartridge(),
+                            cartridgeSubscriptionInfo.getAlias());
+                }
+            }
+        });
+
+        //Member Started event listner
+        processorChain.addEventListener(new MemberStartedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+
+                MemberStartedEvent memberStartedEvent = (MemberStartedEvent) 
event;
+
+                String clusterDomain = memberStartedEvent.getClusterId();
+                CartridgeSubscriptionInfo cartridgeSubscriptionInfo = 
getCartridgeSubscriptionInfo(clusterDomain);
+
+                if(cartridgeSubscriptionInfo != null) {
+
+                    Cluster cluster;
+                    //acquire read lock
+                    TopologyManager.acquireReadLock();
+                    try {
+                        cluster = TopologyManager.getTopology().
+                                
getService(cartridgeSubscriptionInfo.getCartridge()).getCluster(cartridgeSubscriptionInfo.getClusterDomain());
+                    } finally {
+                        //release read lock
+                        TopologyManager.releaseReadLock();
+                    }
+
+                    
TopologyClusterInformationModel.getInstance().addCluster(cartridgeSubscriptionInfo.getTenantId(),
+                            cartridgeSubscriptionInfo.getCartridge(),
+                            cartridgeSubscriptionInfo.getAlias(), cluster);
+                }
+
+            }
+        });
+
+        //Member Activated event listner
+        processorChain.addEventListener(new MemberActivatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+
+                MemberActivatedEvent memberActivatedEvent = 
(MemberActivatedEvent) event;
+
+                String clusterDomain = memberActivatedEvent.getClusterId();
+                CartridgeSubscriptionInfo cartridgeSubscriptionInfo = 
getCartridgeSubscriptionInfo(clusterDomain);
+
+                if(cartridgeSubscriptionInfo != null) {
+
+                    Cluster cluster;
+                    //acquire read lock
+                    TopologyManager.acquireReadLock();
+                    try {
+                        cluster = TopologyManager.getTopology().
+                                
getService(cartridgeSubscriptionInfo.getCartridge()).getCluster(cartridgeSubscriptionInfo.getClusterDomain());
+                    } finally {
+                        //release read lock
+                        TopologyManager.releaseReadLock();
+                    }
+
+                    
TopologyClusterInformationModel.getInstance().addCluster(cartridgeSubscriptionInfo.getTenantId(),
+                            cartridgeSubscriptionInfo.getCartridge(),
+                            cartridgeSubscriptionInfo.getAlias(), cluster);
+                }
+
+            }
+        });
+
+        //Member Suspended event listner
+        processorChain.addEventListener(new MemberSuspendedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+
+                MemberSuspendedEvent memberSuspendedEvent = 
(MemberSuspendedEvent) event;
+
+                String clusterDomain = memberSuspendedEvent.getClusterId();
+                CartridgeSubscriptionInfo cartridgeSubscriptionInfo = 
getCartridgeSubscriptionInfo(clusterDomain);
+
+                if(cartridgeSubscriptionInfo != null) {
+
+                    Cluster cluster;
+                    //acquire read lock
+                    TopologyManager.acquireReadLock();
+                    try {
+                        cluster = TopologyManager.getTopology().
+                                
getService(cartridgeSubscriptionInfo.getCartridge()).getCluster(cartridgeSubscriptionInfo.getClusterDomain());
+                    } finally {
+                        //release read lock
+                        TopologyManager.releaseReadLock();
+                    }
+
+                    
TopologyClusterInformationModel.getInstance().addCluster(cartridgeSubscriptionInfo.getTenantId(),
+                            cartridgeSubscriptionInfo.getCartridge(),
+                            cartridgeSubscriptionInfo.getAlias(), cluster);
+                }
+
+            }
+        });
+
+        //Member Terminated event listner
+        processorChain.addEventListener(new MemberTerminatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+
+                MemberTerminatedEvent memberTerminatedEvent = 
(MemberTerminatedEvent) event;
+
+                String clusterDomain = memberTerminatedEvent.getClusterId();
+                CartridgeSubscriptionInfo cartridgeSubscriptionInfo = 
getCartridgeSubscriptionInfo(clusterDomain);
+
+                if(cartridgeSubscriptionInfo != null) {
+
+                    Cluster cluster;
+                    //acquire read lock
+                    TopologyManager.acquireReadLock();
+                    try {
+                        cluster = TopologyManager.getTopology().
+                                
getService(cartridgeSubscriptionInfo.getCartridge()).getCluster(cartridgeSubscriptionInfo.getClusterDomain());
+                    } finally {
+                        //release read lock
+                        TopologyManager.releaseReadLock();
+                    }
+
+                    
TopologyClusterInformationModel.getInstance().addCluster(cartridgeSubscriptionInfo.getTenantId(),
+                            cartridgeSubscriptionInfo.getCartridge(),
+                            cartridgeSubscriptionInfo.getAlias(), cluster);
+                }
+
+            }
+        });
+
+        return processorChain;
+    }
+
+    private CartridgeSubscriptionInfo getCartridgeSubscriptionInfo (String 
clusterDomain) {
+
+        try {
+            return 
PersistenceManager.getSubscriptionFromClusterId(clusterDomain);
+
+        } catch (Exception e) {
+            log.error("Error getting subscription information for cluster " + 
clusterDomain, e);
+            return null;
+        }
+    }
+
+    @Override
+    public void run() {
+
+        Thread thread = new Thread(stratosManagerTopologyReceiver);
+        thread.start();
+        log.info("Stratos Manager topology receiver thread started");
+
+        //Keep running till terminate is set from deactivate method of the 
component
+        while (!terminate) {
+            //loop while terminate = false
+        }
+        log.info("Stratos Manager topology receiver thread terminated");
+    }
+
+    //terminate Topology Receiver
+    public void terminate () {
+        stratosManagerTopologyReceiver.terminate();
+        terminate = true;
+    }
+}

Reply via email to