http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventMessageListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventMessageListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventMessageListener.java deleted file mode 100644 index 93eeb54..0000000 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventMessageListener.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.stratos.messaging.message.receiver.application.status; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.TextMessage; - -public class ApplicationStatusEventMessageListener implements MessageListener { - private static final Log log = LogFactory.getLog(ApplicationStatusEventMessageListener.class); - - private ApplicationStatusEventMessageQueue messageQueue; - - public ApplicationStatusEventMessageListener(ApplicationStatusEventMessageQueue messageQueue) { - this.messageQueue = messageQueue; - } - - @Override - public void onMessage(Message message) { - if (message instanceof TextMessage) { - TextMessage receivedMessage = (TextMessage) message; - try { - if (log.isDebugEnabled()) { - log.debug(String.format("Tenant message received: %s", ((TextMessage) message).getText())); - } - // Add received message to the queue - messageQueue.add(receivedMessage); - - } catch (JMSException e) { - log.error(e.getMessage(), e); - } - } - } -}
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventMessageQueue.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventMessageQueue.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventMessageQueue.java deleted file mode 100644 index ba455c9..0000000 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventMessageQueue.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.stratos.messaging.message.receiver.application.status; - - -import javax.jms.TextMessage; -import java.util.concurrent.LinkedBlockingQueue; - -public class ApplicationStatusEventMessageQueue extends LinkedBlockingQueue<TextMessage> { -} http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventReceiver.java deleted file mode 100644 index 0b6cada..0000000 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/status/ApplicationStatusEventReceiver.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.stratos.messaging.message.receiver.application.status; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber; -import org.apache.stratos.messaging.listener.EventListener; -import org.apache.stratos.messaging.util.Constants; - -public class ApplicationStatusEventReceiver implements Runnable { - private static final Log log = LogFactory.getLog(ApplicationStatusEventReceiver.class); - - private ApplicationStatusEventMessageDelegator messageDelegator; - private ApplicationStatusEventMessageListener messageListener; - private TopicSubscriber topicSubscriber; - private boolean terminated; - - public ApplicationStatusEventReceiver() { - ApplicationStatusEventMessageQueue messageQueue = new ApplicationStatusEventMessageQueue(); - this.messageDelegator = new ApplicationStatusEventMessageDelegator(messageQueue); - this.messageListener = new ApplicationStatusEventMessageListener(messageQueue); - } - - public void addEventListener(EventListener eventListener) { - messageDelegator.addEventListener(eventListener); - } - - @Override - public void run() { - try { - // Start topic subscriber thread - topicSubscriber = new TopicSubscriber(Constants.APPLICATION_STATUS_TOPIC); - topicSubscriber.setMessageListener(messageListener); - Thread subscriberThread = new Thread(topicSubscriber); - subscriberThread.start(); - if (log.isDebugEnabled()) { - log.debug("Application status event message receiver thread started"); - } - - // Start Application status event message delegator thread - Thread receiverThread = new Thread(messageDelegator); - receiverThread.start(); - if (log.isDebugEnabled()) { - log.debug("Application status event message delegator thread started"); - } - - // Keep the thread live until terminated - while (!terminated) { - try { - Thread.sleep(1000); - } catch (InterruptedException ignore) { - } - } - } catch (Exception e) { - if 
(log.isErrorEnabled()) { - log.error("Application status failed", e); - } - } - } - - public void terminate() { - topicSubscriber.terminate(); - messageDelegator.terminate(); - terminated = true; - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageDelegator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageDelegator.java new file mode 100644 index 0000000..68d44b0 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageDelegator.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.stratos.messaging.message.receiver.applications; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.listener.EventListener; +import org.apache.stratos.messaging.message.processor.MessageProcessorChain; +import org.apache.stratos.messaging.message.processor.applications.ApplicationsMessageProcessorChain; +import org.apache.stratos.messaging.util.Constants; + +import javax.jms.TextMessage; + +public class ApplicationsEventMessageDelegator implements Runnable { + private static final Log log = LogFactory.getLog(ApplicationsEventMessageDelegator.class); + + private ApplicationsEventMessageQueue messageQueue; + private MessageProcessorChain processorChain; + private boolean terminated; + + public ApplicationsEventMessageDelegator(ApplicationsEventMessageQueue messageQueue) { + this.messageQueue = messageQueue; + this.processorChain = new ApplicationsMessageProcessorChain(); + } + + public void addEventListener(EventListener eventListener) { + processorChain.addEventListener(eventListener); + } + + @Override + public void run() { + try { + if (log.isInfoEnabled()) { + log.info("Application status event message delegator started"); + } + + while (!terminated) { + try { + TextMessage message = messageQueue.take(); + + // Retrieve the header + String type = message.getStringProperty(Constants.EVENT_CLASS_NAME); + + // Retrieve the actual message + String json = message.getText(); + + if (log.isDebugEnabled()) { + log.debug(String.format("Application status event message received from queue: %s", type)); + } + + // Delegate message to message processor chain + if (log.isDebugEnabled()) { + log.debug(String.format("Delegating application status event message: %s", type)); + } + processorChain.process(type, json, null); + } catch (Exception e) { + log.error("Failed to retrieve application status event message", e); + } + } + } catch (Exception e) { + if (log.isErrorEnabled()) { 
+ log.error("Application status event message delegator failed", e); + } + } + } + + /** + * Terminate topology event message delegator thread. + */ + public void terminate() { + terminated = true; + } + + + private EventMessage jsonToEventMessage(String json) { + + EventMessage event = new EventMessage(); + String message; + + //split the message to 3 parts using ':' first is class name, second contains the text 'message' and the third contains + //message + String[] MessageParts = json.split(":", 3); + + String eventType = MessageParts[0].trim(); + eventType = eventType.substring(eventType.indexOf("\"") + 1, eventType.lastIndexOf("\"")); + if (log.isDebugEnabled()) { + log.debug(String.format("Extracted [event type] %s", eventType)); + } + + event.setEventName(eventType); + String messageTag = MessageParts[1]; + messageTag = messageTag.substring(messageTag.indexOf("\"") + 1, messageTag.lastIndexOf("\"")); + + if ("message".equals(messageTag)) { + message = MessageParts[2].trim(); + //Remove trailing bracket twice to get the message + message = message.substring(0, message.lastIndexOf("}")).trim(); + message = message.substring(0, message.lastIndexOf("}")).trim(); + if (message.indexOf('{') == 0 && message.indexOf('}') == message.length() - 1) { + if (log.isDebugEnabled()) { + log.debug(String.format("[Extracted message] %s ", message)); + } + event.setMessage(message); + return event; + } + } + return null; + } + + private class EventMessage { + private String eventName; + private String message; + + private String getEventName() { + return eventName; + } + + private void setEventName(String eventName) { + this.eventName = eventName; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + } +} 
http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageListener.java new file mode 100644 index 0000000..936c174 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageListener.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.stratos.messaging.message.receiver.applications; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.TextMessage; + +public class ApplicationsEventMessageListener implements MessageListener { + private static final Log log = LogFactory.getLog(ApplicationsEventMessageListener.class); + + private ApplicationsEventMessageQueue messageQueue; + + public ApplicationsEventMessageListener(ApplicationsEventMessageQueue messageQueue) { + this.messageQueue = messageQueue; + } + + @Override + public void onMessage(Message message) { + if (message instanceof TextMessage) { + TextMessage receivedMessage = (TextMessage) message; + try { + if (log.isDebugEnabled()) { + log.debug(String.format("Tenant message received: %s", ((TextMessage) message).getText())); + } + // Add received message to the queue + messageQueue.add(receivedMessage); + + } catch (JMSException e) { + log.error(e.getMessage(), e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageQueue.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageQueue.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageQueue.java new file mode 100644 index 0000000..604513e --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventMessageQueue.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.receiver.applications; + + +import javax.jms.TextMessage; +import java.util.concurrent.LinkedBlockingQueue; + +public class ApplicationsEventMessageQueue extends LinkedBlockingQueue<TextMessage> { +} http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventReceiver.java new file mode 100644 index 0000000..b7577bd --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/applications/ApplicationsEventReceiver.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.receiver.applications; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber; +import org.apache.stratos.messaging.listener.EventListener; +import org.apache.stratos.messaging.util.Constants; + +public class ApplicationsEventReceiver implements Runnable { + private static final Log log = LogFactory.getLog(ApplicationsEventReceiver.class); + + private ApplicationsEventMessageDelegator messageDelegator; + private ApplicationsEventMessageListener messageListener; + private TopicSubscriber topicSubscriber; + private boolean terminated; + + public ApplicationsEventReceiver() { + ApplicationsEventMessageQueue messageQueue = new ApplicationsEventMessageQueue(); + this.messageDelegator = new ApplicationsEventMessageDelegator(messageQueue); + this.messageListener = new ApplicationsEventMessageListener(messageQueue); + } + + public void addEventListener(EventListener eventListener) { + messageDelegator.addEventListener(eventListener); + } + + @Override + public void run() { + try { + // Start topic subscriber thread + topicSubscriber = new TopicSubscriber(Constants.APPLICATIONS_TOPIC); + 
topicSubscriber.setMessageListener(messageListener); + Thread subscriberThread = new Thread(topicSubscriber); + subscriberThread.start(); + if (log.isDebugEnabled()) { + log.debug("Application status event message receiver thread started"); + } + + // Start Application status event message delegator thread + Thread receiverThread = new Thread(messageDelegator); + receiverThread.start(); + if (log.isDebugEnabled()) { + log.debug("Application status event message delegator thread started"); + } + + // Keep the thread live until terminated + while (!terminated) { + try { + Thread.sleep(1000); + } catch (InterruptedException ignore) { + } + } + } catch (Exception e) { + if (log.isErrorEnabled()) { + log.error("Application status failed", e); + } + } + } + + public void terminate() { + topicSubscriber.terminate(); + messageDelegator.terminate(); + terminated = true; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java new file mode 100644 index 0000000..a2fed87 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.receiver.cluster.status; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.listener.EventListener; +import org.apache.stratos.messaging.message.processor.MessageProcessorChain; +import org.apache.stratos.messaging.message.processor.cluster.status.ClusterStatusMessageProcessorChain; +import org.apache.stratos.messaging.util.Constants; + +import javax.jms.TextMessage; + +public class ClusterStatusEventMessageDelegator implements Runnable { + private static final Log log = LogFactory.getLog(ClusterStatusEventMessageDelegator.class); + + private ClusterStatusEventMessageQueue messageQueue; + private MessageProcessorChain processorChain; + private boolean terminated; + + public ClusterStatusEventMessageDelegator(ClusterStatusEventMessageQueue messageQueue) { + this.messageQueue = messageQueue; + this.processorChain = new ClusterStatusMessageProcessorChain(); + } + + public void addEventListener(EventListener eventListener) { + processorChain.addEventListener(eventListener); + } + + @Override + public void run() { + try { + if (log.isInfoEnabled()) { + log.info("Application status event message delegator started"); + } + + while (!terminated) { + try { + TextMessage message = messageQueue.take(); + + // Retrieve the header + String type = 
message.getStringProperty(Constants.EVENT_CLASS_NAME); + + // Retrieve the actual message + String json = message.getText(); + + if (log.isDebugEnabled()) { + log.debug(String.format("Application status event message received from queue: %s", type)); + } + + // Delegate message to message processor chain + if (log.isDebugEnabled()) { + log.debug(String.format("Delegating application status event message: %s", type)); + } + processorChain.process(type, json, null); + } catch (Exception e) { + log.error("Failed to retrieve application status event message", e); + } + } + } catch (Exception e) { + if (log.isErrorEnabled()) { + log.error("Application status event message delegator failed", e); + } + } + } + + /** + * Terminate topology event message delegator thread. + */ + public void terminate() { + terminated = true; + } + + + private EventMessage jsonToEventMessage(String json) { + + EventMessage event = new EventMessage(); + String message; + + //split the message to 3 parts using ':' first is class name, second contains the text 'message' and the third contains + //message + String[] MessageParts = json.split(":", 3); + + String eventType = MessageParts[0].trim(); + eventType = eventType.substring(eventType.indexOf("\"") + 1, eventType.lastIndexOf("\"")); + if (log.isDebugEnabled()) { + log.debug(String.format("Extracted [event type] %s", eventType)); + } + + event.setEventName(eventType); + String messageTag = MessageParts[1]; + messageTag = messageTag.substring(messageTag.indexOf("\"") + 1, messageTag.lastIndexOf("\"")); + + if ("message".equals(messageTag)) { + message = MessageParts[2].trim(); + //Remove trailing bracket twice to get the message + message = message.substring(0, message.lastIndexOf("}")).trim(); + message = message.substring(0, message.lastIndexOf("}")).trim(); + if (message.indexOf('{') == 0 && message.indexOf('}') == message.length() - 1) { + if (log.isDebugEnabled()) { + log.debug(String.format("[Extracted message] %s ", message)); + } + 
event.setMessage(message); + return event; + } + } + return null; + } + + private class EventMessage { + private String eventName; + private String message; + + private String getEventName() { + return eventName; + } + + private void setEventName(String eventName) { + this.eventName = eventName; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageListener.java new file mode 100644 index 0000000..12c7800 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageListener.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.receiver.cluster.status; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.TextMessage; + +public class ClusterStatusEventMessageListener implements MessageListener { + private static final Log log = LogFactory.getLog(ClusterStatusEventMessageListener.class); + + private ClusterStatusEventMessageQueue messageQueue; + + public ClusterStatusEventMessageListener(ClusterStatusEventMessageQueue messageQueue) { + this.messageQueue = messageQueue; + } + + @Override + public void onMessage(Message message) { + if (message instanceof TextMessage) { + TextMessage receivedMessage = (TextMessage) message; + try { + if (log.isDebugEnabled()) { + log.debug(String.format("Tenant message received: %s", ((TextMessage) message).getText())); + } + // Add received message to the queue + messageQueue.add(receivedMessage); + + } catch (JMSException e) { + log.error(e.getMessage(), e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageQueue.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageQueue.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageQueue.java new file mode 100644 index 0000000..9656800 --- /dev/null +++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageQueue.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.receiver.cluster.status; + + +import javax.jms.TextMessage; +import java.util.concurrent.LinkedBlockingQueue; + +public class ClusterStatusEventMessageQueue extends LinkedBlockingQueue<TextMessage> { +} http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java new file mode 100644 index 0000000..72ccaed --- /dev/null +++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.receiver.cluster.status; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber; +import org.apache.stratos.messaging.listener.EventListener; +import org.apache.stratos.messaging.util.Constants; + +public class ClusterStatusEventReceiver implements Runnable { + private static final Log log = LogFactory.getLog(ClusterStatusEventReceiver.class); + + private ClusterStatusEventMessageDelegator messageDelegator; + private ClusterStatusEventMessageListener messageListener; + private TopicSubscriber topicSubscriber; + private boolean terminated; + + public ClusterStatusEventReceiver() { + ClusterStatusEventMessageQueue messageQueue = new ClusterStatusEventMessageQueue(); + this.messageDelegator = new ClusterStatusEventMessageDelegator(messageQueue); + this.messageListener = new ClusterStatusEventMessageListener(messageQueue); + } + + public void 
addEventListener(EventListener eventListener) { + messageDelegator.addEventListener(eventListener); + } + + @Override + public void run() { + try { + // Start topic subscriber thread + topicSubscriber = new TopicSubscriber(Constants.CLUSTER_STATUS_TOPIC); + topicSubscriber.setMessageListener(messageListener); + Thread subscriberThread = new Thread(topicSubscriber); + subscriberThread.start(); + if (log.isDebugEnabled()) { + log.debug("Application status event message receiver thread started"); + } + + // Start Application status event message delegator thread + Thread receiverThread = new Thread(messageDelegator); + receiverThread.start(); + if (log.isDebugEnabled()) { + log.debug("Application status event message delegator thread started"); + } + + // Keep the thread live until terminated + while (!terminated) { + try { + Thread.sleep(1000); + } catch (InterruptedException ignore) { + } + } + } catch (Exception e) { + if (log.isErrorEnabled()) { + log.error("Application status failed", e); + } + } + } + + public void terminate() { + topicSubscriber.terminate(); + messageDelegator.terminate(); + terminated = true; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/275ba2dd/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java index 33f2f22..2d2d532 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java @@ -24,7 +24,9 @@ public class Constants { public static final String HEALTH_STAT_TOPIC = "summarized-health-stats"; public static final String 
INSTANCE_STATUS_TOPIC = "instance-status"; public static final String INSTANCE_NOTIFIER_TOPIC = "instance-notifier"; - public static final String APPLICATION_STATUS_TOPIC = "application-status"; + public static final String APPLICATIONS_TOPIC = "applications"; + /* NOTE(review): CLUSTER_STATUS_TOPIC is given the same value as APPLICATIONS_TOPIC, so subscribers to either constant end up on one topic - confirm this is intended and not a copy-paste slip. */ + public static final String CLUSTER_STATUS_TOPIC = "applications"; + public static final String PING_TOPIC = "ping"; public static final String TENANT_TOPIC = "tenant"; public static final String TENANT_RANGE_ALL = "*";
