Repository: stratos Updated Branches: refs/heads/docker-grouping-merge a279f9c11 -> 62d65fd73
adding application topic receiver in cloud controller Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/62d65fd7 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/62d65fd7 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/62d65fd7 Branch: refs/heads/docker-grouping-merge Commit: 62d65fd734ce3f227781d3cd7cfad1127cccf477 Parents: a279f9c Author: reka <[email protected]> Authored: Tue Nov 4 15:38:28 2014 +0530 Committer: reka <[email protected]> Committed: Tue Nov 4 15:39:34 2014 +0530 ---------------------------------------------------------------------- .../receiver/ClusterStatusTopicReceiver.java | 113 ------------------- .../internal/CloudControllerDSComponent.java | 21 +++- .../application/ApplicationTopicReceiver.java | 60 ++++++++++ .../status/ClusterStatusTopicReceiver.java | 113 +++++++++++++++++++ .../InstanceStatusEventMessageDelegator.java | 84 ++++++++++++++ .../InstanceStatusEventMessageListener.java | 84 ++++++++++++++ .../status/InstanceStatusEventMessageQueue.java | 45 ++++++++ .../InstanceStatusEventMessageDelegator.java | 84 -------------- .../InstanceStatusEventMessageListener.java | 84 -------------- .../status/InstanceStatusEventMessageQueue.java | 45 -------- .../controller/topology/TopologyBuilder.java | 27 +++++ .../topology/TopologyEventPublisher.java | 9 ++ 12 files changed, 438 insertions(+), 331 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/62d65fd7/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/application/status/receiver/ClusterStatusTopicReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/application/status/receiver/ClusterStatusTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/application/status/receiver/ClusterStatusTopicReceiver.java deleted file mode 100644 index 4cc7599..0000000 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/application/status/receiver/ClusterStatusTopicReceiver.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.cloud.controller.application.status.receiver; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.cloud.controller.topology.TopologyBuilder; -import org.apache.stratos.messaging.event.Event; -import org.apache.stratos.messaging.event.cluster.status.*; -import org.apache.stratos.messaging.listener.cluster.status.*; -import org.apache.stratos.messaging.message.receiver.cluster.status.ClusterStatusEventReceiver; - -public class ClusterStatusTopicReceiver implements Runnable { - private static final Log log = LogFactory.getLog(ClusterStatusTopicReceiver.class); - - private ClusterStatusEventReceiver statusEventReceiver; - private boolean terminated; - - public ClusterStatusTopicReceiver() { - this.statusEventReceiver = new ClusterStatusEventReceiver(); - addEventListeners(); - } - - public void run() { - //FIXME this activated before autoscaler deployer activated. - try { - Thread.sleep(15000); - } catch (InterruptedException ignore) { - } - Thread thread = new Thread(statusEventReceiver); - thread.start(); - if (log.isInfoEnabled()) { - log.info("Cloud controller application status thread started"); - } - - // Keep the thread live until terminated - while (!terminated) { - try { - Thread.sleep(1000); - } catch (InterruptedException ignore) { - } - } - if (log.isInfoEnabled()) { - log.info("Cloud controller application status thread terminated"); - } - - } - - private void addEventListeners() { - // Listen to topology events that affect clusters - statusEventReceiver.addEventListener(new ClusterStatusClusterResetEventListener() { - @Override - protected void onEvent(Event event) { - TopologyBuilder.handleClusterReset((ClusterStatusClusterResetEvent) event); - } - }); - - statusEventReceiver.addEventListener(new ClusterStatusClusterCreatedEventListener() { - @Override - protected void onEvent(Event event) { - TopologyBuilder.handleClusterCreated((ClusterStatusClusterCreatedEvent) event); - } - }); - - statusEventReceiver.addEventListener(new ClusterStatusClusterActivatedEventListener() { - @Override - protected void onEvent(Event event) { - TopologyBuilder.handleClusterActivatedEvent((ClusterStatusClusterActivatedEvent) event); - } - }); - - statusEventReceiver.addEventListener(new ClusterStatusClusterTerminatedEventListener() { - @Override - protected void onEvent(Event event) { - TopologyBuilder.handleClusterTerminatedEvent((ClusterStatusClusterTerminatedEvent) event); - } - }); - - statusEventReceiver.addEventListener(new ClusterStatusClusterTerminatingEventListener(){ - @Override - protected void onEvent(Event event) { - TopologyBuilder.handleClusterTerminatingEvent((ClusterStatusClusterTerminatingEvent) event); - } - }); - - statusEventReceiver.addEventListener(new ClusterStatusClusterInactivateEventListener() { - @Override - protected void onEvent(Event event) { - TopologyBuilder.handleClusterInActivateEvent((ClusterStatusClusterInactivateEvent) event); - } - }); - } - - public void setTerminated(boolean terminated) { - this.terminated = terminated; - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/62d65fd7/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java index b286390..26edbf1 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java @@ -23,13 +23,14 @@ package org.apache.stratos.cloud.controller.internal; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.stratos.cloud.controller.application.status.receiver.ClusterStatusTopicReceiver; +import org.apache.stratos.cloud.controller.receiver.application.ApplicationTopicReceiver; +import org.apache.stratos.cloud.controller.receiver.cluster.status.ClusterStatusTopicReceiver; import org.apache.stratos.cloud.controller.exception.CloudControllerException; import org.apache.stratos.cloud.controller.impl.CloudControllerServiceImpl; import org.apache.stratos.cloud.controller.interfaces.CloudControllerService; import org.apache.stratos.cloud.controller.publisher.TopologySynchronizerTaskScheduler; -import org.apache.stratos.cloud.controller.topic.instance.status.InstanceStatusEventMessageDelegator; -import org.apache.stratos.cloud.controller.topic.instance.status.InstanceStatusEventMessageListener; +import org.apache.stratos.cloud.controller.receiver.instance.status.InstanceStatusEventMessageDelegator; +import org.apache.stratos.cloud.controller.receiver.instance.status.InstanceStatusEventMessageListener; import org.apache.stratos.cloud.controller.util.CloudControllerConstants; import org.apache.stratos.cloud.controller.util.ServiceReferenceHolder; import org.apache.stratos.messaging.broker.publish.EventPublisherPool; @@ -65,6 +66,8 @@ public class CloudControllerDSComponent { private static final Log log = LogFactory.getLog(CloudControllerDSComponent.class); private ClusterStatusTopicReceiver clusterStatusTopicReceiver; + private ApplicationTopicReceiver applicationTopicReceiver; + protected void activate(ComponentContext context) { try { @@ -82,12 +85,20 @@ public class CloudControllerDSComponent { log.info("Instance status message receiver thread started"); } + applicationTopicReceiver = new ApplicationTopicReceiver(); + Thread tApplicationTopicReceiver = new Thread(applicationTopicReceiver); + tApplicationTopicReceiver.start(); + + if (log.isInfoEnabled()) { + log.info("Application Receiver thread started"); + } + clusterStatusTopicReceiver = new ClusterStatusTopicReceiver(); Thread tClusterStatusTopicReceiver = new Thread(clusterStatusTopicReceiver); tClusterStatusTopicReceiver.start(); - if (log.isDebugEnabled()) { - log.debug("Cluster status Receiver thread started"); + if (log.isInfoEnabled()) { + log.info("Cluster status Receiver thread started"); } // Register cloud controller service http://git-wip-us.apache.org/repos/asf/stratos/blob/62d65fd7/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/application/ApplicationTopicReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/application/ApplicationTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/application/ApplicationTopicReceiver.java new file mode 100644 index 0000000..7f8fd56 --- /dev/null +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/application/ApplicationTopicReceiver.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.cloud.controller.receiver.application; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.cloud.controller.topology.TopologyBuilder; +import org.apache.stratos.messaging.event.Event; +import org.apache.stratos.messaging.event.applications.ApplicationTerminatedEvent; +import org.apache.stratos.messaging.listener.applications.ApplicationTerminatedEventListener; +import org.apache.stratos.messaging.message.receiver.applications.ApplicationsEventReceiver; + +/** + * This is to receive the application topic messages. + */ +public class ApplicationTopicReceiver implements Runnable { + private static final Log log = LogFactory.getLog(ApplicationTopicReceiver.class); + private ApplicationsEventReceiver applicationsEventReceiver; + private boolean terminated; + + @Override + public void run() { + this.applicationsEventReceiver = new ApplicationsEventReceiver(); + addEventListeners(); + + } + + private void addEventListeners() { + applicationsEventReceiver.addEventListener(new ApplicationTerminatedEventListener() { + @Override + protected void onEvent(Event event) { + //Remove the application related data + log.info("ApplicationTerminatedEvent received for [application]"); + ApplicationTerminatedEvent terminatedEvent = (ApplicationTerminatedEvent)event; + String appId = terminatedEvent.getAppId(); + TopologyBuilder.handleApplicationClustersRemoved(appId); + } + }); + } + + public void setTerminated(boolean terminated) { + this.terminated = terminated; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/62d65fd7/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/cluster/status/ClusterStatusTopicReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/cluster/status/ClusterStatusTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/cluster/status/ClusterStatusTopicReceiver.java new file mode 100644 index 0000000..196e1e0 --- /dev/null +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/cluster/status/ClusterStatusTopicReceiver.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.cloud.controller.receiver.cluster.status; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.cloud.controller.topology.TopologyBuilder; +import org.apache.stratos.messaging.event.Event; +import org.apache.stratos.messaging.event.cluster.status.*; +import org.apache.stratos.messaging.listener.cluster.status.*; +import org.apache.stratos.messaging.message.receiver.cluster.status.ClusterStatusEventReceiver; + +public class ClusterStatusTopicReceiver implements Runnable { + private static final Log log = LogFactory.getLog(ClusterStatusTopicReceiver.class); + + private ClusterStatusEventReceiver statusEventReceiver; + private boolean terminated; + + public ClusterStatusTopicReceiver() { + this.statusEventReceiver = new ClusterStatusEventReceiver(); + addEventListeners(); + } + + public void run() { + //FIXME this activated before autoscaler deployer activated. + try { + Thread.sleep(15000); + } catch (InterruptedException ignore) { + } + Thread thread = new Thread(statusEventReceiver); + thread.start(); + if (log.isInfoEnabled()) { + log.info("Cloud controller application status thread started"); + } + + // Keep the thread live until terminated + while (!terminated) { + try { + Thread.sleep(1000); + } catch (InterruptedException ignore) { + } + } + if (log.isInfoEnabled()) { + log.info("Cloud controller application status thread terminated"); + } + + } + + private void addEventListeners() { + // Listen to topology events that affect clusters + statusEventReceiver.addEventListener(new ClusterStatusClusterResetEventListener() { + @Override + protected void onEvent(Event event) { + TopologyBuilder.handleClusterReset((ClusterStatusClusterResetEvent) event); + } + }); + + statusEventReceiver.addEventListener(new ClusterStatusClusterCreatedEventListener() { + @Override + protected void onEvent(Event event) { + TopologyBuilder.handleClusterCreated((ClusterStatusClusterCreatedEvent) event); + } + }); + + statusEventReceiver.addEventListener(new ClusterStatusClusterActivatedEventListener() { + @Override + protected void onEvent(Event event) { + TopologyBuilder.handleClusterActivatedEvent((ClusterStatusClusterActivatedEvent) event); + } + }); + + statusEventReceiver.addEventListener(new ClusterStatusClusterTerminatedEventListener() { + @Override + protected void onEvent(Event event) { + TopologyBuilder.handleClusterTerminatedEvent((ClusterStatusClusterTerminatedEvent) event); + } + }); + + statusEventReceiver.addEventListener(new ClusterStatusClusterTerminatingEventListener(){ + @Override + protected void onEvent(Event event) { + TopologyBuilder.handleClusterTerminatingEvent((ClusterStatusClusterTerminatingEvent) event); + } + }); + + statusEventReceiver.addEventListener(new ClusterStatusClusterInactivateEventListener() { + @Override + protected void onEvent(Event event) { + TopologyBuilder.handleClusterInActivateEvent((ClusterStatusClusterInactivateEvent) event); + } + }); + } + + public void setTerminated(boolean terminated) { + this.terminated = terminated; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/62d65fd7/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/instance/status/InstanceStatusEventMessageDelegator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/instance/status/InstanceStatusEventMessageDelegator.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/instance/status/InstanceStatusEventMessageDelegator.java new file mode 100644 index 0000000..5adf58f --- /dev/null +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/instance/status/InstanceStatusEventMessageDelegator.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.cloud.controller.receiver.instance.status; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.cloud.controller.topology.TopologyBuilder; +import org.apache.stratos.messaging.event.instance.status.InstanceActivatedEvent; +import org.apache.stratos.messaging.event.instance.status.InstanceMaintenanceModeEvent; +import org.apache.stratos.messaging.event.instance.status.InstanceReadyToShutdownEvent; +import org.apache.stratos.messaging.event.instance.status.InstanceStartedEvent; +import org.apache.stratos.messaging.util.Constants; +import org.apache.stratos.messaging.util.Util; + +import javax.jms.TextMessage; + +public class InstanceStatusEventMessageDelegator implements Runnable { + private static final Log log = LogFactory.getLog(InstanceStatusEventMessageDelegator.class); + + @Override + public void run() { + log.info("Instance status event message delegator started"); + + while (true) { + try { + TextMessage message = InstanceStatusEventMessageQueue.getInstance().take(); + + // retrieve the header + String type = message.getStringProperty(Constants.EVENT_CLASS_NAME); + log.info(String.format("Instance status event message received from queue: %s", type)); + + if (InstanceStartedEvent.class.getName().equals(type)) { + // retrieve the actual message + String json = message.getText(); + TopologyBuilder.handleMemberStarted((InstanceStartedEvent) Util. + jsonToObject(json, InstanceStartedEvent.class)); + + } else if (InstanceActivatedEvent.class.getName().equals(type)) { + // retrieve the actual message + String json = message.getText(); + TopologyBuilder.handleMemberActivated((InstanceActivatedEvent) Util. + jsonToObject(json, InstanceActivatedEvent.class)); + + } else if (InstanceReadyToShutdownEvent.class.getName().equals(type)) { + //retrieve the actual message + String json = message.getText(); + TopologyBuilder.handleMemberReadyToShutdown((InstanceReadyToShutdownEvent) Util. + jsonToObject(json, InstanceReadyToShutdownEvent.class)); + + } else if (InstanceMaintenanceModeEvent.class.getName().equals(type)) { + //retrieve the actual message + String json = message.getText(); + TopologyBuilder.handleMemberMaintenance((InstanceMaintenanceModeEvent) Util. + jsonToObject(json, InstanceMaintenanceModeEvent.class)); + + } else { + log.warn("Event message received is not InstanceStartedEvent or InstanceActivatedEvent"); + } + + } catch (Exception e) { + String error = "Failed to retrieve the instance status event message"; + log.error(error, e); + // Commenting throwing the error. Otherwise thread will not execute if an exception is thrown. + //throw new RuntimeException(error, e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/62d65fd7/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/instance/status/InstanceStatusEventMessageListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/instance/status/InstanceStatusEventMessageListener.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/instance/status/InstanceStatusEventMessageListener.java new file mode 100644 index 0000000..3523544 --- /dev/null +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/instance/status/InstanceStatusEventMessageListener.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.cloud.controller.receiver.instance.status; + +import javax.jms.JMSException; +import javax.jms.TextMessage; + +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.util.Constants; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttMessage; + +/** + * this is to handle the topology subscription + */ +public class InstanceStatusEventMessageListener implements MqttCallback { + public static final String ORG_APACHE_STRATOS_MESSAGING_EVENT = + "org.apache.stratos.messaging.event."; + private static final Log log = LogFactory + .getLog(InstanceStatusEventMessageListener.class); + + @Override + public void connectionLost(Throwable arg0) { + if (log.isDebugEnabled()) { + log.debug("Connection lost"); + } + + } + + @Override + public void deliveryComplete(IMqttDeliveryToken arg0) { + if (log.isDebugEnabled()) { + log.debug("Delivery completed"); + } + } + + @Override + public void messageArrived(String topic, MqttMessage message) + throws Exception { + if (message instanceof MqttMessage) { + + TextMessage receivedMessage = new ActiveMQTextMessage(); + + try { + if (log.isDebugEnabled()) { + log.debug(String.format( + "Instance notifier message received: %s", + receivedMessage.getText())); + } + receivedMessage.setText(new String(message.getPayload())); + receivedMessage.setStringProperty(Constants.EVENT_CLASS_NAME, + ORG_APACHE_STRATOS_MESSAGING_EVENT.concat(topic.replace( + "/", "."))); + + // Add received message to the queue + InstanceStatusEventMessageQueue.getInstance().add( + receivedMessage); + + } catch (JMSException e) { + log.error(e.getMessage(), e); + } + } + + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/62d65fd7/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/instance/status/InstanceStatusEventMessageQueue.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/instance/status/InstanceStatusEventMessageQueue.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/instance/status/InstanceStatusEventMessageQueue.java new file mode 100644 index 0000000..a978e62 --- /dev/null +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/receiver/instance/status/InstanceStatusEventMessageQueue.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.cloud.controller.receiver.instance.status; + +import javax.jms.TextMessage; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * Implements a blocking queue for managing instance status event messages. + */ +public class InstanceStatusEventMessageQueue extends LinkedBlockingQueue<TextMessage>{ + private static final long serialVersionUID = 828304342209475302L; + private static volatile InstanceStatusEventMessageQueue instance; + + private InstanceStatusEventMessageQueue(){ + } + + public static InstanceStatusEventMessageQueue getInstance() { + if (instance == null) { + synchronized (InstanceStatusEventMessageQueue.class){ + if (instance == null) { + instance = new InstanceStatusEventMessageQueue(); + } + } + } + return instance; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/62d65fd7/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageDelegator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageDelegator.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageDelegator.java deleted file mode 100644 index dd279ed..0000000 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageDelegator.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.cloud.controller.topic.instance.status; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.cloud.controller.topology.TopologyBuilder; -import org.apache.stratos.messaging.event.instance.status.InstanceActivatedEvent; -import org.apache.stratos.messaging.event.instance.status.InstanceMaintenanceModeEvent; -import org.apache.stratos.messaging.event.instance.status.InstanceReadyToShutdownEvent; -import org.apache.stratos.messaging.event.instance.status.InstanceStartedEvent; -import org.apache.stratos.messaging.util.Constants; -import org.apache.stratos.messaging.util.Util; - -import javax.jms.TextMessage; - -public class InstanceStatusEventMessageDelegator implements Runnable { - private static final Log log = LogFactory.getLog(InstanceStatusEventMessageDelegator.class); - - @Override - public void run() { - log.info("Instance status event message delegator started"); - - while (true) { - try { - TextMessage message = InstanceStatusEventMessageQueue.getInstance().take(); - - // retrieve the header - String type = message.getStringProperty(Constants.EVENT_CLASS_NAME); - log.info(String.format("Instance status event message received from queue: %s", type)); - - if (InstanceStartedEvent.class.getName().equals(type)) { - // retrieve the actual message - String json = message.getText(); - TopologyBuilder.handleMemberStarted((InstanceStartedEvent) Util. - jsonToObject(json, InstanceStartedEvent.class)); - - } else if (InstanceActivatedEvent.class.getName().equals(type)) { - // retrieve the actual message - String json = message.getText(); - TopologyBuilder.handleMemberActivated((InstanceActivatedEvent) Util. - jsonToObject(json, InstanceActivatedEvent.class)); - - } else if (InstanceReadyToShutdownEvent.class.getName().equals(type)) { - //retrieve the actual message - String json = message.getText(); - TopologyBuilder.handleMemberReadyToShutdown((InstanceReadyToShutdownEvent) Util. - jsonToObject(json, InstanceReadyToShutdownEvent.class)); - - } else if (InstanceMaintenanceModeEvent.class.getName().equals(type)) { - //retrieve the actual message - String json = message.getText(); - TopologyBuilder.handleMemberMaintenance((InstanceMaintenanceModeEvent) Util. - jsonToObject(json, InstanceMaintenanceModeEvent.class)); - - } else { - log.warn("Event message received is not InstanceStartedEvent or InstanceActivatedEvent"); - } - - } catch (Exception e) { - String error = "Failed to retrieve the instance status event message"; - log.error(error, e); - // Commenting throwing the error. Otherwise thread will not execute if an exception is thrown. - //throw new RuntimeException(error, e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/62d65fd7/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageListener.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageListener.java deleted file mode 100644 index 1f72e76..0000000 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageListener.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.cloud.controller.topic.instance.status; - -import javax.jms.JMSException; -import javax.jms.TextMessage; - -import org.apache.activemq.command.ActiveMQTextMessage; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.messaging.util.Constants; -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttCallback; -import org.eclipse.paho.client.mqttv3.MqttMessage; - -/** - * this is to handle the topology subscription - */ -public class InstanceStatusEventMessageListener implements MqttCallback { - public static final String ORG_APACHE_STRATOS_MESSAGING_EVENT = - "org.apache.stratos.messaging.event."; - private static final Log log = LogFactory - .getLog(InstanceStatusEventMessageListener.class); - - @Override - public void connectionLost(Throwable arg0) { - if (log.isDebugEnabled()) { - log.debug("Connection lost"); - } - - } - - @Override - public void deliveryComplete(IMqttDeliveryToken arg0) { - if (log.isDebugEnabled()) { - log.debug("Delivery completed"); - } - } - - @Override - public void messageArrived(String topic, MqttMessage message) - throws Exception { - if (message instanceof MqttMessage) { - - TextMessage receivedMessage = new ActiveMQTextMessage(); - - try { - if (log.isDebugEnabled()) { - log.debug(String.format( - "Instance notifier message received: %s", - receivedMessage.getText())); - } - receivedMessage.setText(new String(message.getPayload())); - receivedMessage.setStringProperty(Constants.EVENT_CLASS_NAME, - ORG_APACHE_STRATOS_MESSAGING_EVENT.concat(topic.replace( - "/", "."))); - - // Add received message to the queue - InstanceStatusEventMessageQueue.getInstance().add( - receivedMessage); - - } catch (JMSException e) { - log.error(e.getMessage(), e); - } - } - - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/62d65fd7/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageQueue.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageQueue.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageQueue.java deleted file mode 100644 index 4e69c16..0000000 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topic/instance/status/InstanceStatusEventMessageQueue.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.stratos.cloud.controller.topic.instance.status; - -import javax.jms.TextMessage; -import java.util.concurrent.LinkedBlockingQueue; - -/** - * Implements a blocking queue for managing instance status event messages. - */ -public class InstanceStatusEventMessageQueue extends LinkedBlockingQueue<TextMessage>{ - private static final long serialVersionUID = 828304342209475302L; - private static volatile InstanceStatusEventMessageQueue instance; - - private InstanceStatusEventMessageQueue(){ - } - - public static InstanceStatusEventMessageQueue getInstance() { - if (instance == null) { - synchronized (InstanceStatusEventMessageQueue.class){ - if (instance == null) { - instance = new InstanceStatusEventMessageQueue(); - } - } - } - return instance; - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/62d65fd7/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java index 6015d87..b957b24 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java @@ -40,6 +40,7 @@ import org.apache.stratos.messaging.event.topology.*; import org.apache.stratos.metadata.client.defaults.DefaultMetaDataServiceClient; import org.apache.stratos.metadata.client.defaults.MetaDataServiceClient; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Properties; @@ -177,6 +178,32 @@ public class TopologyBuilder { } + public static void handleApplicationClustersRemoved(String appId) { + TopologyManager.acquireWriteLock(); + + List<Cluster> removedClusters = new ArrayList<Cluster>(); + try { + Topology topology = TopologyManager.getTopology(); + for(Service service : topology.getServices()) { + for(Cluster cluster : service.getClusters()) { + if(cluster.getAppId().equals(appId)) { + removedClusters.add(service.removeCluster(cluster.getClusterId())); + } + } + } + log.info("Application Cluster " + appId + " are removed from the topology"); + + TopologyManager.updateTopology(topology); + + } finally { + TopologyManager.releaseWriteLock(); + } + + TopologyEventPublisher.sendApplicationClustersRemoved(appId, removedClusters); + + } + + public static void handleClusterReset(ClusterStatusClusterResetEvent event) { http://git-wip-us.apache.org/repos/asf/stratos/blob/62d65fd7/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java index df5cf82..76bdb5d 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java @@ -129,6 +129,15 @@ public class TopologyEventPublisher { publishEvent(new ApplicationClustersCreatedEvent(clusters, appId)); } + public static void sendApplicationClustersRemoved(String appId, List<Cluster> clusters) { + + if (log.isInfoEnabled()) { + log.info("Publishing Application Clusters removed event for Application: " + appId); + } + + //TODO publishEvent(new ApplicationClustersCreatedEvent(clusters, appId)); + } + // public static void sendApplicationRemovedEvent(String applicationId, Set<ClusterDataHolder> clusterData, // int tenantId, String tenantDomain) { //
