Repository: stratos Updated Branches: refs/heads/docker-grouping-merge 89fb37afb -> fb80e2ca8
fixes conflicts in messaging component Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/fb80e2ca Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/fb80e2ca Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/fb80e2ca Branch: refs/heads/docker-grouping-merge Commit: fb80e2ca8afc1514842b91eef229ff5724facd6c Parents: 89fb37a Author: R-Rajkumar <[email protected]> Authored: Mon Nov 3 13:40:00 2014 +0530 Committer: R-Rajkumar <[email protected]> Committed: Mon Nov 3 13:40:00 2014 +0530 ---------------------------------------------------------------------- .../ApplicationsEventMessageListener.java | 18 +++---- .../ClusterStatusEventMessageListener.java | 54 +++++++++++++------- .../stratos/messaging/util/Constants.java | 2 + 3 files changed, 46 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/fb80e2ca/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageListener.java index e53e083..08fce23 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageListener.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageListener.java @@ -16,7 +16,10 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.stratos.messaging.message.receiver.application.status; +package org.apache.stratos.messaging.message.receiver.applications; + +import javax.jms.JMSException; +import javax.jms.TextMessage; import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.commons.logging.Log; @@ -27,17 +30,12 @@ import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.TextMessage; - -public class ApplicationStatusEventMessageListener implements MqttCallback { - private static final Log log = LogFactory.getLog(ApplicationStatusEventMessageListener.class); +public class ApplicationsEventMessageListener implements MqttCallback { + private static final Log log = LogFactory.getLog(ApplicationsEventMessageListener.class); - private ApplicationStatusEventMessageQueue messageQueue; + private ApplicationsEventMessageQueue messageQueue; - public ApplicationStatusEventMessageListener(ApplicationStatusEventMessageQueue messageQueue) { + public ApplicationsEventMessageListener(ApplicationsEventMessageQueue messageQueue) { this.messageQueue = messageQueue; } http://git-wip-us.apache.org/repos/asf/stratos/blob/fb80e2ca/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageListener.java index 12c7800..6c973f4 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageListener.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageListener.java @@ -18,15 +18,19 @@ */ package org.apache.stratos.messaging.message.receiver.cluster.status; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; import javax.jms.TextMessage; -public class ClusterStatusEventMessageListener implements MessageListener { +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.util.Constants; +import org.apache.stratos.messaging.util.Util; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttMessage; + +public class ClusterStatusEventMessageListener implements MqttCallback { private static final Log log = LogFactory.getLog(ClusterStatusEventMessageListener.class); private ClusterStatusEventMessageQueue messageQueue; @@ -34,21 +38,35 @@ public class ClusterStatusEventMessageListener implements MessageListener { public ClusterStatusEventMessageListener(ClusterStatusEventMessageQueue messageQueue) { this.messageQueue = messageQueue; } + + @Override + public void connectionLost(Throwable throwable) { + log.warn("Connection is lost", throwable); + } @Override - public void onMessage(Message message) { - if (message instanceof TextMessage) { - TextMessage receivedMessage = (TextMessage) message; - try { - if (log.isDebugEnabled()) { - log.debug(String.format("Tenant message received: %s", ((TextMessage) message).getText())); - } - // Add received message to the queue - messageQueue.add(receivedMessage); - - } catch (JMSException e) { - log.error(e.getMessage(), e); + public void deliveryComplete(IMqttDeliveryToken deliveryToken) { + if (log.isDebugEnabled()) { + log.debug(String.format("Message delivery is complete: %s", + ((deliveryToken != null) ? deliveryToken.toString() : ""))); + } + } + + @Override + public void messageArrived(String topicName, MqttMessage message) throws Exception { + TextMessage textMessage = new ActiveMQTextMessage(); + textMessage.setText(new String(message.getPayload())); + textMessage.setStringProperty(Constants.EVENT_CLASS_NAME, Util.getEventNameForTopic(topicName)); + + try { + if (log.isDebugEnabled()) { + log.debug(String.format("Tenant message received: %s", textMessage.getText())); } + // Add received message to the queue + messageQueue.add(textMessage); + + } catch (JMSException e) { + log.error(e.getMessage(), e); } } } http://git-wip-us.apache.org/repos/asf/stratos/blob/fb80e2ca/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java index f677d11..7fdd6a4 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java @@ -25,6 +25,8 @@ public class Constants { public static final String INSTANCE_STATUS_TOPIC = "instance/status/#"; public static final String INSTANCE_NOTIFIER_TOPIC = "instance/notifier/#"; public static final String APPLICATION_STATUS_TOPIC = "application-status"; + public static final String APPLICATIONS_TOPIC = "applications"; + public static final String CLUSTER_STATUS_TOPIC = "applications"; public static final String PING_TOPIC = "ping"; public static final String TENANT_TOPIC = "tenant/#"; public static final String TENANT_RANGE_ALL = "*";
