Repository: incubator-stratos Updated Branches: refs/heads/master a04e657e1 -> bccad5be0
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageListener.java index 59e9ad1..cafdf74 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageListener.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageListener.java @@ -31,10 +31,16 @@ import javax.jms.TextMessage; * Implements functionality for receiving text based event messages from the tenant * message broker topic and add them to the event queue. */ -public class TenantEventMessageListener implements MessageListener { +class TenantEventMessageListener implements MessageListener { private static final Log log = LogFactory.getLog(TenantEventMessageListener.class); + private TenantEventMessageQueue messageQueue; + + public TenantEventMessageListener(TenantEventMessageQueue messageQueue) { + this.messageQueue = messageQueue; + } + @Override public void onMessage(Message message) { if (message instanceof TextMessage) { @@ -44,7 +50,7 @@ public class TenantEventMessageListener implements MessageListener { log.debug(String.format("Tenant message received: %s", ((TextMessage) message).getText())); } // Add received message to the queue - TenantEventMessageQueue.getInstance().add(receivedMessage); + messageQueue.add(receivedMessage); } catch (JMSException e) { log.error(e.getMessage(), e); http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageQueue.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageQueue.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageQueue.java index 423f169..d6f0217 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageQueue.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageQueue.java @@ -25,20 +25,5 @@ import java.util.concurrent.LinkedBlockingQueue; /** * Implements a blocking queue for managing tenant event messages. */ -public class TenantEventMessageQueue extends LinkedBlockingQueue<TextMessage>{ - private static volatile TenantEventMessageQueue instance; - - private TenantEventMessageQueue(){ - } - - public static TenantEventMessageQueue getInstance() { - if (instance == null) { - synchronized (TenantEventMessageQueue.class){ - if (instance == null) { - instance = new TenantEventMessageQueue(); - } - } - } - return instance; - } +class TenantEventMessageQueue extends LinkedBlockingQueue<TextMessage> { } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java new file mode 100644 index 0000000..059f95f --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.messaging.message.receiver.tenant; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber; +import org.apache.stratos.messaging.listener.EventListener; +import org.apache.stratos.messaging.util.Constants; + +/** + * A thread for receiving tenant information from message broker and + * build tenant information in tenant manager. + */ +public class TenantEventReceiver implements Runnable { + private static final Log log = LogFactory.getLog(TenantEventReceiver.class); + private TenantEventMessageDelegator messageDelegator; + private TenantEventMessageListener messageListener; + private TopicSubscriber topicSubscriber; + private boolean terminated; + + public TenantEventReceiver() { + TenantEventMessageQueue messageQueue = new TenantEventMessageQueue(); + this.messageDelegator = new TenantEventMessageDelegator(messageQueue); + this.messageListener = new TenantEventMessageListener(messageQueue); + } + + public void addEventListener(EventListener eventListener) { + messageDelegator.addEventListener(eventListener); + } + + @Override + public void run() { + try { + // Start topic subscriber thread + topicSubscriber = new TopicSubscriber(Constants.TENANT_TOPIC); + topicSubscriber.setMessageListener(messageListener); + Thread subscriberThread = new Thread(topicSubscriber); + subscriberThread.start(); + if (log.isDebugEnabled()) { + log.debug("Tenant event message receiver thread started"); + } + + // Start tenant event message delegator thread + Thread receiverThread = new Thread(messageDelegator); + receiverThread.start(); + if (log.isDebugEnabled()) { + log.debug("Tenant event message delegator thread started"); + } + + // Keep the thread live until terminated + while (!terminated) { + try { + Thread.sleep(1000); + } catch (InterruptedException ignore) { + } + } + } catch (Exception e) { + if (log.isErrorEnabled()) { + log.error("Tenant receiver failed", e); + } + } + } + + public void terminate() { + topicSubscriber.terminate(); + messageDelegator.terminate(); + terminated = true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantReceiver.java deleted file mode 100644 index a6f2141..0000000 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantReceiver.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.stratos.messaging.message.receiver.tenant; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber; -import org.apache.stratos.messaging.util.Constants; - -/** - * A thread for receiving tenant information from message broker and - * build tenant information in tenant manager. - */ -public class TenantReceiver implements Runnable { - private static final Log log = LogFactory.getLog(TenantReceiver.class); - private TenantEventMessageDelegator messageDelegator; - private TopicSubscriber topicSubscriber; - private boolean terminated; - - public TenantReceiver() { - this.messageDelegator = new TenantEventMessageDelegator(); - } - - public TenantReceiver(TenantEventMessageDelegator messageDelegator) { - this.messageDelegator = messageDelegator; - } - - @Override - public void run() { - try { - // Start topic subscriber thread - topicSubscriber = new TopicSubscriber(Constants.TENANT_TOPIC); - topicSubscriber.setMessageListener(new TenantEventMessageListener()); - Thread subscriberThread = new Thread(topicSubscriber); - subscriberThread.start(); - if (log.isDebugEnabled()) { - log.debug("Tenant event message receiver thread started"); - } - - // Start tenant event message delegator thread - Thread receiverThread = new Thread(messageDelegator); - receiverThread.start(); - if (log.isDebugEnabled()) { - log.debug("Tenant event message delegator thread started"); - } - - // Keep the thread live until terminated - while (!terminated) { - try { - Thread.sleep(1000); - } catch (InterruptedException ignore) { - } - } - } catch (Exception e) { - if (log.isErrorEnabled()) { - log.error("Tenant receiver failed", e); - } - } - } - - public void terminate() { - topicSubscriber.terminate(); - messageDelegator.terminate(); - terminated = true; - } -} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java index a06bbe3..9cc8f78 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java @@ -18,41 +18,34 @@ */ package org.apache.stratos.messaging.message.receiver.topology; -import java.util.concurrent.LinkedBlockingQueue; - -import javax.jms.TextMessage; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.listener.EventListener; import org.apache.stratos.messaging.message.processor.MessageProcessorChain; -import org.apache.stratos.messaging.message.processor.topology.*; +import org.apache.stratos.messaging.message.processor.topology.TopologyMessageProcessorChain; import org.apache.stratos.messaging.util.Constants; +import javax.jms.TextMessage; + /** * Implements logic for processing topology event messages based on a given * topology process chain. */ -public class TopologyEventMessageDelegator implements Runnable { +class TopologyEventMessageDelegator implements Runnable { private static final Log log = LogFactory.getLog(TopologyEventMessageDelegator.class); private MessageProcessorChain processorChain; - private LinkedBlockingQueue<TextMessage> messageQueue; + private TopologyEventMessageQueue messageQueue; private boolean terminated; - public TopologyEventMessageDelegator() { + public TopologyEventMessageDelegator(TopologyEventMessageQueue messageQueue) { + this.messageQueue = messageQueue; this.processorChain = new TopologyMessageProcessorChain(); - this.messageQueue = TopologyEventMessageQueue.getInstance(); } - public TopologyEventMessageDelegator(MessageProcessorChain processorChain) { - this.processorChain = processorChain; - this.messageQueue = TopologyEventMessageQueue.getInstance(); - } - - public TopologyEventMessageDelegator(MessageProcessorChain processorChain, LinkedBlockingQueue<TextMessage> queue) { - this.processorChain = processorChain; - this.messageQueue = queue; + public void addEventListener(EventListener eventListener) { + processorChain.addEventListener(eventListener); } @Override http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageListener.java index 03afe13..799b1b1 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageListener.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageListener.java @@ -18,22 +18,27 @@ */ package org.apache.stratos.messaging.message.receiver.topology; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - /** * Implements functionality for receiving text based event messages from the topology * message broker topic and add them to the event queue. */ -public class TopologyEventMessageListener implements MessageListener { - +class TopologyEventMessageListener implements MessageListener { private static final Log log = LogFactory.getLog(TopologyEventMessageListener.class); + private TopologyEventMessageQueue messageQueue; + + public TopologyEventMessageListener(TopologyEventMessageQueue messageQueue) { + this.messageQueue = messageQueue; + } + @Override public void onMessage(Message message) { if (message instanceof TextMessage) { @@ -43,7 +48,7 @@ public class TopologyEventMessageListener implements MessageListener { log.debug(String.format("Topology message received: %s", ((TextMessage) message).getText())); } // Add received message to the queue - TopologyEventMessageQueue.getInstance().add(receivedMessage); + messageQueue.add(receivedMessage); } catch (JMSException e) { log.error(e.getMessage(), e); http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageQueue.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageQueue.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageQueue.java index db33289..8ebbc98 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageQueue.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageQueue.java @@ -19,27 +19,11 @@ package org.apache.stratos.messaging.message.receiver.topology; -import java.util.concurrent.LinkedBlockingQueue; - import javax.jms.TextMessage; +import java.util.concurrent.LinkedBlockingQueue; /** * Implements a blocking queue for managing topology event messages. */ -public class TopologyEventMessageQueue extends LinkedBlockingQueue<TextMessage>{ - private static volatile TopologyEventMessageQueue instance; - - private TopologyEventMessageQueue(){ - } - - public static TopologyEventMessageQueue getInstance() { - if (instance == null) { - synchronized (TopologyEventMessageQueue.class){ - if (instance == null) { - instance = new TopologyEventMessageQueue(); - } - } - } - return instance; - } +class TopologyEventMessageQueue extends LinkedBlockingQueue<TextMessage> { } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java new file mode 100644 index 0000000..b271beb --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.messaging.message.receiver.topology; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber; +import org.apache.stratos.messaging.listener.EventListener; +import org.apache.stratos.messaging.util.Constants; + +/** + * A thread for receiving topology information from message broker and + * build topology in topology manager. + */ +public class TopologyEventReceiver implements Runnable { + private static final Log log = LogFactory.getLog(TopologyEventReceiver.class); + private TopologyEventMessageDelegator messageDelegator; + private TopologyEventMessageListener messageListener; + private TopicSubscriber topicSubscriber; + private boolean terminated; + + public TopologyEventReceiver() { + TopologyEventMessageQueue messageQueue = new TopologyEventMessageQueue(); + this.messageDelegator = new TopologyEventMessageDelegator(messageQueue); + this.messageListener = new TopologyEventMessageListener(messageQueue); + } + + public void addEventListener(EventListener eventListener) { + messageDelegator.addEventListener(eventListener); + } + + @Override + public void run() { + try { + // Start topic subscriber thread + topicSubscriber = new TopicSubscriber(Constants.TOPOLOGY_TOPIC); + topicSubscriber.setMessageListener(messageListener); + Thread subscriberThread = new Thread(topicSubscriber); + subscriberThread.start(); + if (log.isDebugEnabled()) { + log.debug("Topology event message receiver thread started"); + } + + // Start topology event message delegator thread + Thread receiverThread = new Thread(messageDelegator); + receiverThread.start(); + if (log.isDebugEnabled()) { + log.debug("Topology event message delegator thread started"); + } + + // Keep the thread live until terminated + while (!terminated) { + try { + Thread.sleep(1000); + } catch (InterruptedException ignore) { + } + } + } catch (Exception e) { + if (log.isErrorEnabled()) { + log.error("Topology receiver failed", e); + } + } + } + + public void terminate() { + topicSubscriber.terminate(); + messageDelegator.terminate(); + terminated = true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bccad5be/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyReceiver.java deleted file mode 100644 index ab02956..0000000 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyReceiver.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.stratos.messaging.message.receiver.topology; - -import javax.jms.MessageListener; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber; -import org.apache.stratos.messaging.util.Constants; - -/** - * A thread for receiving topology information from message broker and - * build topology in topology manager. - */ -public class TopologyReceiver implements Runnable { - private static final Log log = LogFactory.getLog(TopologyReceiver.class); - private TopologyEventMessageDelegator messageDelegator; - private MessageListener messageListener; - private TopicSubscriber topicSubscriber; - private boolean terminated; - - public TopologyReceiver() { - this.messageDelegator = new TopologyEventMessageDelegator(); - this.messageListener = new TopologyEventMessageListener(); - } - - public TopologyReceiver(TopologyEventMessageDelegator messageDelegator) { - this.messageDelegator = messageDelegator; - this.messageListener = new TopologyEventMessageListener(); - } - - public TopologyReceiver(MessageListener listener) { - this.messageDelegator = new TopologyEventMessageDelegator(); - this.messageListener = listener; - } - - public TopologyReceiver(TopologyEventMessageDelegator messageDelegator, MessageListener listener) { - this.messageDelegator = messageDelegator; - this.messageListener = listener; - } - - - @Override - public void run() { - try { - // Start topic subscriber thread - topicSubscriber = new TopicSubscriber(Constants.TOPOLOGY_TOPIC); - topicSubscriber.setMessageListener(messageListener); - Thread subscriberThread = new Thread(topicSubscriber); - subscriberThread.start(); - if (log.isDebugEnabled()) { - log.debug("Topology event message receiver thread started"); - } - - // Start topology event message delegator thread - Thread receiverThread = new Thread(messageDelegator); - receiverThread.start(); - if (log.isDebugEnabled()) { - log.debug("Topology event message delegator thread started"); - } - - // Keep the thread live until terminated - while (!terminated) { - try { - Thread.sleep(1000); - } catch (InterruptedException ignore) { - } - } - } catch (Exception e) { - if (log.isErrorEnabled()) { - log.error("Topology receiver failed", e); - } - } - } - - public void terminate() { - topicSubscriber.terminate(); - messageDelegator.terminate(); - terminated = true; - } -}
