http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java index c0aee87..8e8d974 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java @@ -1,104 +1,105 @@ /* * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file + * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file + * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the + * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.apache.stratos.messaging.broker.subscribe; -import javax.jms.*; +import javax.jms.JMSException; +import javax.jms.TopicSession; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.stratos.messaging.broker.connect.TopicConnector; +import org.apache.stratos.messaging.broker.connect.MQTTConnector; import org.apache.stratos.messaging.broker.heartbeat.TopicHealthChecker; +import org.apache.stratos.messaging.message.processor.MessageProcessorChain; +import org.apache.stratos.messaging.message.processor.instance.notifier.InstanceNotifierMessageProcessorChain; import org.apache.stratos.messaging.util.Util; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttClient; /** * Any instance who needs to subscribe to a topic, should communicate with this * object. - * + * * @author nirmal - * + * */ public class TopicSubscriber implements Runnable { private static final Log log = LogFactory.getLog(TopicSubscriber.class); - private boolean terminated = false; - private MessageListener messageListener; + private boolean terminated = false; + private MqttCallback messageListener; private TopicSession topicSession; - private String topicName; - private TopicConnector connector; - private TopicHealthChecker healthChecker; - private javax.jms.TopicSubscriber topicSubscriber = null; - private boolean subscribed; + private final String topicName; - /** + private TopicHealthChecker healthChecker; + private final javax.jms.TopicSubscriber topicSubscriber = null; + private boolean subscribed; + private final MessageProcessorChain processorChain; + + /** * @param aTopicName * topic name of this subscriber instance. */ public TopicSubscriber(String aTopicName) { topicName = aTopicName; - connector = new TopicConnector(); - if(log.isDebugEnabled()) { - log.debug(String.format("Topic subscriber connector created: [topic] %s", topicName)); - } + + if (log.isDebugEnabled()) { + log.debug(String.format("Topic subscriber connector created: [topic] %s", topicName)); + } + this.processorChain = new InstanceNotifierMessageProcessorChain(); } private void doSubscribe() throws Exception, JMSException { - // Initialize a topic connection - connector.init(topicName); - if(log.isDebugEnabled()) { - log.debug(String.format("Topic subscriber connector initialized: [topic] %s", topicName)); - } - // Create new session - topicSession = createSession(connector); - if(log.isDebugEnabled()) { - log.debug(String.format("Topic subscriber session created: [topic] %s", topicName)); - } - // Create a new subscriber - createSubscriber(topicSession); - if(log.isDebugEnabled()) { - log.debug(String.format("Topic subscriber created: [topic] %s", topicName)); - } - subscribed = true; - } - private void createSubscriber(TopicSession topicSession) throws JMSException { - Topic topic = connector.getTopic(); - if (topic == null) { - // if topic doesn't exist, create it. - topic = topicSession.createTopic(topicName); - } - topicSubscriber = topicSession.createSubscriber(topic); - topicSubscriber.setMessageListener(messageListener); - } - - private TopicSession createSession(TopicConnector topicConnector) throws Exception { - // Create a new session - return topicConnector.newSession(); - } + MqttClient mqttClient = MQTTConnector.getMQTTSubClient(Util.getRandomString(5)); + try { + + mqttClient.connect(); + if (log.isDebugEnabled()) { + log.debug("Subscribing to topic '" + topicName + "' from " + + mqttClient.getServerURI()); + } + // Subscribing to specific topic + + mqttClient.subscribe(topicName); + + // Continue waiting for messages until the Enter is pressed + mqttClient.setCallback(messageListener); + while (true) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + } + + } finally { + mqttClient.disconnect(); + } + } /** * @param messageListener * this MessageListener will get triggered each time this * subscription receives a message. */ - public void setMessageListener(MessageListener messageListener) { + public void setMessageListener(MqttCallback messageListener) { this.messageListener = messageListener; } @@ -111,70 +112,67 @@ public class TopicSubscriber implements Runnable { @Override public void run() { - // Keep the thread live until terminated + // Keep the thread live until terminated while (!terminated) { try { doSubscribe(); } catch (Exception e) { - subscribed = false; + subscribed = false; log.error("Error while subscribing to the topic: " + topicName, e); } finally { - if(subscribed) { - // start the health checker if subscribed - healthChecker = new TopicHealthChecker(topicName); - Thread healthCheckerThread = new Thread(healthChecker); - healthCheckerThread.start(); - try { - // waits till the thread finishes. - healthCheckerThread.join(); - } catch (InterruptedException ignore) { - } - } - else { - // subscription failed - if(log.isInfoEnabled()) { - log.info("Will try to subscribe again in "+Util.getFailoverPingInterval()/1000+" sec"); - } - try { - Thread.sleep(Util.getFailoverPingInterval()); - } catch (InterruptedException ignore) { - } - } - // closes all sessions/connections - try { - if (topicSubscriber != null) { - topicSubscriber.close(); - if(log.isDebugEnabled()) { - log.debug(String.format("Topic subscriber closed: [topic] %s", topicName)); - } - } - if (topicSession != null) { - topicSession.close(); - if(log.isDebugEnabled()) { - log.debug(String.format("Topic subscriber session closed: [topic] %s", topicName)); - } - } - if (connector != null) { - connector.close(); - if(log.isDebugEnabled()) { - log.debug(String.format("Topic subscriber connector closed: [topic] %s", topicName)); - } - } - } catch (JMSException ignore) { - } + if (subscribed) { + // start the health checker if subscribed + healthChecker = new TopicHealthChecker(topicName); + Thread healthCheckerThread = new Thread(healthChecker); + healthCheckerThread.start(); + try { + // waits till the thread finishes. + healthCheckerThread.join(); + } catch (InterruptedException ignore) { + } + } else { + // subscription failed + if (log.isInfoEnabled()) { + log.info("Will try to subscribe again in " + + Util.getFailoverPingInterval() / 1000 + " sec"); + } + try { + Thread.sleep(Util.getFailoverPingInterval()); + } catch (InterruptedException ignore) { + } + } + // closes all sessions/connections + try { + if (topicSubscriber != null) { + topicSubscriber.close(); + if (log.isDebugEnabled()) { + log.debug(String.format("Topic subscriber closed: [topic] %s", + topicName)); + } + } + if (topicSession != null) { + topicSession.close(); + if (log.isDebugEnabled()) { + log.debug(String.format("Topic subscriber session closed: [topic] %s", + topicName)); + } + } + + } catch (JMSException ignore) { + } } } } - /** - * Terminate topic subscriber. - */ - public void terminate() { - healthChecker.terminate(); - terminated = true; - } - - public boolean isSubscribed() { - return subscribed; - } + /** + * Terminate topic subscriber. + */ + public void terminate() { + healthChecker.terminate(); + terminated = true; + } + + public boolean isSubscribed() { + return subscribed; + } }
http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java index 5e818bc..bfaf622 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java @@ -1,60 +1,88 @@ /* * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file + * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file + * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the + * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.apache.stratos.messaging.message.receiver.health.stat; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; import javax.jms.TextMessage; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.util.Constants; +import org.apache.stratos.messaging.util.Util; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttMessage; + /** - * Implements functionality for receiving text based event messages from the health stat + * Implements functionality for receiving text based event messages from the + * health stat * message broker topic and add them to the event queue. */ -public class HealthStatEventMessageListener implements MessageListener { - - private static final Log log = LogFactory.getLog(HealthStatEventMessageListener.class); - - private HealthStatEventMessageQueue messageQueue; - - public HealthStatEventMessageListener(HealthStatEventMessageQueue messageQueue) { - this.messageQueue = messageQueue; - } - - @Override - public void onMessage(Message message) { - if (message instanceof TextMessage) { - TextMessage receivedMessage = (TextMessage) message; - try { - if (log.isDebugEnabled()) { - log.debug(String.format("Tenant message received: %s", ((TextMessage) message).getText())); - } - // Add received message to the queue - messageQueue.add(receivedMessage); - - } catch (JMSException e) { - log.error(e.getMessage(), e); - } - } - } +public class HealthStatEventMessageListener implements MqttCallback { + + private static final Log log = LogFactory.getLog(HealthStatEventMessageListener.class); + + private final HealthStatEventMessageQueue messageQueue; + + public HealthStatEventMessageListener(HealthStatEventMessageQueue messageQueue) { + this.messageQueue = messageQueue; + } + + @Override + public void connectionLost(Throwable arg0) { + // TODO Auto-generated method stub + + } + + @Override + public void deliveryComplete(IMqttDeliveryToken arg0) { + // TODO Auto-generated method stub + + } + + @Override + public void messageArrived(String topicName, MqttMessage message) throws Exception { + if (message instanceof MqttMessage) { + + TextMessage receivedMessage = new ActiveMQTextMessage(); + if (log.isDebugEnabled()) { + log.debug(String.format("Health stat event messege received....")); + + } + receivedMessage.setText(new String(message.getPayload())); + receivedMessage.setStringProperty(Constants.EVENT_CLASS_NAME, + Util.getEventNameForTopic(topicName)); + + try { + if (log.isDebugEnabled()) { + log.debug(String.format("Health stat event message received: %s", + ((TextMessage) message).getText())); + } + // Add received message to the queue + messageQueue.add(receivedMessage); + + } catch (JMSException e) { + log.error(e.getMessage(), e); + } + } + + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java index 8b07180..2371c32 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java @@ -1,18 +1,18 @@ /* * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file + * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file + * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the + * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ @@ -29,59 +29,59 @@ import org.apache.stratos.messaging.util.Constants; * A thread for receiving health stat information from message broker */ public class HealthStatEventReceiver implements Runnable { - private static final Log log = LogFactory.getLog(HealthStatEventReceiver.class); + private static final Log log = LogFactory.getLog(HealthStatEventReceiver.class); - private HealthStatEventMessageDelegator messageDelegator; - private HealthStatEventMessageListener messageListener; - private TopicSubscriber topicSubscriber; - private boolean terminated; + private final HealthStatEventMessageDelegator messageDelegator; + private final HealthStatEventMessageListener messageListener; + private TopicSubscriber topicSubscriber; + private boolean terminated; - public HealthStatEventReceiver() { - HealthStatEventMessageQueue messageQueue = new HealthStatEventMessageQueue(); - this.messageDelegator = new HealthStatEventMessageDelegator(messageQueue); - this.messageListener = new HealthStatEventMessageListener(messageQueue); - } + public HealthStatEventReceiver() { + HealthStatEventMessageQueue messageQueue = new HealthStatEventMessageQueue(); + this.messageDelegator = new HealthStatEventMessageDelegator(messageQueue); + this.messageListener = new HealthStatEventMessageListener(messageQueue); + } - public void addEventListener(EventListener eventListener) { - messageDelegator.addEventListener(eventListener); - } + public void addEventListener(EventListener eventListener) { + messageDelegator.addEventListener(eventListener); + } - @Override - public void run() { - try { - // Start topic subscriber thread - topicSubscriber = new TopicSubscriber(Constants.HEALTH_STAT_TOPIC); - topicSubscriber.setMessageListener(messageListener); - Thread subscriberThread = new Thread(topicSubscriber); - subscriberThread.start(); - if (log.isDebugEnabled()) { - log.debug("Health stats event message receiver thread started"); - } + @Override + public void run() { + try { + // Start topic subscriber thread + topicSubscriber = new TopicSubscriber(Constants.HEALTH_STAT_TOPIC); + topicSubscriber.setMessageListener(messageListener); + Thread subscriberThread = new Thread(topicSubscriber); + subscriberThread.start(); + if (log.isDebugEnabled()) { + log.debug("Health stats event message receiver thread started"); + } - // Start health stat event message delegator thread - Thread receiverThread = new Thread(messageDelegator); - receiverThread.start(); - if (log.isDebugEnabled()) { - log.debug("Health stats event message delegator thread started"); - } + // Start health stat event message delegator thread + Thread receiverThread = new Thread(messageDelegator); + receiverThread.start(); + if (log.isDebugEnabled()) { + log.debug("Health stats event message delegator thread started"); + } - // Keep the thread live until terminated - while (!terminated) { - try { - Thread.sleep(1000); - } catch (InterruptedException ignore) { - } - } - } catch (Exception e) { - if (log.isErrorEnabled()) { - log.error("Topology receiver failed", e); - } - } - } + // Keep the thread live until terminated + while (!terminated) { + try { + Thread.sleep(1000); + } catch (InterruptedException ignore) { + } + } + } catch (Exception e) { + if (log.isErrorEnabled()) { + log.error("Topology receiver failed", e); + } + } + } - public void terminate() { - topicSubscriber.terminate(); - messageDelegator.terminate(); - terminated = true; - } + public void terminate() { + topicSubscriber.terminate(); + messageDelegator.terminate(); + terminated = true; + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java index d8cc6b5..c7d1e98 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java @@ -1,60 +1,90 @@ /* * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file + * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file + * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the + * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.apache.stratos.messaging.message.receiver.instance.notifier; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; import javax.jms.TextMessage; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.util.Constants; +import org.apache.stratos.messaging.util.Util; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttMessage; + /** - * Implements functionality for receiving text based event messages from the instance notifier + * Implements functionality for receiving text based event messages from the + * instance notifier * message broker topic and add them to the event queue. */ -class InstanceNotifierEventMessageListener implements MessageListener { - - private static final Log log = LogFactory.getLog(InstanceNotifierEventMessageListener.class); - - private InstanceNotifierEventMessageQueue messageQueue; - - public InstanceNotifierEventMessageListener(InstanceNotifierEventMessageQueue messageQueue) { - this.messageQueue = messageQueue; - } - - @Override - public void onMessage(Message message) { - if (message instanceof TextMessage) { - TextMessage receivedMessage = (TextMessage) message; - try { - if (log.isDebugEnabled()) { - log.debug(String.format("Instance notifier message received: %s", ((TextMessage) message).getText())); - } - // Add received message to the queue - messageQueue.add(receivedMessage); - - } catch (JMSException e) { - log.error(e.getMessage(), e); - } - } - } +class InstanceNotifierEventMessageListener implements MqttCallback { + + private static final Log log = LogFactory.getLog(InstanceNotifierEventMessageListener.class); + + private final InstanceNotifierEventMessageQueue messageQueue; + + public InstanceNotifierEventMessageListener(InstanceNotifierEventMessageQueue messageQueue) { + this.messageQueue = messageQueue; + } + + @Override + public void connectionLost(Throwable arg0) { + if (log.isDebugEnabled()) { + log.debug("MQTT connection lost"); + } + + } + + @Override + public void deliveryComplete(IMqttDeliveryToken arg0) { + + } + + @Override + public void messageArrived(String topicName, MqttMessage message) throws Exception { + if (message instanceof MqttMessage) { + + TextMessage receivedMessage = new ActiveMQTextMessage(); + if (log.isDebugEnabled()) { + log.debug(String.format("instance notifier messege received....")); + + } + + receivedMessage.setText(new String(message.getPayload())); + receivedMessage.setStringProperty(Constants.EVENT_CLASS_NAME, + Util.getEventNameForTopic(topicName)); + + try { + if (log.isDebugEnabled()) { + log.debug(String.format("Instance notifier message received: %s", + ((TextMessage) message).getText())); + } + // Add received message to the queue + messageQueue.add(receivedMessage); + + } catch (JMSException e) { + log.error(e.getMessage(), e); + } + } + + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java index 57fea76..34c73a9 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java @@ -1,18 +1,18 @@ /* * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file + * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file + * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the + * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ @@ -29,62 +29,62 @@ import org.apache.stratos.messaging.util.Constants; * A thread for receiving instance notifier information from message broker. */ public class InstanceNotifierEventReceiver implements Runnable { - private static final Log log = LogFactory.getLog(InstanceNotifierEventReceiver.class); - private InstanceNotifierEventMessageDelegator messageDelegator; - private InstanceNotifierEventMessageListener messageListener; - private TopicSubscriber topicSubscriber; - private boolean terminated; + private static final Log log = LogFactory.getLog(InstanceNotifierEventReceiver.class); + private final InstanceNotifierEventMessageDelegator messageDelegator; + private final InstanceNotifierEventMessageListener messageListener; + private TopicSubscriber topicSubscriber; + private boolean terminated; - public InstanceNotifierEventReceiver() { - InstanceNotifierEventMessageQueue messageQueue = new InstanceNotifierEventMessageQueue(); - this.messageDelegator = new InstanceNotifierEventMessageDelegator(messageQueue); - this.messageListener = new InstanceNotifierEventMessageListener(messageQueue); - } + public InstanceNotifierEventReceiver() { + InstanceNotifierEventMessageQueue messageQueue = new InstanceNotifierEventMessageQueue(); + this.messageDelegator = new InstanceNotifierEventMessageDelegator(messageQueue); + this.messageListener = new InstanceNotifierEventMessageListener(messageQueue); + } - public void addEventListener(EventListener eventListener) { - messageDelegator.addEventListener(eventListener); - } + public void addEventListener(EventListener eventListener) { + messageDelegator.addEventListener(eventListener); + } - @Override - public void run() { - try { - // Start topic subscriber thread - topicSubscriber = new TopicSubscriber(Constants.INSTANCE_NOTIFIER_TOPIC); - topicSubscriber.setMessageListener(messageListener); - Thread subscriberThread = new Thread(topicSubscriber); - subscriberThread.start(); - if (log.isDebugEnabled()) { - log.debug("InstanceNotifier event message receiver thread started"); - } + @Override + public void run() { + try { + // Start topic subscriber thread + topicSubscriber = new TopicSubscriber(Constants.INSTANCE_NOTIFIER_TOPIC); + topicSubscriber.setMessageListener(messageListener); + Thread subscriberThread = new Thread(topicSubscriber); + subscriberThread.start(); + if (log.isDebugEnabled()) { + log.debug("InstanceNotifier event message receiver thread started"); + } - // Start instance notifier event message delegator thread - Thread receiverThread = new Thread(messageDelegator); - receiverThread.start(); - if (log.isDebugEnabled()) { - log.debug("InstanceNotifier event message delegator thread started"); - } + // Start instance notifier event message delegator thread + Thread receiverThread = new Thread(messageDelegator); + receiverThread.start(); + if (log.isDebugEnabled()) { + log.debug("InstanceNotifier event message delegator thread started"); + } - // Keep the thread live until terminated - while (!terminated) { - try { - Thread.sleep(1000); - } catch (InterruptedException ignore) { - } - } - } catch (Exception e) { - if (log.isErrorEnabled()) { - log.error("InstanceNotifier receiver failed", e); - } - } - } + // Keep the thread live until terminated + while (!terminated) { + try { + Thread.sleep(1000); + } catch (InterruptedException ignore) { + } + } + } catch (Exception e) { + if (log.isErrorEnabled()) { + log.error("InstanceNotifier receiver failed", e); + } + } + } - public boolean isSubscribed() { - return ((topicSubscriber != null) && (topicSubscriber.isSubscribed())); - } + public boolean isSubscribed() { + return ((topicSubscriber != null) && (topicSubscriber.isSubscribed())); + } - public void terminate() { - topicSubscriber.terminate(); - messageDelegator.terminate(); - terminated = true; - } + public void terminate() { + topicSubscriber.terminate(); + messageDelegator.terminate(); + terminated = true; + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageListener.java index cafdf74..ad43d15 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageListener.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageListener.java @@ -1,60 +1,84 @@ /* * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file + * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file + * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the + * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.apache.stratos.messaging.message.receiver.tenant; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; import javax.jms.TextMessage; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.util.Constants; +import org.apache.stratos.messaging.util.Util; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttMessage; + /** - * Implements functionality for receiving text based event messages from the tenant + * Implements functionality for receiving text based event messages from the + * tenant * message broker topic and add them to the event queue. */ -class TenantEventMessageListener implements MessageListener { - - private static final Log log = LogFactory.getLog(TenantEventMessageListener.class); - - private TenantEventMessageQueue messageQueue; - - public TenantEventMessageListener(TenantEventMessageQueue messageQueue) { - this.messageQueue = messageQueue; - } - - @Override - public void onMessage(Message message) { - if (message instanceof TextMessage) { - TextMessage receivedMessage = (TextMessage) message; - try { - if (log.isDebugEnabled()) { - log.debug(String.format("Tenant message received: %s", ((TextMessage) message).getText())); - } - // Add received message to the queue - messageQueue.add(receivedMessage); - - } catch (JMSException e) { - log.error(e.getMessage(), e); - } - } - } +class TenantEventMessageListener implements MqttCallback { + + private static final Log log = LogFactory.getLog(TenantEventMessageListener.class); + + private final TenantEventMessageQueue messageQueue; + + public TenantEventMessageListener(TenantEventMessageQueue messageQueue) { + this.messageQueue = messageQueue; + } + + @Override + public void connectionLost(Throwable arg0) { + // TODO Auto-generated method stub + + } + + @Override + public void deliveryComplete(IMqttDeliveryToken arg0) { + // TODO Auto-generated method stub + + } + + @Override + public void messageArrived(String topicName, MqttMessage message) throws Exception { + if (message instanceof MqttMessage) { + + TextMessage receivedMessage = new ActiveMQTextMessage(); + + receivedMessage.setText(new String(message.getPayload())); + receivedMessage.setStringProperty(Constants.EVENT_CLASS_NAME, + Util.getEventNameForTopic(topicName)); + + try { + if (log.isDebugEnabled()) { + log.debug(String.format("Tanent message received: %s", + ((TextMessage) message).getText())); + } + // Add received message to the queue + messageQueue.add(receivedMessage); + + } catch (JMSException e) { + log.error(e.getMessage(), e); + } + } + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageListener.java index 799b1b1..54774ea 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageListener.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageListener.java @@ -1,58 +1,83 @@ /* * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file + * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file + * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the + * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.apache.stratos.messaging.message.receiver.topology; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; import javax.jms.TextMessage; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.util.Constants; +import org.apache.stratos.messaging.util.Util; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttMessage; + /** - * Implements functionality for receiving text based event messages from the topology + * Implements functionality for receiving text based event messages from the + * topology * message broker topic and add them to the event queue. */ -class TopologyEventMessageListener implements MessageListener { - private static final Log log = LogFactory.getLog(TopologyEventMessageListener.class); - - private TopologyEventMessageQueue messageQueue; - - public TopologyEventMessageListener(TopologyEventMessageQueue messageQueue) { - this.messageQueue = messageQueue; - } - - @Override - public void onMessage(Message message) { - if (message instanceof TextMessage) { - TextMessage receivedMessage = (TextMessage) message; - try { - if (log.isDebugEnabled()) { - log.debug(String.format("Topology message received: %s", ((TextMessage) message).getText())); - } - // Add received message to the queue - messageQueue.add(receivedMessage); - - } catch (JMSException e) { - log.error(e.getMessage(), e); - } - } - } +class TopologyEventMessageListener implements MqttCallback { + private static final Log log = LogFactory.getLog(TopologyEventMessageListener.class); + + private final TopologyEventMessageQueue messageQueue; + + public TopologyEventMessageListener(TopologyEventMessageQueue messageQueue) { + this.messageQueue = messageQueue; + } + + @Override + public void connectionLost(Throwable arg0) { + // TODO Auto-generated method stub + + } + + @Override + public void deliveryComplete(IMqttDeliveryToken arg0) { + // TODO Auto-generated method stub + + } + + @Override + public void messageArrived(String topicName, MqttMessage message) throws Exception { + if (message instanceof MqttMessage) { + + TextMessage receivedMessage = new ActiveMQTextMessage(); + + receivedMessage.setText(new String(message.getPayload())); + receivedMessage.setStringProperty(Constants.EVENT_CLASS_NAME, + Util.getEventNameForTopic(topicName)); + + try { + if (log.isDebugEnabled()) { + log.debug(String.format("topology message received: %s", + ((TextMessage) message).getText())); + } + // Add received message to the queue + messageQueue.add(receivedMessage); + + } catch (JMSException e) { + log.error(e.getMessage(), e); + } + } + } + } http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java index 5991e25..a615eb4 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java @@ -1,18 +1,18 @@ /* * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file + * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file + * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the + * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ @@ -20,59 +20,60 @@ package org.apache.stratos.messaging.util; public class Constants { /* Message broker topic names */ - public static final String TOPOLOGY_TOPIC = "topology"; - public static final String HEALTH_STAT_TOPIC = "summarized-health-stats"; - public static final String INSTANCE_STATUS_TOPIC = "instance-status"; - public static final String INSTANCE_NOTIFIER_TOPIC = "instance-notifier"; - public static final String PING_TOPIC = "ping"; - public static final String TENANT_TOPIC = "tenant"; - public static final String TENANT_RANGE_ALL = "*"; + public static final String TOPOLOGY_TOPIC = "topology/#"; + public static final String HEALTH_STAT_TOPIC = "health/#"; + public static final String INSTANCE_STATUS_TOPIC = "instance/#"; + public static final String INSTANCE_NOTIFIER_TOPIC = "instance/#"; + public static final String PING_TOPIC = "ping"; + public static final String TENANT_TOPIC = "tenant/#"; + public static final String TENANT_RANGE_ALL = "*"; - public static final String TENANT_RANGE_DELIMITER = "-"; - public static final String EVENT_CLASS_NAME = "event-class-name"; + public static final String TENANT_RANGE_DELIMITER = "-"; + public static final String EVENT_CLASS_NAME = "event-class-name"; - /* Topology filter constants */ - public static final String FILTER_VALUE_ASSIGN_OPERATOR="="; - public static final String FILTER_KEY_VALUE_PAIR_SEPARATOR = "|"; - public static final String FILTER_VALUE_SEPARATOR = ","; + /* Topology filter constants */ + public static final String FILTER_VALUE_ASSIGN_OPERATOR = "="; + public static final String FILTER_KEY_VALUE_PAIR_SEPARATOR = "|"; + public static final String FILTER_VALUE_SEPARATOR = ","; - public static final String TOPOLOGY_SERVICE_FILTER = "stratos.topology.service.filter"; - public static final String TOPOLOGY_SERVICE_FILTER_SERVICE_NAME = "service-name"; + public static final String TOPOLOGY_SERVICE_FILTER = "stratos.topology.service.filter"; + public static final String TOPOLOGY_SERVICE_FILTER_SERVICE_NAME = "service-name"; - public static final String TOPOLOGY_CLUSTER_FILTER = "stratos.topology.cluster.filter"; - public static final String TOPOLOGY_CLUSTER_FILTER_CLUSTER_ID = "cluster-id"; + public static final String TOPOLOGY_CLUSTER_FILTER = "stratos.topology.cluster.filter"; + public static final String TOPOLOGY_CLUSTER_FILTER_CLUSTER_ID = "cluster-id"; - public static final String TOPOLOGY_MEMBER_FILTER = "stratos.topology.member.filter"; - public static final String TOPOLOGY_MEMBER_FILTER_LB_CLUSTER_ID = "lb-cluster-id"; + public static final String TOPOLOGY_MEMBER_FILTER = "stratos.topology.member.filter"; + public static final String TOPOLOGY_MEMBER_FILTER_LB_CLUSTER_ID = "lb-cluster-id"; - public static final String REQUEST_BASE_CONTEXT = "org.wso2.carbon.context.RequestBaseContext"; - - // to identify a lb cluster - public static final String IS_LOAD_BALANCER = "load.balancer"; - public static final String LOAD_BALANCER_REF = "load.balancer.ref"; - public static final String SERVICE_AWARE_LOAD_BALANCER = "service.aware.load.balancer"; - public static final String DEFAULT_LOAD_BALANCER = "default.load.balancer"; - public static final String NO_LOAD_BALANCER = "no.load.balancer"; - public static final String EXISTING_LOAD_BALANCERS = "existing.load.balancers"; - public static final String LOAD_BALANCED_SERVICE_TYPE = "load.balanced.service.type"; + public static final String REQUEST_BASE_CONTEXT = "org.wso2.carbon.context.RequestBaseContext"; - // volume - public static final String IS_VOLUME_REQUIRED = "volume.required"; - public static final String SHOULD_DELETE_VOLUME = "volume.delete.on.unsubscription"; - public static final String VOLUME_SIZE = "volume.size.gb"; - public static final String VOLUME_ID = "volume.id"; - public static final String DEVICE_NAME = "volume.device.name"; - public static final String GRACEFUL_SHUTDOWN_TIMEOUT = "graceful.shutdown.timeout"; + // to identify a lb cluster + public static final String IS_LOAD_BALANCER = "load.balancer"; + public static final String LOAD_BALANCER_REF = "load.balancer.ref"; + public static final String SERVICE_AWARE_LOAD_BALANCER = "service.aware.load.balancer"; + public static final String DEFAULT_LOAD_BALANCER = "default.load.balancer"; + public static final String NO_LOAD_BALANCER = "no.load.balancer"; + public static final String EXISTING_LOAD_BALANCERS = "existing.load.balancers"; + public static final String LOAD_BALANCED_SERVICE_TYPE = "load.balanced.service.type"; - public static final String IS_PRIMARY = "PRIMARY"; + // volume + public static final String IS_VOLUME_REQUIRED = "volume.required"; + public static final String SHOULD_DELETE_VOLUME = "volume.delete.on.unsubscription"; + public static final String VOLUME_SIZE = "volume.size.gb"; + public static final String VOLUME_ID = "volume.id"; + public static final String DEVICE_NAME = "volume.device.name"; + public static final String GRACEFUL_SHUTDOWN_TIMEOUT = "graceful.shutdown.timeout"; - //System Properties - public static final String AVERAGE_PING_INTERVAL_PROPERTY = "stratos.messaging.averagePingInterval"; - public static final String FAILOVER_PING_INTERVAL_PROPERTY = "stratos.messaging.failoverPingInterval"; + public static final String IS_PRIMARY = "PRIMARY"; - //Default values - public static final int DEFAULT_AVERAGE_PING_INTERVAL = 1000; - public static final int DEFAULT_FAILOVER_PING_INTERVAL = 30000; + // System Properties + public static final String AVERAGE_PING_INTERVAL_PROPERTY = + "stratos.messaging.averagePingInterval"; + public static final String FAILOVER_PING_INTERVAL_PROPERTY = + "stratos.messaging.failoverPingInterval"; + // Default values + public static final int DEFAULT_AVERAGE_PING_INTERVAL = 1000; + public static final int DEFAULT_FAILOVER_PING_INTERVAL = 30000; } http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Properties.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Properties.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Properties.java index f32b91c..3f2c5cf 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Properties.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Properties.java @@ -1,37 +1,38 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file + * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 * - * http://www.apache.org/licenses/LICENSE-2.0 - * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.apache.stratos.messaging.util; /** - * Had to wrap {@link Property} array using a class, since there's a bug in current + * Had to wrap {@link Property} array using a class, since there's a bug in + * current * stub generation. */ public class Properties { - private Property[] properties; + private Property[] properties; + + public Property[] getProperties() { + return properties; + } - public Property[] getProperties() { - return properties; - } + public void setProperties(Property[] properties) { + this.properties = properties; + } - public void setProperties(Property[] properties) { - this.properties = properties; - } - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java index 95b5a07..a6db9ee 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java @@ -1,32 +1,34 @@ /* * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file + * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file + * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the + * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.apache.stratos.messaging.util; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.messaging.message.JsonMessage; - import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.util.Properties; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.event.Event; +import org.apache.stratos.messaging.message.JsonMessage; public class Util { private static final Log log = LogFactory.getLog(Util.class); @@ -44,9 +46,9 @@ public class Util { log.error("Failed to load properties from file: " + filePath, e); } finally { try { - if(is != null) { - is.close(); - } + if (is != null) { + is.close(); + } } catch (IOException ignore) { } } @@ -54,96 +56,120 @@ public class Util { return props; } - /** - * Validate tenant range. - * Valid formats: Integer-Integer, Integer-* - * Examples: 1-100, 101-200, 201-* - * @param tenantRange - */ - public static void validateTenantRange(String tenantRange) { - boolean valid = false; - if(tenantRange != null) { - if(tenantRange.equals("*")) { - valid = true; - } else { - String[] array = tenantRange.split(Constants.TENANT_RANGE_DELIMITER); - if(array.length == 2) { - // Integer-Integer - if(isNumber(array[0]) && (isNumber(array[1]))){ - valid = true; - } - // Integer-* - else if(isNumber(array[0]) && "*".equals(array[1])) { - valid = true; - } - } - } - - } - if(!valid) - throw new RuntimeException(String.format("Tenant range %s is not valid", tenantRange)); - } - - public static boolean isNumber(String s) { - try { - Integer.parseInt(s); - return true; - } - catch (NumberFormatException e) { - // Not a valid number - } - return false; - } - - /** - * Transform json into an object of given type. - * @param json - * @param type - * @return - */ - public static Object jsonToObject(String json, Class type) { - return (new JsonMessage(json, type)).getObject(); - } - - - // Time interval between each ping message sent to topic. - private static int averagePingInterval; - - // Time interval between each ping message after an error had occurred. - private static int failoverPingInterval; - - /** - * fetch value from system param - * @return - */ - public static int getAveragePingInterval() { - if (averagePingInterval <= 0) { - averagePingInterval = Util.getNumericSystemProperty(Constants.DEFAULT_AVERAGE_PING_INTERVAL,Constants.AVERAGE_PING_INTERVAL_PROPERTY); - } - return averagePingInterval; - } - - /** - * fetch value from system param - * @return - */ - public static int getFailoverPingInterval() { - if (failoverPingInterval <= 0) { - failoverPingInterval = Util.getNumericSystemProperty(Constants.DEFAULT_FAILOVER_PING_INTERVAL,Constants.FAILOVER_PING_INTERVAL_PROPERTY); - } - return failoverPingInterval; - } - - /** - * Method to safely access numeric system properties - * @param defaultValue - * @return - */ - public static Integer getNumericSystemProperty(Integer defaultValue, String propertyKey) { - try { - return Integer.valueOf(System.getProperty(propertyKey)); - } catch (NumberFormatException ex) { - return defaultValue; - } - } + /** + * Validate tenant range. + * Valid formats: Integer-Integer, Integer-* + * Examples: 1-100, 101-200, 201-* + * + * @param tenantRange + */ + public static void validateTenantRange(String tenantRange) { + boolean valid = false; + if (tenantRange != null) { + if (tenantRange.equals("*")) { + valid = true; + } else { + String[] array = tenantRange.split(Constants.TENANT_RANGE_DELIMITER); + if (array.length == 2) { + // Integer-Integer + if (isNumber(array[0]) && (isNumber(array[1]))) { + valid = true; + } + // Integer-* + else if (isNumber(array[0]) && "*".equals(array[1])) { + valid = true; + } + } + } + + } + if (!valid) + throw new RuntimeException(String.format("Tenant range %s is not valid", tenantRange)); + } + + public static boolean isNumber(String s) { + try { + Integer.parseInt(s); + return true; + } catch (NumberFormatException e) { + // Not a valid number + } + return false; + } + + /** + * Transform json into an object of given type. + * + * @param json + * @param type + * @return + */ + public static Object jsonToObject(String json, Class type) { + return (new JsonMessage(json, type)).getObject(); + } + + // Time interval between each ping message sent to topic. + private static int averagePingInterval; + + // Time interval between each ping message after an error had occurred. + private static int failoverPingInterval; + + /** + * fetch value from system param + * + * @return + */ + public static int getAveragePingInterval() { + if (averagePingInterval <= 0) { + averagePingInterval = + Util.getNumericSystemProperty(Constants.DEFAULT_AVERAGE_PING_INTERVAL, + Constants.AVERAGE_PING_INTERVAL_PROPERTY); + } + return averagePingInterval; + } + + /** + * fetch value from system param + * + * @return + */ + public static int getFailoverPingInterval() { + if (failoverPingInterval <= 0) { + failoverPingInterval = + Util.getNumericSystemProperty(Constants.DEFAULT_FAILOVER_PING_INTERVAL, + Constants.FAILOVER_PING_INTERVAL_PROPERTY); + } + return failoverPingInterval; + } + + /** + * Method to safely access numeric system properties + * + * @param defaultValue + * @return + */ + public static Integer getNumericSystemProperty(Integer defaultValue, String propertyKey) { + try { + return Integer.valueOf(System.getProperty(propertyKey)); + } catch (NumberFormatException ex) { + return defaultValue; + } + } + + public static String getMessageTopicName(Event event) { + return event.getClass().getName().substring(35).replace(".", "/"); + } + + public static String getEventNameForTopic(String arg0) { + return "org.apache.stratos.messaging.event.".concat(arg0.replace("/", ".")); + } + + public static String getRandomString(int len) { + String AB = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + Random rnd = new Random(); + StringBuilder sb = new StringBuilder(len); + for (int i = 0; i < len; i++) + sb.append(AB.charAt(rnd.nextInt(AB.length()))); + return sb.toString(); + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java index 80174f4..0205566 100644 --- a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java +++ b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java @@ -1,23 +1,30 @@ /* * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file + * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file + * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the + * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.apache.stratos.cep.extension; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + import org.apache.log4j.Logger; import org.apache.stratos.messaging.broker.publish.EventPublisher; import org.apache.stratos.messaging.broker.publish.EventPublisherPool; @@ -28,7 +35,6 @@ import org.apache.stratos.messaging.domain.topology.Service; import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent; import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver; import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; -import org.apache.stratos.messaging.util.Constants; import org.wso2.siddhi.core.config.SiddhiContext; import org.wso2.siddhi.core.event.StreamEvent; import org.wso2.siddhi.core.event.in.InEvent; @@ -47,210 +53,224 @@ import org.wso2.siddhi.query.api.expression.constant.IntConstant; import org.wso2.siddhi.query.api.expression.constant.LongConstant; import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - @SiddhiExtension(namespace = "stratos", function = "faultHandling") -public class FaultHandlingWindowProcessor extends WindowProcessor implements RunnableWindowProcessor { +public class FaultHandlingWindowProcessor extends WindowProcessor implements + RunnableWindowProcessor { + + private static final String HEALTH_STAT_MEMBER_FAULT_EVENT = "health/stat/MemberFaultEvent"; + private static final int TIME_OUT = 60 * 1000; + static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class); + private ScheduledExecutorService eventRemoverScheduler; + private int subjectedAttrIndex; + private ThreadBarrier threadBarrier; + private long timeToKeep; + private ISchedulerSiddhiQueue<StreamEvent> window; + private final ConcurrentHashMap<String, Long> memberTimeStampMap = + new ConcurrentHashMap<String, Long>(); + private final ConcurrentHashMap<String, Member> memberIdMap = + new ConcurrentHashMap<String, Member>(); - private static final int TIME_OUT = 60 * 1000; - static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class); - private ScheduledExecutorService eventRemoverScheduler; - private int subjectedAttrIndex; - private ThreadBarrier threadBarrier; - private long timeToKeep; - private ISchedulerSiddhiQueue<StreamEvent> window; - private ConcurrentHashMap<String, Long> memberTimeStampMap = new ConcurrentHashMap<String, Long>(); - private ConcurrentHashMap<String, Member> memberIdMap = new ConcurrentHashMap<String, Member>(); - EventPublisher healthStatPublisher = EventPublisherPool.getPublisher(Constants.HEALTH_STAT_TOPIC); - Map<String, Object> MemberFaultEventMap = new HashMap<String, Object>(); - Map<String, Object> memberFaultEventMessageMap = new HashMap<String, Object>(); - private TopologyEventReceiver topologyEventReceiver; - private String memberID; + EventPublisher healthStatPublisher = null; + Map<String, Object> MemberFaultEventMap = new HashMap<String, Object>(); + Map<String, Object> memberFaultEventMessageMap = new HashMap<String, Object>(); + private TopologyEventReceiver topologyEventReceiver; + private String memberID; - @Override - protected void processEvent(InEvent event) { - addDataToMap(event); - } + @Override + protected void processEvent(InEvent event) { + addDataToMap(event); + } - @Override - protected void processEvent(InListEvent listEvent) { - System.out.println(listEvent); - for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) { - addDataToMap((InEvent) listEvent.getEvent(i)); - } - } + @Override + protected void processEvent(InListEvent listEvent) { + System.out.println(listEvent); + for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) { + addDataToMap((InEvent) listEvent.getEvent(i)); + } + } - protected void addDataToMap(InEvent event) { - if (memberID != null) { - String id = (String)event.getData()[subjectedAttrIndex]; - memberTimeStampMap.put(id, event.getTimeStamp()); - log.debug("Event received from [member-id] " + id); - } - else { - log.error("NULL member ID in the event received"); - } - } + protected void addDataToMap(InEvent event) { + if (memberID != null) { + String id = (String) event.getData()[subjectedAttrIndex]; + memberTimeStampMap.put(id, event.getTimeStamp()); + log.debug("Event received from [member-id] " + id); + } else { + log.error("NULL member ID in the event received"); + } + } - @Override - public Iterator<StreamEvent> iterator() { - return window.iterator(); - } + @Override + public Iterator<StreamEvent> iterator() { + return window.iterator(); + } - @Override - public Iterator<StreamEvent> iterator(String predicate) { - if (siddhiContext.isDistributedProcessingEnabled()) { - return ((SchedulerSiddhiQueueGrid<StreamEvent>) window).iterator(predicate); - } else { - return window.iterator(); - } - } + @Override + public Iterator<StreamEvent> iterator(String predicate) { + if (siddhiContext.isDistributedProcessingEnabled()) { + return ((SchedulerSiddhiQueueGrid<StreamEvent>) window).iterator(predicate); + } else { + return window.iterator(); + } + } - /* - * Retrieve the current activated member list from the topology and put them into the - * memberTimeStampMap if not already exists. This will allow the system to recover - * from any inconsistent state caused by MB/CEP failures. - */ - private void loadFromTopology(){ - if (TopologyManager.getTopology().isInitialized()){ - TopologyManager.acquireReadLock(); - memberIdMap.clear(); - long currentTimeStamp = System.currentTimeMillis(); - Iterator<Service> servicesItr = TopologyManager.getTopology().getServices().iterator(); - while(servicesItr.hasNext()){ - Service service = servicesItr.next(); - Iterator<Cluster> clusterItr = service.getClusters().iterator(); - while(clusterItr.hasNext()){ - Cluster cluster = clusterItr.next(); - Iterator<Member> memberItr = cluster.getMembers().iterator(); - while(memberItr.hasNext()){ - Member member = memberItr.next(); - if (member.getStatus().equals(MemberStatus.Activated)){ - memberTimeStampMap.putIfAbsent(member.getMemberId(), currentTimeStamp); - memberIdMap.put(member.getMemberId(), member); - } - } - } - } - TopologyManager.releaseReadLock(); - } - if (log.isDebugEnabled()){ - log.debug("Member TimeStamp Map: " + memberTimeStampMap); - log.debug("Member ID Map: " + memberIdMap); - } - } + /* + * Retrieve the current activated member list from the topology and put them + * into the + * memberTimeStampMap if not already exists. This will allow the system to + * recover + * from any inconsistent state caused by MB/CEP failures. + */ + private void loadFromTopology() { + if (TopologyManager.getTopology().isInitialized()) { + TopologyManager.acquireReadLock(); + memberIdMap.clear(); + long currentTimeStamp = System.currentTimeMillis(); + Iterator<Service> servicesItr = TopologyManager.getTopology().getServices().iterator(); + while (servicesItr.hasNext()) { + Service service = servicesItr.next(); + Iterator<Cluster> clusterItr = service.getClusters().iterator(); + while (clusterItr.hasNext()) { + Cluster cluster = clusterItr.next(); + Iterator<Member> memberItr = cluster.getMembers().iterator(); + while (memberItr.hasNext()) { + Member member = memberItr.next(); + if (member.getStatus().equals(MemberStatus.Activated)) { + memberTimeStampMap.putIfAbsent(member.getMemberId(), currentTimeStamp); + memberIdMap.put(member.getMemberId(), member); + } + } + } + } + TopologyManager.releaseReadLock(); + } + if (log.isDebugEnabled()) { + log.debug("Member TimeStamp Map: " + memberTimeStampMap); + log.debug("Member ID Map: " + memberIdMap); + } + } - private void publishMemberFault(String memberID){ - Member member = memberIdMap.get(memberID); - if (member == null){ - log.error("Failed to publish MemberFault event. Member having [member-id] " + memberID + " does not exist in topology"); - return; - } - MemberFaultEvent memberFaultEvent = new MemberFaultEvent(member.getClusterId(), member.getMemberId(), member.getPartitionId(), 0); - memberFaultEventMessageMap.put("message", memberFaultEvent); - Properties headers = new Properties(); - headers.put(Constants.EVENT_CLASS_NAME, memberFaultEvent.getClass().getName()); - healthStatPublisher.publish(MemberFaultEventMap, headers, true); + private void publishMemberFault(String memberID) { + Member member = memberIdMap.get(memberID); + if (member == null) { + log.error("Failed to publish MemberFault event. Member having [member-id] " + memberID + + " does not exist in topology"); + return; + } + MemberFaultEvent memberFaultEvent = + new MemberFaultEvent(member.getClusterId(), + member.getMemberId(), + member.getPartitionId(), 0); + memberFaultEventMessageMap.put("message", memberFaultEvent); - if (log.isDebugEnabled()){ - log.debug("Published MemberFault event for [member-id] " + memberID); - } - } + healthStatPublisher = EventPublisherPool.getPublisher(HEALTH_STAT_MEMBER_FAULT_EVENT); + healthStatPublisher.publish(MemberFaultEventMap, true); + if (log.isDebugEnabled()) { + log.debug("Published MemberFault event for [member-id] " + memberID); + } + } - @Override - public void run() { - try { - threadBarrier.pass(); - loadFromTopology(); - Iterator it = memberTimeStampMap.entrySet().iterator(); + @Override + public void run() { + try { + threadBarrier.pass(); + loadFromTopology(); + Iterator it = memberTimeStampMap.entrySet().iterator(); - while ( it.hasNext() ) { - Map.Entry pair = (Map.Entry)it.next(); - long currentTime = System.currentTimeMillis(); - Long eventTimeStamp = (Long) pair.getValue(); + while (it.hasNext()) { + Map.Entry pair = (Map.Entry) it.next(); + long currentTime = System.currentTimeMillis(); + Long eventTimeStamp = (Long) pair.getValue(); - if ((currentTime - eventTimeStamp) > TIME_OUT) { - log.info("Faulty member detected [member-id] " + pair.getKey() + " with [last time-stamp] " + eventTimeStamp + " [time-out] " + TIME_OUT + " milliseconds"); - it.remove(); - publishMemberFault((String) pair.getKey()); - } - } - if (log.isDebugEnabled()){ - log.debug("Fault handling processor iteration completed with [time-stamp map length] " + memberTimeStampMap.size() + " [activated member-count] " + memberIdMap.size()); - } - eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); - } catch (Throwable t) { - log.error(t.getMessage(), t); - } - } + if ((currentTime - eventTimeStamp) > TIME_OUT) { + log.info("Faulty member detected [member-id] " + pair.getKey() + + " with [last time-stamp] " + eventTimeStamp + " [time-out] " + + TIME_OUT + " milliseconds"); + it.remove(); + publishMemberFault((String) pair.getKey()); + } + } + if (log.isDebugEnabled()) { + log.debug("Fault handling processor iteration completed with [time-stamp map length] " + + memberTimeStampMap.size() + + " [activated member-count] " + + memberIdMap.size()); + } + eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); + } catch (Throwable t) { + log.error(t.getMessage(), t); + } + } - @Override - protected Object[] currentState() { - return new Object[]{window.currentState()}; - } + @Override + protected Object[] currentState() { + return new Object[] { window.currentState() }; + } - @Override - protected void restoreState(Object[] data) { - window.restoreState(data); - window.restoreState((Object[]) data[0]); - window.reSchedule(); - } + @Override + protected void restoreState(Object[] data) { + window.restoreState(data); + window.restoreState((Object[]) data[0]); + window.reSchedule(); + } - @Override - protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor, AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) { - if (parameters[0] instanceof IntConstant) { - timeToKeep = ((IntConstant) parameters[0]).getValue(); - } else { - timeToKeep = ((LongConstant) parameters[0]).getValue(); - } + @Override + protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor, + AbstractDefinition streamDefinition, String elementId, boolean async, + SiddhiContext siddhiContext) { + if (parameters[0] instanceof IntConstant) { + timeToKeep = ((IntConstant) parameters[0]).getValue(); + } else { + timeToKeep = ((LongConstant) parameters[0]).getValue(); + } - memberID = ((Variable)parameters[1]).getAttributeName(); + memberID = ((Variable) parameters[1]).getAttributeName(); - String subjectedAttr = ((Variable)parameters[1]).getAttributeName(); - subjectedAttrIndex = streamDefinition.getAttributePosition(subjectedAttr); + String subjectedAttr = ((Variable) parameters[1]).getAttributeName(); + subjectedAttrIndex = streamDefinition.getAttributePosition(subjectedAttr); - if (this.siddhiContext.isDistributedProcessingEnabled()) { - window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, this.async); - } else { - window = new SchedulerSiddhiQueue<StreamEvent>(this); - } - MemberFaultEventMap.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap); - this.topologyEventReceiver = new TopologyEventReceiver(); - Thread thread = new Thread(topologyEventReceiver); - thread.start(); - log.info("WSO2 CEP topology receiver thread started"); + if (this.siddhiContext.isDistributedProcessingEnabled()) { + window = + new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, + this.async); + } else { + window = new SchedulerSiddhiQueue<StreamEvent>(this); + } + MemberFaultEventMap.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", + memberFaultEventMessageMap); + this.topologyEventReceiver = new TopologyEventReceiver(); + Thread thread = new Thread(topologyEventReceiver); + thread.start(); + log.info("WSO2 CEP topology receiver thread started"); - //Ordinary scheduling - window.schedule(); + // Ordinary scheduling + window.schedule(); - } + } - @Override - public void schedule() { - eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); - } + @Override + public void schedule() { + eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); + } - @Override - public void scheduleNow() { - eventRemoverScheduler.schedule(this, 0, TimeUnit.MILLISECONDS); - } + @Override + public void scheduleNow() { + eventRemoverScheduler.schedule(this, 0, TimeUnit.MILLISECONDS); + } - @Override - public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) { - this.eventRemoverScheduler = scheduledExecutorService; - } + @Override + public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) { + this.eventRemoverScheduler = scheduledExecutorService; + } - @Override - public void setThreadBarrier(ThreadBarrier threadBarrier) { - this.threadBarrier = threadBarrier; - } + @Override + public void setThreadBarrier(ThreadBarrier threadBarrier) { + this.threadBarrier = threadBarrier; + } - @Override - public void destroy(){ - this.topologyEventReceiver.terminate(); - window = null; - } + @Override + public void destroy() { + this.topologyEventReceiver.terminate(); + window = null; + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/features/cloud-controller/org.apache.stratos.cloud.controller.feature/pom.xml ---------------------------------------------------------------------- diff --git a/features/cloud-controller/org.apache.stratos.cloud.controller.feature/pom.xml b/features/cloud-controller/org.apache.stratos.cloud.controller.feature/pom.xml index 125bbd3..2b0120f 100644 --- a/features/cloud-controller/org.apache.stratos.cloud.controller.feature/pom.xml +++ b/features/cloud-controller/org.apache.stratos.cloud.controller.feature/pom.xml @@ -275,6 +275,7 @@ </bundleDef> <bundleDef>org.apache.jclouds.driver:jclouds-bouncycastle:${jclouds.version}</bundleDef> <bundleDef>org.apache.jclouds.driver:jclouds-sshj:${jclouds.version}</bundleDef> + <!--bundleDef>org.apache.jclouds.driver:jclouds-log4j:${jclouds.version}</bundleDef--> <bundleDef>org.apache.jclouds.driver:jclouds-enterprise:${jclouds.version}</bundleDef> <bundleDef>org.apache.jclouds:jclouds-core:${jclouds.version}</bundleDef> <bundleDef>org.apache.jclouds:jclouds-compute:${jclouds.version}</bundleDef> @@ -288,12 +289,14 @@ <bundleDef>org.apache.jclouds.api:openstack-keystone:${jclouds.version}</bundleDef> <bundleDef>org.apache.stratos:aws-ec2:1.8.0-stratos</bundleDef> <bundleDef>com.google.guava:guava:17.0</bundleDef> + + <bundleDef> org.apache.servicemix.bundles:org.apache.servicemix.bundles.jsch-agentproxy-jsch:0.0.7_1 </bundleDef> <bundleDef>com.jcraft:jsch.agentproxy.connector-factory:0.0.7</bundleDef> <bundleDef>com.jcraft:jsch.agentproxy.sshagent:0.0.7</bundleDef> - <bundleDef>com.jcraft:jsch.agentproxy.usocket-nc:0.0.7</bundleDef> + <bundleDef>com.jcraft:jsch.agentproxy.usocket-jna:0.0.7</bundleDef> <bundleDef>com.jcraft:jsch.agentproxy.core:0.0.7</bundleDef> <bundleDef>net.java.dev.jna:jna:4.1.0</bundleDef> <bundleDef> @@ -302,6 +305,7 @@ <bundleDef>net.schmizz:sshj:0.9.0</bundleDef> <bundleDef>org.apache.servicemix.bundles:org.apache.servicemix.bundles.jzlib:1.1.1_1 </bundleDef> + <bundleDef>com.google.code.gson:gson:${gson2.version}</bundleDef> <bundleDef>com.google.guice.wso2:guice:${google.guice.wso2.version}</bundleDef> <bundleDef> @@ -323,6 +327,7 @@ </bundleDef> <bundleDef>jdom.wso2:jdom:1.0.0.wso2v1</bundleDef> <bundleDef>org.json.wso2:json:1.0.0.wso2v1</bundleDef> + <!--bundleDef>org.jaggeryjs:0.9.0.ALPHA2-wso2v2</bundleDef--> <bundleDef>org.apache.jclouds.api:sts:${jclouds.version}</bundleDef> <bundleDef>javax.ws.rs:jsr311-api:1.1.1</bundleDef> <bundleDef>org.apache.stratos:org.apache.stratos.messaging:${project.version} http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 0592097..0914560 100644 --- a/pom.xml +++ b/pom.xml @@ -98,7 +98,10 @@ <checksumPolicy>ignore</checksumPolicy> </releases> </repository> - + <repository> + <id>Eclipse Paho Repo</id> + <url>https://repo.eclipse.org/content/repositories/paho-releases/</url> + </repository> <repository> <id>central</id> <name>Maven Repository Switchboard</name> http://git-wip-us.apache.org/repos/asf/stratos/blob/bc78b9dc/products/stratos/conf/mqtttopic.properties ---------------------------------------------------------------------- diff --git a/products/stratos/conf/mqtttopic.properties b/products/stratos/conf/mqtttopic.properties new file mode 100644 index 0000000..823c1a9 --- /dev/null +++ b/products/stratos/conf/mqtttopic.properties @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +mqtturl=tcp://localhost:1883 +clientID=stratos +tempfilelocation=/tmp
