Repository: stratos
Updated Branches:
  refs/heads/docker-grouping-merge a279f9c11 -> 62d65fd73


adding application topic receiver in cloud controller


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/62d65fd7
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/62d65fd7
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/62d65fd7

Branch: refs/heads/docker-grouping-merge
Commit: 62d65fd734ce3f227781d3cd7cfad1127cccf477
Parents: a279f9c
Author: reka <[email protected]>
Authored: Tue Nov 4 15:38:28 2014 +0530
Committer: reka <[email protected]>
Committed: Tue Nov 4 15:39:34 2014 +0530

----------------------------------------------------------------------
 .../receiver/ClusterStatusTopicReceiver.java    | 113 -------------------
 .../internal/CloudControllerDSComponent.java    |  21 +++-
 .../application/ApplicationTopicReceiver.java   |  60 ++++++++++
 .../status/ClusterStatusTopicReceiver.java      | 113 +++++++++++++++++++
 .../InstanceStatusEventMessageDelegator.java    |  84 ++++++++++++++
 .../InstanceStatusEventMessageListener.java     |  84 ++++++++++++++
 .../status/InstanceStatusEventMessageQueue.java |  45 ++++++++
 .../InstanceStatusEventMessageDelegator.java    |  84 --------------
 .../InstanceStatusEventMessageListener.java     |  84 --------------
 .../status/InstanceStatusEventMessageQueue.java |  45 --------
 .../controller/topology/TopologyBuilder.java    |  27 +++++
 .../topology/TopologyEventPublisher.java        |   9 ++
 12 files changed, 438 insertions(+), 331 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/62d65fd7/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/application/status/receiver/ClusterStatusTopicReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/application/status/receiver/ClusterStatusTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/application/status/receiver/ClusterStatusTopicReceiver.java
deleted file mode 100644
index 4cc7599..0000000
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/application/status/receiver/ClusterStatusTopicReceiver.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.cloud.controller.application.status.receiver;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.cloud.controller.topology.TopologyBuilder;
-import org.apache.stratos.messaging.event.Event;
-import org.apache.stratos.messaging.event.cluster.status.*;
-import org.apache.stratos.messaging.listener.cluster.status.*;
-import 
org.apache.stratos.messaging.message.receiver.cluster.status.ClusterStatusEventReceiver;
-
-public class ClusterStatusTopicReceiver implements Runnable {
-    private static final Log log = 
LogFactory.getLog(ClusterStatusTopicReceiver.class);
-
-    private ClusterStatusEventReceiver statusEventReceiver;
-    private boolean terminated;
-
-    public ClusterStatusTopicReceiver() {
-        this.statusEventReceiver = new ClusterStatusEventReceiver();
-        addEventListeners();
-    }
-
-    public void run() {
-        //FIXME this activated before autoscaler deployer activated.
-        try {
-            Thread.sleep(15000);
-        } catch (InterruptedException ignore) {
-        }
-        Thread thread = new Thread(statusEventReceiver);
-        thread.start();
-        if (log.isInfoEnabled()) {
-            log.info("Cloud controller application status thread started");
-        }
-
-        // Keep the thread live until terminated
-        while (!terminated) {
-            try {
-                Thread.sleep(1000);
-            } catch (InterruptedException ignore) {
-            }
-        }
-        if (log.isInfoEnabled()) {
-            log.info("Cloud controller application status thread terminated");
-        }
-
-    }
-
-    private void addEventListeners() {
-        // Listen to topology events that affect clusters
-        statusEventReceiver.addEventListener(new 
ClusterStatusClusterResetEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                
TopologyBuilder.handleClusterReset((ClusterStatusClusterResetEvent) event);
-            }
-        });
-
-        statusEventReceiver.addEventListener(new 
ClusterStatusClusterCreatedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                
TopologyBuilder.handleClusterCreated((ClusterStatusClusterCreatedEvent) event);
-            }
-        });
-
-        statusEventReceiver.addEventListener(new 
ClusterStatusClusterActivatedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                
TopologyBuilder.handleClusterActivatedEvent((ClusterStatusClusterActivatedEvent)
 event);
-            }
-        });
-
-        statusEventReceiver.addEventListener(new 
ClusterStatusClusterTerminatedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                
TopologyBuilder.handleClusterTerminatedEvent((ClusterStatusClusterTerminatedEvent)
 event);
-            }
-        });
-
-        statusEventReceiver.addEventListener(new 
ClusterStatusClusterTerminatingEventListener(){
-            @Override
-            protected void onEvent(Event event) {
-                
TopologyBuilder.handleClusterTerminatingEvent((ClusterStatusClusterTerminatingEvent)
 event);
-            }
-        });
-
-        statusEventReceiver.addEventListener(new 
ClusterStatusClusterInactivateEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                
TopologyBuilder.handleClusterInActivateEvent((ClusterStatusClusterInactivateEvent)
 event);
-            }
-        });
-    }
-
-    public void setTerminated(boolean terminated) {
-        this.terminated = terminated;
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/62d65fd7/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java
index b286390..26edbf1 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java
@@ -23,13 +23,14 @@ package org.apache.stratos.cloud.controller.internal;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import 
org.apache.stratos.cloud.controller.application.status.receiver.ClusterStatusTopicReceiver;
+import 
org.apache.stratos.cloud.controller.receiver.application.ApplicationTopicReceiver;
+import 
org.apache.stratos.cloud.controller.receiver.cluster.status.ClusterStatusTopicReceiver;
 import org.apache.stratos.cloud.controller.exception.CloudControllerException;
 import org.apache.stratos.cloud.controller.impl.CloudControllerServiceImpl;
 import org.apache.stratos.cloud.controller.interfaces.CloudControllerService;
 import 
org.apache.stratos.cloud.controller.publisher.TopologySynchronizerTaskScheduler;
-import 
org.apache.stratos.cloud.controller.topic.instance.status.InstanceStatusEventMessageDelegator;
-import 
org.apache.stratos.cloud.controller.topic.instance.status.InstanceStatusEventMessageListener;
+import 
org.apache.stratos.cloud.controller.receiver.instance.status.InstanceStatusEventMessageDelegator;
+import 
org.apache.stratos.cloud.controller.receiver.instance.status.InstanceStatusEventMessageListener;
 import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
 import org.apache.stratos.cloud.controller.util.ServiceReferenceHolder;
 import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
@@ -65,6 +66,8 @@ public class CloudControllerDSComponent {
 
     private static final Log log = 
LogFactory.getLog(CloudControllerDSComponent.class);
     private ClusterStatusTopicReceiver clusterStatusTopicReceiver;
+    private ApplicationTopicReceiver applicationTopicReceiver;
+
     protected void activate(ComponentContext context) {
         try {
                        
@@ -82,12 +85,20 @@ public class CloudControllerDSComponent {
                 log.info("Instance status message receiver thread started");
             }
 
+            applicationTopicReceiver = new ApplicationTopicReceiver();
+            Thread tApplicationTopicReceiver = new 
Thread(applicationTopicReceiver);
+            tApplicationTopicReceiver.start();
+
+            if (log.isInfoEnabled()) {
+                log.info("Application Receiver thread started");
+            }
+
             clusterStatusTopicReceiver = new ClusterStatusTopicReceiver();
             Thread tClusterStatusTopicReceiver = new 
Thread(clusterStatusTopicReceiver);
             tClusterStatusTopicReceiver.start();
 
-            if (log.isDebugEnabled()) {
-                log.debug("Cluster status Receiver thread started");
+            if (log.isInfoEnabled()) {
+                log.info("Cluster status Receiver thread started");
             }
 
                // Register cloud controller service

http://git-wip-us.apache.org/repos/asf/stratos/blob/62d65fd7/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/application/ApplicationTopicReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/application/ApplicationTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/application/ApplicationTopicReceiver.java
new file mode 100644
index 0000000..7f8fd56
--- /dev/null
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/application/ApplicationTopicReceiver.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.cloud.controller.receiver.application;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cloud.controller.topology.TopologyBuilder;
+import org.apache.stratos.messaging.event.Event;
+import 
org.apache.stratos.messaging.event.applications.ApplicationTerminatedEvent;
+import 
org.apache.stratos.messaging.listener.applications.ApplicationTerminatedEventListener;
+import 
org.apache.stratos.messaging.message.receiver.applications.ApplicationsEventReceiver;
+
+/**
+ * This is to receive the application topic messages.
+ */
+public class ApplicationTopicReceiver implements Runnable {
+    private static final Log log = 
LogFactory.getLog(ApplicationTopicReceiver.class);
+    private ApplicationsEventReceiver applicationsEventReceiver;
+    private boolean terminated;
+
+    @Override
+    public void run() {
+        this.applicationsEventReceiver = new ApplicationsEventReceiver();
+        addEventListeners();
+
+    }
+
+    private void addEventListeners() {
+        applicationsEventReceiver.addEventListener(new 
ApplicationTerminatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                //Remove the application related data
+                log.info("ApplicationTerminatedEvent received for 
[application]");
+                ApplicationTerminatedEvent terminatedEvent = 
(ApplicationTerminatedEvent)event;
+                String appId = terminatedEvent.getAppId();
+                TopologyBuilder.handleApplicationClustersRemoved(appId);
+            }
+        });
+    }
+
+    public void setTerminated(boolean terminated) {
+        this.terminated = terminated;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/62d65fd7/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/cluster/status/ClusterStatusTopicReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/cluster/status/ClusterStatusTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/cluster/status/ClusterStatusTopicReceiver.java
new file mode 100644
index 0000000..196e1e0
--- /dev/null
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/cluster/status/ClusterStatusTopicReceiver.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.cloud.controller.receiver.cluster.status;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cloud.controller.topology.TopologyBuilder;
+import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.event.cluster.status.*;
+import org.apache.stratos.messaging.listener.cluster.status.*;
+import 
org.apache.stratos.messaging.message.receiver.cluster.status.ClusterStatusEventReceiver;
+
+public class ClusterStatusTopicReceiver implements Runnable {
+    private static final Log log = 
LogFactory.getLog(ClusterStatusTopicReceiver.class);
+
+    private ClusterStatusEventReceiver statusEventReceiver;
+    private boolean terminated;
+
+    public ClusterStatusTopicReceiver() {
+        this.statusEventReceiver = new ClusterStatusEventReceiver();
+        addEventListeners();
+    }
+
+    public void run() {
+        //FIXME this activated before autoscaler deployer activated.
+        try {
+            Thread.sleep(15000);
+        } catch (InterruptedException ignore) {
+        }
+        Thread thread = new Thread(statusEventReceiver);
+        thread.start();
+        if (log.isInfoEnabled()) {
+            log.info("Cloud controller application status thread started");
+        }
+
+        // Keep the thread live until terminated
+        while (!terminated) {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException ignore) {
+            }
+        }
+        if (log.isInfoEnabled()) {
+            log.info("Cloud controller application status thread terminated");
+        }
+
+    }
+
+    private void addEventListeners() {
+        // Listen to topology events that affect clusters
+        statusEventReceiver.addEventListener(new 
ClusterStatusClusterResetEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                
TopologyBuilder.handleClusterReset((ClusterStatusClusterResetEvent) event);
+            }
+        });
+
+        statusEventReceiver.addEventListener(new 
ClusterStatusClusterCreatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                
TopologyBuilder.handleClusterCreated((ClusterStatusClusterCreatedEvent) event);
+            }
+        });
+
+        statusEventReceiver.addEventListener(new 
ClusterStatusClusterActivatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                
TopologyBuilder.handleClusterActivatedEvent((ClusterStatusClusterActivatedEvent)
 event);
+            }
+        });
+
+        statusEventReceiver.addEventListener(new 
ClusterStatusClusterTerminatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                
TopologyBuilder.handleClusterTerminatedEvent((ClusterStatusClusterTerminatedEvent)
 event);
+            }
+        });
+
+        statusEventReceiver.addEventListener(new 
ClusterStatusClusterTerminatingEventListener(){
+            @Override
+            protected void onEvent(Event event) {
+                
TopologyBuilder.handleClusterTerminatingEvent((ClusterStatusClusterTerminatingEvent)
 event);
+            }
+        });
+
+        statusEventReceiver.addEventListener(new 
ClusterStatusClusterInactivateEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                
TopologyBuilder.handleClusterInActivateEvent((ClusterStatusClusterInactivateEvent)
 event);
+            }
+        });
+    }
+
+    public void setTerminated(boolean terminated) {
+        this.terminated = terminated;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/62d65fd7/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/instance/status/InstanceStatusEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/instance/status/InstanceStatusEventMessageDelegator.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/instance/status/InstanceStatusEventMessageDelegator.java
new file mode 100644
index 0000000..5adf58f
--- /dev/null
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/instance/status/InstanceStatusEventMessageDelegator.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.cloud.controller.receiver.instance.status;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cloud.controller.topology.TopologyBuilder;
+import 
org.apache.stratos.messaging.event.instance.status.InstanceActivatedEvent;
+import 
org.apache.stratos.messaging.event.instance.status.InstanceMaintenanceModeEvent;
+import 
org.apache.stratos.messaging.event.instance.status.InstanceReadyToShutdownEvent;
+import org.apache.stratos.messaging.event.instance.status.InstanceStartedEvent;
+import org.apache.stratos.messaging.util.Constants;
+import org.apache.stratos.messaging.util.Util;
+
+import javax.jms.TextMessage;
+
+public class InstanceStatusEventMessageDelegator implements Runnable {
+    private static final Log log = 
LogFactory.getLog(InstanceStatusEventMessageDelegator.class);
+
+    @Override
+    public void run() {
+        log.info("Instance status event message delegator started");
+
+        while (true) {
+            try {
+                TextMessage message = 
InstanceStatusEventMessageQueue.getInstance().take();
+
+                // retrieve the header
+                String type = 
message.getStringProperty(Constants.EVENT_CLASS_NAME);
+                log.info(String.format("Instance status event message received 
from queue: %s", type));
+
+                if (InstanceStartedEvent.class.getName().equals(type)) {
+                    // retrieve the actual message
+                    String json = message.getText();
+                        
TopologyBuilder.handleMemberStarted((InstanceStartedEvent) Util.
+                                jsonToObject(json, 
InstanceStartedEvent.class));
+
+                } else if 
(InstanceActivatedEvent.class.getName().equals(type)) {
+                    // retrieve the actual message
+                    String json = message.getText();
+                    
TopologyBuilder.handleMemberActivated((InstanceActivatedEvent) Util.
+                                jsonToObject(json, 
InstanceActivatedEvent.class));
+
+                } else if 
(InstanceReadyToShutdownEvent.class.getName().equals(type)) {
+                    //retrieve the actual message
+                    String json = message.getText();
+                    
TopologyBuilder.handleMemberReadyToShutdown((InstanceReadyToShutdownEvent) Util.
+                                jsonToObject(json, 
InstanceReadyToShutdownEvent.class));
+
+                } else if 
(InstanceMaintenanceModeEvent.class.getName().equals(type)) {
+                    //retrieve the actual message
+                    String json = message.getText();
+                    
TopologyBuilder.handleMemberMaintenance((InstanceMaintenanceModeEvent) Util.
+                                jsonToObject(json, 
InstanceMaintenanceModeEvent.class));
+
+                } else {
+                    log.warn("Event message received is not 
InstanceStartedEvent or InstanceActivatedEvent");
+                }
+
+                } catch (Exception e) {
+                String error = "Failed to retrieve the instance status event 
message";
+                log.error(error, e);
+                // Commenting throwing the error. Otherwise thread will not 
execute if an exception is thrown.
+                //throw new RuntimeException(error, e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/62d65fd7/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/instance/status/InstanceStatusEventMessageListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/instance/status/InstanceStatusEventMessageListener.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/instance/status/InstanceStatusEventMessageListener.java
new file mode 100644
index 0000000..3523544
--- /dev/null
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/instance/status/InstanceStatusEventMessageListener.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.cloud.controller.receiver.instance.status;
+
+import javax.jms.JMSException;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.util.Constants;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+/**
+ * this is to handle the topology subscription
+ */
+public class InstanceStatusEventMessageListener implements MqttCallback {
+       public static final String ORG_APACHE_STRATOS_MESSAGING_EVENT =
+                       "org.apache.stratos.messaging.event.";
+       private static final Log log = LogFactory
+                       .getLog(InstanceStatusEventMessageListener.class);
+
+       @Override
+       public void connectionLost(Throwable arg0) {
+               if (log.isDebugEnabled()) {
+                       log.debug("Connection lost");
+               }
+
+       }
+
+       @Override
+       public void deliveryComplete(IMqttDeliveryToken arg0) {
+               if (log.isDebugEnabled()) {
+                       log.debug("Delivery completed");
+               }
+       }
+
+       @Override
+       public void messageArrived(String topic, MqttMessage message)
+                       throws Exception {
+               if (message instanceof MqttMessage) {
+
+                       TextMessage receivedMessage = new ActiveMQTextMessage();
+
+                       try {
+                               if (log.isDebugEnabled()) {
+                                       log.debug(String.format(
+                                                       "Instance notifier 
message received: %s",
+                                                       
receivedMessage.getText()));
+                               }
+                               receivedMessage.setText(new 
String(message.getPayload()));
+                               
receivedMessage.setStringProperty(Constants.EVENT_CLASS_NAME,
+                                               
ORG_APACHE_STRATOS_MESSAGING_EVENT.concat(topic.replace(
+                                                               "/", ".")));
+
+                               // Add received message to the queue
+                               
InstanceStatusEventMessageQueue.getInstance().add(
+                                               receivedMessage);
+
+                       } catch (JMSException e) {
+                               log.error(e.getMessage(), e);
+                       }
+               }
+
+       }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/62d65fd7/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/instance/status/InstanceStatusEventMessageQueue.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/instance/status/InstanceStatusEventMessageQueue.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/instance/status/InstanceStatusEventMessageQueue.java
new file mode 100644
index 0000000..a978e62
--- /dev/null
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/instance/status/InstanceStatusEventMessageQueue.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cloud.controller.receiver.instance.status;
+
+import javax.jms.TextMessage;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Implements a blocking queue for managing instance status event messages.
+ */
+public class InstanceStatusEventMessageQueue extends 
LinkedBlockingQueue<TextMessage>{
+       private static final long serialVersionUID = 828304342209475302L;
+       private static volatile InstanceStatusEventMessageQueue instance;
+
+    private InstanceStatusEventMessageQueue(){
+    }
+
+    public static InstanceStatusEventMessageQueue getInstance() {
+        if (instance == null) {
+            synchronized (InstanceStatusEventMessageQueue.class){
+                if (instance == null) {
+                    instance = new InstanceStatusEventMessageQueue();
+                }
+            }
+        }
+        return instance;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/62d65fd7/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageDelegator.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageDelegator.java
deleted file mode 100644
index dd279ed..0000000
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageDelegator.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.cloud.controller.topic.instance.status;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.cloud.controller.topology.TopologyBuilder;
-import 
org.apache.stratos.messaging.event.instance.status.InstanceActivatedEvent;
-import 
org.apache.stratos.messaging.event.instance.status.InstanceMaintenanceModeEvent;
-import 
org.apache.stratos.messaging.event.instance.status.InstanceReadyToShutdownEvent;
-import org.apache.stratos.messaging.event.instance.status.InstanceStartedEvent;
-import org.apache.stratos.messaging.util.Constants;
-import org.apache.stratos.messaging.util.Util;
-
-import javax.jms.TextMessage;
-
-public class InstanceStatusEventMessageDelegator implements Runnable {
-    private static final Log log = 
LogFactory.getLog(InstanceStatusEventMessageDelegator.class);
-
-    @Override
-    public void run() {
-        log.info("Instance status event message delegator started");
-
-        while (true) {
-            try {
-                TextMessage message = 
InstanceStatusEventMessageQueue.getInstance().take();
-
-                // retrieve the header
-                String type = 
message.getStringProperty(Constants.EVENT_CLASS_NAME);
-                log.info(String.format("Instance status event message received 
from queue: %s", type));
-
-                if (InstanceStartedEvent.class.getName().equals(type)) {
-                    // retrieve the actual message
-                    String json = message.getText();
-                        
TopologyBuilder.handleMemberStarted((InstanceStartedEvent) Util.
-                                jsonToObject(json, 
InstanceStartedEvent.class));
-
-                } else if 
(InstanceActivatedEvent.class.getName().equals(type)) {
-                    // retrieve the actual message
-                    String json = message.getText();
-                    
TopologyBuilder.handleMemberActivated((InstanceActivatedEvent) Util.
-                                jsonToObject(json, 
InstanceActivatedEvent.class));
-
-                } else if 
(InstanceReadyToShutdownEvent.class.getName().equals(type)) {
-                    //retrieve the actual message
-                    String json = message.getText();
-                    
TopologyBuilder.handleMemberReadyToShutdown((InstanceReadyToShutdownEvent) Util.
-                                jsonToObject(json, 
InstanceReadyToShutdownEvent.class));
-
-                } else if 
(InstanceMaintenanceModeEvent.class.getName().equals(type)) {
-                    //retrieve the actual message
-                    String json = message.getText();
-                    
TopologyBuilder.handleMemberMaintenance((InstanceMaintenanceModeEvent) Util.
-                                jsonToObject(json, 
InstanceMaintenanceModeEvent.class));
-
-                } else {
-                    log.warn("Event message received is not 
InstanceStartedEvent or InstanceActivatedEvent");
-                }
-
-                } catch (Exception e) {
-                String error = "Failed to retrieve the instance status event 
message";
-                log.error(error, e);
-                // Commenting throwing the error. Otherwise thread will not 
execute if an exception is thrown.
-                //throw new RuntimeException(error, e);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/62d65fd7/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageListener.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageListener.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageListener.java
deleted file mode 100644
index 1f72e76..0000000
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageListener.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.cloud.controller.topic.instance.status;
-
-import javax.jms.JMSException;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.util.Constants;
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
-import org.eclipse.paho.client.mqttv3.MqttCallback;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
-
-/**
- * this is to handle the topology subscription
- */
-public class InstanceStatusEventMessageListener implements MqttCallback {
-       public static final String ORG_APACHE_STRATOS_MESSAGING_EVENT =
-                       "org.apache.stratos.messaging.event.";
-       private static final Log log = LogFactory
-                       .getLog(InstanceStatusEventMessageListener.class);
-
-       @Override
-       public void connectionLost(Throwable arg0) {
-               if (log.isDebugEnabled()) {
-                       log.debug("Connection lost");
-               }
-
-       }
-
-       @Override
-       public void deliveryComplete(IMqttDeliveryToken arg0) {
-               if (log.isDebugEnabled()) {
-                       log.debug("Delivery completed");
-               }
-       }
-
-       @Override
-       public void messageArrived(String topic, MqttMessage message)
-                       throws Exception {
-               if (message instanceof MqttMessage) {
-
-                       TextMessage receivedMessage = new ActiveMQTextMessage();
-
-                       try {
-                               if (log.isDebugEnabled()) {
-                                       log.debug(String.format(
-                                                       "Instance notifier 
message received: %s",
-                                                       
receivedMessage.getText()));
-                               }
-                               receivedMessage.setText(new 
String(message.getPayload()));
-                               
receivedMessage.setStringProperty(Constants.EVENT_CLASS_NAME,
-                                               
ORG_APACHE_STRATOS_MESSAGING_EVENT.concat(topic.replace(
-                                                               "/", ".")));
-
-                               // Add received message to the queue
-                               
InstanceStatusEventMessageQueue.getInstance().add(
-                                               receivedMessage);
-
-                       } catch (JMSException e) {
-                               log.error(e.getMessage(), e);
-                       }
-               }
-
-       }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/62d65fd7/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageQueue.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageQueue.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageQueue.java
deleted file mode 100644
index 4e69c16..0000000
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageQueue.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.cloud.controller.topic.instance.status;
-
-import javax.jms.TextMessage;
-import java.util.concurrent.LinkedBlockingQueue;
-
-/**
- * Implements a blocking queue for managing instance status event messages.
- */
-public class InstanceStatusEventMessageQueue extends 
LinkedBlockingQueue<TextMessage>{
-       private static final long serialVersionUID = 828304342209475302L;
-       private static volatile InstanceStatusEventMessageQueue instance;
-
-    private InstanceStatusEventMessageQueue(){
-    }
-
-    public static InstanceStatusEventMessageQueue getInstance() {
-        if (instance == null) {
-            synchronized (InstanceStatusEventMessageQueue.class){
-                if (instance == null) {
-                    instance = new InstanceStatusEventMessageQueue();
-                }
-            }
-        }
-        return instance;
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/62d65fd7/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
index 6015d87..b957b24 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java
@@ -40,6 +40,7 @@ import org.apache.stratos.messaging.event.topology.*;
 import 
org.apache.stratos.metadata.client.defaults.DefaultMetaDataServiceClient;
 import org.apache.stratos.metadata.client.defaults.MetaDataServiceClient;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -177,6 +178,32 @@ public class TopologyBuilder {
 
     }
 
+    /**
+     * Removes every cluster that belongs to the given application from the
+     * topology and then sends the application-clusters-removed notification.
+     *
+     * The topology is modified and stored while the topology write lock is
+     * held; the notification is sent after the lock has been released.
+     *
+     * @param appId id of the application whose clusters are removed; when it is
+     *              null no cluster matches and an empty list is passed on
+     */
+    public static void handleApplicationClustersRemoved(String appId) {
+        List<Cluster> removedClusters = new ArrayList<Cluster>();
+
+        TopologyManager.acquireWriteLock();
+        try {
+            Topology topology = TopologyManager.getTopology();
+            for (Service service : topology.getServices()) {
+                // Iterate over a snapshot: removeCluster() mutates the service, and
+                // getClusters() may return a live view of that same collection, in
+                // which case removing mid-loop fails with ConcurrentModificationException.
+                List<Cluster> snapshot = new ArrayList<Cluster>(service.getClusters());
+                for (Cluster cluster : snapshot) {
+                    // Compare from the appId side so a cluster without an app id cannot cause an NPE.
+                    if (appId != null && appId.equals(cluster.getAppId())) {
+                        removedClusters.add(service.removeCluster(cluster.getClusterId()));
+                    }
+                }
+            }
+            log.info("Application Cluster " + appId + " are removed from the topology");
+
+            TopologyManager.updateTopology(topology);
+
+        } finally {
+            TopologyManager.releaseWriteLock();
+        }
+
+        TopologyEventPublisher.sendApplicationClustersRemoved(appId, removedClusters);
+
+    }
+
+
 
     public static void handleClusterReset(ClusterStatusClusterResetEvent 
event) {
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/62d65fd7/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
index df5cf82..76bdb5d 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
@@ -129,6 +129,15 @@ public class TopologyEventPublisher {
         publishEvent(new ApplicationClustersCreatedEvent(clusters, appId));
     }
 
+    /**
+     * Announces that the clusters of an application have been removed.
+     *
+     * At present this only writes an info-level log entry: no event is
+     * published and the clusters argument is not used.
+     *
+     * @param appId    id of the application whose clusters were removed
+     * @param clusters the removed clusters; unused until publishing is implemented
+     */
+    public static void sendApplicationClustersRemoved(String appId, List<Cluster> clusters) {
+        if (log.isInfoEnabled()) {
+            String message = "Publishing Application Clusters removed event for Application: " + appId;
+            log.info(message);
+        }
+
+        // TODO publish an event here. The placeholder that was left behind is
+        // publishEvent(new ApplicationClustersCreatedEvent(clusters, appId)), i.e. the
+        // "created" event - presumably a stand-in until a removed event is available.
+    }
+
 //    public static void sendApplicationRemovedEvent(String applicationId, 
Set<ClusterDataHolder> clusterData,
 //                                                   int tenantId, String 
tenantDomain) {
 //

Reply via email to