Updated Branches: refs/heads/master 52d9282ea -> b6dbaf655
Implemented instance notifier event listeners, processors and event message receiver Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/3fd19d0c Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/3fd19d0c Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/3fd19d0c Branch: refs/heads/master Commit: 3fd19d0c421168246884e9272e3d649435b9b8c1 Parents: 2aae4e8 Author: Imesh Gunaratne <[email protected]> Authored: Thu Dec 19 15:29:30 2013 +0530 Committer: Imesh Gunaratne <[email protected]> Committed: Thu Dec 19 15:29:30 2013 +0530 ---------------------------------------------------------------------- .../broker/subscribe/TopicSubscriber.java | 11 ++- .../synchronization/ArtifactUpdatedEvent.java | 82 ------------------ .../instance/notifier/ArtifactUpdatedEvent.java | 84 ++++++++++++++++++ .../notifier/InstanceNotifierEvent.java | 31 +++++++ .../notifier/ArtifactUpdateEventListener.java | 28 ++++++ .../ArtifactUpdateMessageProcessor.java | 61 +++++++++++++ .../InstanceNotifierMessageProcessorChain.java | 53 ++++++++++++ .../InstanceNotifierEventMessageDelegator.java | 91 ++++++++++++++++++++ .../InstanceNotifierEventMessageListener.java | 54 ++++++++++++ .../InstanceNotifierEventMessageQueue.java | 44 ++++++++++ .../InstanceNotifierEventMessageReceiver.java | 77 +++++++++++++++++ .../tenant/TenantEventMessageListener.java | 1 - .../stratos/messaging/util/Constants.java | 2 +- .../messaging/test/MessageFilterTest.java | 81 +++++++++++++++++ 14 files changed, 613 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/3fd19d0c/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java index d8ec008..f5ba8e9 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java @@ -44,8 +44,9 @@ public class TopicSubscriber implements Runnable { private TopicConnector connector; private TopicHealthChecker healthChecker; private javax.jms.TopicSubscriber topicSubscriber = null; + private boolean subscribed; - /** + /** * @param aTopicName * topic name of this subscriber instance. */ @@ -65,8 +66,8 @@ public class TopicSubscriber implements Runnable { topic = topicSession.createTopic(topicName); } topicSubscriber = topicSession.createSubscriber(topic); - topicSubscriber.setMessageListener(messageListener); + subscribed = true; } /** @@ -91,7 +92,6 @@ public class TopicSubscriber implements Runnable { while (!terminated) { try { doSubscribe(); - } catch (Exception e) { log.error("Error while subscribing to the topic: " + topicName, e); } finally { @@ -107,6 +107,7 @@ public class TopicSubscriber implements Runnable { // health checker failed // closes all sessions/connections try { + subscribed = false; if (topicSubscriber != null) { topicSubscriber.close(); } @@ -129,4 +130,8 @@ public class TopicSubscriber implements Runnable { healthChecker.terminate(); terminated = true; } + + public boolean isSubscribed() { + return subscribed; + } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/3fd19d0c/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/artifact/synchronization/ArtifactUpdatedEvent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/artifact/synchronization/ArtifactUpdatedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/artifact/synchronization/ArtifactUpdatedEvent.java deleted file mode 100644 index e555dbc..0000000 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/artifact/synchronization/ArtifactUpdatedEvent.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.messaging.event.artifact.synchronization; - -/** - * This event is fired to a cluster when an artifact notification received from the git repository. - */ - -public class ArtifactUpdatedEvent { - private String clusterId; - private String status; - private String repoUserName; - private String repoPassword; - private String repoURL; - private String tenantId; - - public String getClusterId() { - return clusterId; - } - - public void setClusterId(String clusterId) { - this.clusterId = clusterId; - } - - public String getStatus() { - return status; - } - - public void setStatus(String status) { - this.status = status; - } - - public String getRepoUserName() { - return repoUserName; - } - - public void setRepoUserName(String repoUserName) { - this.repoUserName = repoUserName; - } - - public String getRepoPassword() { - return repoPassword; - } - - public void setRepoPassword(String repoPassword) { - this.repoPassword = repoPassword; - } - - public String getRepoURL() { - return repoURL; - } - - public void setRepoURL(String repoURL) { - this.repoURL = repoURL; - } - - public String getTenantId() { - return tenantId; - } - - public void setTenantId(String tenantId) { - this.tenantId = tenantId; - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/3fd19d0c/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/instance/notifier/ArtifactUpdatedEvent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/instance/notifier/ArtifactUpdatedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/instance/notifier/ArtifactUpdatedEvent.java new file mode 100644 index 0000000..80f6817 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/instance/notifier/ArtifactUpdatedEvent.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.event.instance.notifier; + +import org.apache.stratos.messaging.event.tenant.TenantEvent; + +import java.io.Serializable; + +/** + * This event is fired to a cluster when an artifact notification received from the git repository. + */ + +public class ArtifactUpdatedEvent extends InstanceNotifierEvent implements Serializable { + private String clusterId; + private String status; + private String repoUserName; + private String repoPassword; + private String repoURL; + private String tenantId; + + public String getClusterId() { + return clusterId; + } + + public void setClusterId(String clusterId) { + this.clusterId = clusterId; + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + + public String getRepoUserName() { + return repoUserName; + } + + public void setRepoUserName(String repoUserName) { + this.repoUserName = repoUserName; + } + + public String getRepoPassword() { + return repoPassword; + } + + public void setRepoPassword(String repoPassword) { + this.repoPassword = repoPassword; + } + + public String getRepoURL() { + return repoURL; + } + + public void setRepoURL(String repoURL) { + this.repoURL = repoURL; + } + + public String getTenantId() { + return tenantId; + } + + public void setTenantId(String tenantId) { + this.tenantId = tenantId; + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/3fd19d0c/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/instance/notifier/InstanceNotifierEvent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/instance/notifier/InstanceNotifierEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/instance/notifier/InstanceNotifierEvent.java new file mode 100644 index 0000000..4939034 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/instance/notifier/InstanceNotifierEvent.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.messaging.event.instance.notifier; + +import org.apache.stratos.messaging.event.Event; + +import java.io.Serializable; + +/** + * Instance notifier event + */ +public abstract class InstanceNotifierEvent extends Event implements Serializable { + private static final long serialVersionUID = -5113750577049033578L; +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/3fd19d0c/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/instance/notifier/ArtifactUpdateEventListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/instance/notifier/ArtifactUpdateEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/instance/notifier/ArtifactUpdateEventListener.java new file mode 100644 index 0000000..92dd2af --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/instance/notifier/ArtifactUpdateEventListener.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.messaging.listener.instance.notifier; + +import org.apache.stratos.messaging.listener.EventListener; + +/** + * Artifact update event listener. + */ +public abstract class ArtifactUpdateEventListener extends EventListener { +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/3fd19d0c/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/instance/notifier/ArtifactUpdateMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/instance/notifier/ArtifactUpdateMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/instance/notifier/ArtifactUpdateMessageProcessor.java new file mode 100644 index 0000000..e6ee152 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/instance/notifier/ArtifactUpdateMessageProcessor.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.messaging.message.processor.instance.notifier; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.event.instance.notifier.ArtifactUpdatedEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.util.Util; + +/** + * Artifact update message processor. + */ +public class ArtifactUpdateMessageProcessor extends MessageProcessor { + + private static final Log log = LogFactory.getLog(ArtifactUpdateMessageProcessor.class); + + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + if (ArtifactUpdatedEvent.class.getName().equals(type)) { + // Parse complete message and build event + ArtifactUpdatedEvent event = (ArtifactUpdatedEvent) Util.jsonToObject(message, ArtifactUpdatedEvent.class); + + // Notify event listeners + notifyEventListeners(event); + return true; + } + else { + if(nextProcessor != null) { + return nextProcessor.process(type, message, object); + } + else { + throw new RuntimeException(String.format("Failed to process artifact update message using available message processors: [type] %s [body] %s", type, message)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/3fd19d0c/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/instance/notifier/InstanceNotifierMessageProcessorChain.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/instance/notifier/InstanceNotifierMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/instance/notifier/InstanceNotifierMessageProcessorChain.java new file mode 100644 index 0000000..aae3b35 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/instance/notifier/InstanceNotifierMessageProcessorChain.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.messaging.message.processor.instance.notifier; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.listener.EventListener; +import org.apache.stratos.messaging.listener.instance.notifier.ArtifactUpdateEventListener; +import org.apache.stratos.messaging.message.processor.MessageProcessorChain; + +/** + * Defines default instance notifier message processor chain. + */ +public class InstanceNotifierMessageProcessorChain extends MessageProcessorChain { + private static final Log log = LogFactory.getLog(InstanceNotifierMessageProcessorChain.class); + + private ArtifactUpdateMessageProcessor artifactUpdateMessageProcessor; + + public void initialize() { + // Add instance notifier event processors + artifactUpdateMessageProcessor = new ArtifactUpdateMessageProcessor(); + add(artifactUpdateMessageProcessor); + + if (log.isDebugEnabled()) { + log.debug("Instance notifier message processor chain initialized"); + } + } + + public void addEventListener(EventListener eventListener) { + if (eventListener instanceof ArtifactUpdateEventListener) { + artifactUpdateMessageProcessor.addEventListener(eventListener); + } else { + throw new RuntimeException("Unknown event listener"); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/3fd19d0c/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java new file mode 100644 index 0000000..3ad3015 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.messaging.message.receiver.instance.notifier; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.message.processor.MessageProcessorChain; +import org.apache.stratos.messaging.message.processor.instance.notifier.InstanceNotifierMessageProcessorChain; +import org.apache.stratos.messaging.util.Constants; + +import javax.jms.TextMessage; + + +/** + * Implements logic for processing instance notifier event messages based on a given + * topology process chain. + */ +public class InstanceNotifierEventMessageDelegator implements Runnable { + + private static final Log log = LogFactory.getLog(InstanceNotifierEventMessageDelegator.class); + private MessageProcessorChain processorChain; + private boolean terminated; + + public InstanceNotifierEventMessageDelegator() { + this.processorChain = new InstanceNotifierMessageProcessorChain(); + } + + public InstanceNotifierEventMessageDelegator(MessageProcessorChain processorChain) { + this.processorChain = processorChain; + } + + @Override + public void run() { + try { + if (log.isInfoEnabled()) { + log.info("Instance notifier event message delegator started"); + } + + while (!terminated) { + try { + TextMessage message = InstanceNotifierEventMessageQueue.getInstance().take(); + + // Retrieve the header + String type = message.getStringProperty(Constants.EVENT_CLASS_NAME); + + // Retrieve the actual message + String json = message.getText(); + if (log.isDebugEnabled()) { + log.debug(String.format("Instance notifier event message received from queue: %s", type)); + } + + // Delegate message to message processor chain + if (log.isDebugEnabled()) { + log.debug(String.format("Delegating instance notifier event message: %s", type)); + } + processorChain.process(type, json, null); + } catch (Exception e) { + log.error("Failed to retrieve instance notifier event message", e); + } + } + } catch (Exception e) { + if (log.isErrorEnabled()) { + log.error("Instance notifier event message delegator failed", e); + } + } + } + + /** + * Terminate topology event message delegator thread. + */ + public void terminate() { + terminated = true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/3fd19d0c/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java new file mode 100644 index 0000000..4e134ac --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.messaging.message.receiver.instance.notifier; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.TextMessage; + +/** + * Implements functionality for receiving text based event messages from the instance notifier + * message broker topic and add them to the event queue. + */ +public class InstanceNotifierEventMessageListener implements MessageListener { + + private static final Log log = LogFactory.getLog(InstanceNotifierEventMessageListener.class); + + @Override + public void onMessage(Message message) { + if (message instanceof TextMessage) { + TextMessage receivedMessage = (TextMessage) message; + try { + if (log.isDebugEnabled()) { + log.debug(String.format("Instance notifier message received: %s", ((TextMessage) message).getText())); + } + // Add received message to the queue + InstanceNotifierEventMessageQueue.getInstance().add(receivedMessage); + + } catch (JMSException e) { + log.error(e.getMessage(), e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/3fd19d0c/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageQueue.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageQueue.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageQueue.java new file mode 100644 index 0000000..f345ed2 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageQueue.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.messaging.message.receiver.instance.notifier; + +import javax.jms.TextMessage; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * Implements a blocking queue for managing instance notifier event messages. + */ +public class InstanceNotifierEventMessageQueue extends LinkedBlockingQueue<TextMessage>{ + private static volatile InstanceNotifierEventMessageQueue instance; + + private InstanceNotifierEventMessageQueue(){ + } + + public static synchronized InstanceNotifierEventMessageQueue getInstance() { + if (instance == null) { + synchronized (InstanceNotifierEventMessageQueue.class){ + if (instance == null) { + instance = new InstanceNotifierEventMessageQueue(); + } + } + } + return instance; + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/3fd19d0c/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageReceiver.java new file mode 100644 index 0000000..98040d8 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageReceiver.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.messaging.message.receiver.instance.notifier; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber; +import org.apache.stratos.messaging.util.Constants; + +/** + * A thread for receiving instance notifier information from message broker. + */ +public class InstanceNotifierEventMessageReceiver implements Runnable { + private static final Log log = LogFactory.getLog(InstanceNotifierEventMessageReceiver.class); + private InstanceNotifierEventMessageDelegator messageDelegator; + private TopicSubscriber topicSubscriber; + private boolean terminated; + + public InstanceNotifierEventMessageReceiver() { + this.messageDelegator = new InstanceNotifierEventMessageDelegator(); + } + + public InstanceNotifierEventMessageReceiver(InstanceNotifierEventMessageDelegator messageDelegator) { + this.messageDelegator = messageDelegator; + } + + @Override + public void run() { + try { + // Start topic subscriber thread + topicSubscriber = new TopicSubscriber(Constants.INSTANCE_NOTIFIER_TOPIC); + topicSubscriber.setMessageListener(new InstanceNotifierEventMessageListener()); + Thread subscriberThread = new Thread(topicSubscriber); + subscriberThread.start(); + if (log.isDebugEnabled()) { + log.debug("InstanceNotifier event message receiver thread started"); + } + + // Start instance notifier event message delegator thread + Thread receiverThread = new Thread(messageDelegator); + receiverThread.start(); + if (log.isDebugEnabled()) { + log.debug("InstanceNotifier event message delegator thread started"); + } + + // Keep the thread live until terminated + while (!terminated); + } catch (Exception e) { + if (log.isErrorEnabled()) { + log.error("InstanceNotifier receiver failed", e); + } + } + } + + public void terminate() { + topicSubscriber.terminate(); + messageDelegator.terminate(); + terminated = true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/3fd19d0c/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageListener.java index 79af20e..59e9ad1 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageListener.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageListener.java @@ -21,7 +21,6 @@ package org.apache.stratos.messaging.message.receiver.tenant; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessageQueue; import javax.jms.JMSException; import javax.jms.Message; http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/3fd19d0c/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java index 0fbaabe..346101e 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java @@ -23,7 +23,7 @@ public class Constants { public static final String TOPOLOGY_TOPIC = "topology"; public static final String HEALTH_STAT_TOPIC = "summarized-health-stats"; public static final String INSTANCE_STATUS_TOPIC = "instance-status"; - public static final String ARTIFACT_SYNCHRONIZATION_TOPIC = "artifact-synchronization"; + public static final String INSTANCE_NOTIFIER_TOPIC = "instance-notifier"; public static final String TENANT_TOPIC = "tenant"; public static final String TENANT_RANGE_DELIMITER = "-"; http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/3fd19d0c/components/org.apache.stratos.messaging/src/test/java/org/apache/stratos/messaging/test/MessageFilterTest.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/test/java/org/apache/stratos/messaging/test/MessageFilterTest.java b/components/org.apache.stratos.messaging/src/test/java/org/apache/stratos/messaging/test/MessageFilterTest.java new file mode 100755 index 0000000..1dc3345 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/test/java/org/apache/stratos/messaging/test/MessageFilterTest.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + + * http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.test; + +import org.apache.stratos.messaging.message.filter.MessageFilter; +import org.apache.stratos.messaging.util.Constants; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.lang.RuntimeException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * Message filter tests. + */ +@RunWith(JUnit4.class) +public class MessageFilterTest { + + @Test + public final void testFilterIncluded() { + String filterName = "filter1"; + String validationError = "MessageFilter.included() method failed"; + System.setProperty(filterName, "property1=value1,value2 | property2=value3,value4"); + MessageFilter messageFilter = new MessageFilter(filterName); + Assert.assertTrue(validationError, messageFilter.included("property1", "value1")); + Assert.assertTrue(validationError, messageFilter.included("property1", "value2")); + Assert.assertTrue(validationError, messageFilter.included("property2", "value3")); + Assert.assertTrue(validationError, messageFilter.included("property2", "value4")); + System.setProperty(filterName, ""); + } + + @Test + public final void testFilterExcluded() { + String filterName = "filter2"; + String validationError = "MessageFilter.excluded() method failed"; + System.setProperty(filterName, "property1=value1,value2 | property2=value3,value4"); + MessageFilter messageFilter = new MessageFilter(filterName); + Assert.assertFalse(validationError, messageFilter.excluded("property1", "value1")); + Assert.assertFalse(validationError, messageFilter.excluded("property1", "value2")); + Assert.assertFalse(validationError, messageFilter.excluded("property2", "value3")); + Assert.assertFalse(validationError, messageFilter.excluded("property2", "value4")); + System.setProperty(filterName, ""); + } + + @Test + public final void testFilterGetAllPropertyValues() { + String filterName = "filter2"; + String validationError = "MessageFilter.getIncludedPropertyValues() method failed"; + System.setProperty(filterName, "property1=value1,value2 | property2=value3,value4"); + MessageFilter messageFilter = new MessageFilter(filterName); + + Collection<String> property1Values = messageFilter.getIncludedPropertyValues("property1"); + Assert.assertTrue(validationError, property1Values.contains("value1")); + Assert.assertTrue(validationError, property1Values.contains("value2")); + + Collection<String> property2Values = messageFilter.getIncludedPropertyValues("property2"); + Assert.assertTrue(validationError, property2Values.contains("value3")); + Assert.assertTrue(validationError, property2Values.contains("value4")); + System.setProperty(filterName, ""); + } +}
