Updated Branches:
  refs/heads/master 52d9282ea -> b6dbaf655

Implemented instance notifier event listeners, processors and event message 
receiver


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/3fd19d0c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/3fd19d0c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/3fd19d0c

Branch: refs/heads/master
Commit: 3fd19d0c421168246884e9272e3d649435b9b8c1
Parents: 2aae4e8
Author: Imesh Gunaratne <[email protected]>
Authored: Thu Dec 19 15:29:30 2013 +0530
Committer: Imesh Gunaratne <[email protected]>
Committed: Thu Dec 19 15:29:30 2013 +0530

----------------------------------------------------------------------
 .../broker/subscribe/TopicSubscriber.java       | 11 ++-
 .../synchronization/ArtifactUpdatedEvent.java   | 82 ------------------
 .../instance/notifier/ArtifactUpdatedEvent.java | 84 ++++++++++++++++++
 .../notifier/InstanceNotifierEvent.java         | 31 +++++++
 .../notifier/ArtifactUpdateEventListener.java   | 28 ++++++
 .../ArtifactUpdateMessageProcessor.java         | 61 +++++++++++++
 .../InstanceNotifierMessageProcessorChain.java  | 53 ++++++++++++
 .../InstanceNotifierEventMessageDelegator.java  | 91 ++++++++++++++++++++
 .../InstanceNotifierEventMessageListener.java   | 54 ++++++++++++
 .../InstanceNotifierEventMessageQueue.java      | 44 ++++++++++
 .../InstanceNotifierEventMessageReceiver.java   | 77 +++++++++++++++++
 .../tenant/TenantEventMessageListener.java      |  1 -
 .../stratos/messaging/util/Constants.java       |  2 +-
 .../messaging/test/MessageFilterTest.java       | 81 +++++++++++++++++
 14 files changed, 613 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/3fd19d0c/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
index d8ec008..f5ba8e9 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
@@ -44,8 +44,9 @@ public class TopicSubscriber implements Runnable {
        private TopicConnector connector;
     private TopicHealthChecker healthChecker;
        private javax.jms.TopicSubscriber topicSubscriber = null;
+    private boolean subscribed;
 
-       /**
+    /**
         * @param aTopicName
         *            topic name of this subscriber instance.
         */
@@ -65,8 +66,8 @@ public class TopicSubscriber implements Runnable {
                        topic = topicSession.createTopic(topicName);
                }
                topicSubscriber = topicSession.createSubscriber(topic);
-
                topicSubscriber.setMessageListener(messageListener);
+        subscribed = true;
        }
 
        /**
@@ -91,7 +92,6 @@ public class TopicSubscriber implements Runnable {
                while (!terminated) {
                        try {
                                doSubscribe();
-
                        } catch (Exception e) {
                                log.error("Error while subscribing to the 
topic: " + topicName, e);
                        } finally {
@@ -107,6 +107,7 @@ public class TopicSubscriber implements Runnable {
                                // health checker failed
                                // closes all sessions/connections
                                try {
+                    subscribed = false;
                                        if (topicSubscriber != null) {
                                                topicSubscriber.close();
                                        }
@@ -129,4 +130,8 @@ public class TopicSubscriber implements Runnable {
         healthChecker.terminate();
         terminated = true;
     }
+
+    public boolean isSubscribed() {
+        return subscribed;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/3fd19d0c/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/artifact/synchronization/ArtifactUpdatedEvent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/artifact/synchronization/ArtifactUpdatedEvent.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/artifact/synchronization/ArtifactUpdatedEvent.java
deleted file mode 100644
index e555dbc..0000000
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/artifact/synchronization/ArtifactUpdatedEvent.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.messaging.event.artifact.synchronization;
-
-/**
- * This event is fired to a cluster when an artifact notification received 
from the git repository.
- */
-
-public class ArtifactUpdatedEvent {
-    private String clusterId;
-    private String status;
-    private String repoUserName;
-    private String repoPassword;
-    private String repoURL;
-    private String tenantId;
-
-    public String getClusterId() {
-        return clusterId;
-    }
-
-    public void setClusterId(String clusterId) {
-        this.clusterId = clusterId;
-    }
-
-    public String getStatus() {
-        return status;
-    }
-
-    public void setStatus(String status) {
-        this.status = status;
-    }
-
-       public String getRepoUserName() {
-               return repoUserName;
-       }
-
-       public void setRepoUserName(String repoUserName) {
-               this.repoUserName = repoUserName;
-       }
-
-       public String getRepoPassword() {
-               return repoPassword;
-       }
-
-       public void setRepoPassword(String repoPassword) {
-               this.repoPassword = repoPassword;
-       }
-
-       public String getRepoURL() {
-               return repoURL;
-       }
-
-       public void setRepoURL(String repoURL) {
-               this.repoURL = repoURL;
-       }
-
-       public String getTenantId() {
-               return tenantId;
-       }
-
-       public void setTenantId(String tenantId) {
-               this.tenantId = tenantId;
-       }
-       
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/3fd19d0c/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/instance/notifier/ArtifactUpdatedEvent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/instance/notifier/ArtifactUpdatedEvent.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/instance/notifier/ArtifactUpdatedEvent.java
new file mode 100644
index 0000000..80f6817
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/instance/notifier/ArtifactUpdatedEvent.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.event.instance.notifier;
+
+import org.apache.stratos.messaging.event.tenant.TenantEvent;
+
+import java.io.Serializable;
+
+/**
+ * This event is fired to a cluster when an artifact notification is received 
from the git repository.
+ */
+
+public class ArtifactUpdatedEvent extends InstanceNotifierEvent implements 
Serializable {
+    private String clusterId;
+    private String status;
+    private String repoUserName;
+    private String repoPassword;
+    private String repoURL;
+    private String tenantId;
+
+    public String getClusterId() {
+        return clusterId;
+    }
+
+    public void setClusterId(String clusterId) {
+        this.clusterId = clusterId;
+    }
+
+    public String getStatus() {
+        return status;
+    }
+
+    public void setStatus(String status) {
+        this.status = status;
+    }
+
+       public String getRepoUserName() {
+               return repoUserName;
+       }
+
+       public void setRepoUserName(String repoUserName) {
+               this.repoUserName = repoUserName;
+       }
+
+       public String getRepoPassword() {
+               return repoPassword;
+       }
+
+       public void setRepoPassword(String repoPassword) {
+               this.repoPassword = repoPassword;
+       }
+
+       public String getRepoURL() {
+               return repoURL;
+       }
+
+       public void setRepoURL(String repoURL) {
+               this.repoURL = repoURL;
+       }
+
+       public String getTenantId() {
+               return tenantId;
+       }
+
+       public void setTenantId(String tenantId) {
+               this.tenantId = tenantId;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/3fd19d0c/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/instance/notifier/InstanceNotifierEvent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/instance/notifier/InstanceNotifierEvent.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/instance/notifier/InstanceNotifierEvent.java
new file mode 100644
index 0000000..4939034
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/instance/notifier/InstanceNotifierEvent.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.event.instance.notifier;
+
+import org.apache.stratos.messaging.event.Event;
+
+import java.io.Serializable;
+
+/**
+ * Instance notifier event
+ */
+public abstract class InstanceNotifierEvent extends Event implements 
Serializable {
+    private static final long serialVersionUID = -5113750577049033578L;
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/3fd19d0c/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/instance/notifier/ArtifactUpdateEventListener.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/instance/notifier/ArtifactUpdateEventListener.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/instance/notifier/ArtifactUpdateEventListener.java
new file mode 100644
index 0000000..92dd2af
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/instance/notifier/ArtifactUpdateEventListener.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.listener.instance.notifier;
+
+import org.apache.stratos.messaging.listener.EventListener;
+
+/**
+ * Artifact update event listener.
+ */
+public abstract class ArtifactUpdateEventListener extends EventListener {
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/3fd19d0c/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/instance/notifier/ArtifactUpdateMessageProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/instance/notifier/ArtifactUpdateMessageProcessor.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/instance/notifier/ArtifactUpdateMessageProcessor.java
new file mode 100644
index 0000000..e6ee152
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/instance/notifier/ArtifactUpdateMessageProcessor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.processor.instance.notifier;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import 
org.apache.stratos.messaging.event.instance.notifier.ArtifactUpdatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * Artifact update message processor.
+ */
+public class ArtifactUpdateMessageProcessor extends MessageProcessor {
+
+    private static final Log log = 
LogFactory.getLog(ArtifactUpdateMessageProcessor.class);
+
+    private MessageProcessor nextProcessor;
+
+    @Override
+    public void setNext(MessageProcessor nextProcessor) {
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public boolean process(String type, String message, Object object) {
+        if (ArtifactUpdatedEvent.class.getName().equals(type)) {
+            // Parse complete message and build event
+            ArtifactUpdatedEvent event = (ArtifactUpdatedEvent) 
Util.jsonToObject(message, ArtifactUpdatedEvent.class);
+
+            // Notify event listeners
+            notifyEventListeners(event);
+            return true;
+        }
+        else {
+            if(nextProcessor != null) {
+                return nextProcessor.process(type, message, object);
+            }
+            else {
+                throw new RuntimeException(String.format("Failed to process 
artifact update message using available message processors: [type] %s [body] 
%s", type, message));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/3fd19d0c/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/instance/notifier/InstanceNotifierMessageProcessorChain.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/instance/notifier/InstanceNotifierMessageProcessorChain.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/instance/notifier/InstanceNotifierMessageProcessorChain.java
new file mode 100644
index 0000000..aae3b35
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/instance/notifier/InstanceNotifierMessageProcessorChain.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.processor.instance.notifier;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.listener.EventListener;
+import 
org.apache.stratos.messaging.listener.instance.notifier.ArtifactUpdateEventListener;
+import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
+
+/**
+ * Defines default instance notifier message processor chain.
+ */
+public class InstanceNotifierMessageProcessorChain extends 
MessageProcessorChain {
+    private static final Log log = 
LogFactory.getLog(InstanceNotifierMessageProcessorChain.class);
+
+    private ArtifactUpdateMessageProcessor artifactUpdateMessageProcessor;
+
+    public void initialize() {
+        // Add instance notifier event processors
+        artifactUpdateMessageProcessor = new ArtifactUpdateMessageProcessor();
+        add(artifactUpdateMessageProcessor);
+
+        if (log.isDebugEnabled()) {
+            log.debug("Instance notifier message processor chain initialized");
+        }
+    }
+
+    public void addEventListener(EventListener eventListener) {
+        if (eventListener instanceof ArtifactUpdateEventListener) {
+            artifactUpdateMessageProcessor.addEventListener(eventListener);
+        } else {
+            throw new RuntimeException("Unknown event listener");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/3fd19d0c/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java
new file mode 100644
index 0000000..3ad3015
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.receiver.instance.notifier;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
+import 
org.apache.stratos.messaging.message.processor.instance.notifier.InstanceNotifierMessageProcessorChain;
+import org.apache.stratos.messaging.util.Constants;
+
+import javax.jms.TextMessage;
+
+
+/**
+ * Implements logic for processing instance notifier event messages based on a 
given
+ * instance notifier message processor chain.
+ */
+public class InstanceNotifierEventMessageDelegator implements Runnable {
+
+    private static final Log log = 
LogFactory.getLog(InstanceNotifierEventMessageDelegator.class);
+    private MessageProcessorChain processorChain;
+    private boolean terminated;
+
+    public InstanceNotifierEventMessageDelegator() {
+        this.processorChain = new InstanceNotifierMessageProcessorChain();
+    }
+
+    public InstanceNotifierEventMessageDelegator(MessageProcessorChain 
processorChain) {
+        this.processorChain = processorChain;
+    }
+
+    @Override
+    public void run() {
+        try {
+            if (log.isInfoEnabled()) {
+                log.info("Instance notifier event message delegator started");
+            }
+
+            while (!terminated) {
+                try {
+                    TextMessage message = 
InstanceNotifierEventMessageQueue.getInstance().take();
+
+                    // Retrieve the header
+                    String type = 
message.getStringProperty(Constants.EVENT_CLASS_NAME);
+
+                    // Retrieve the actual message
+                    String json = message.getText();
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Instance notifier event 
message received from queue: %s", type));
+                    }
+
+                    // Delegate message to message processor chain
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Delegating instance notifier 
event message: %s", type));
+                    }
+                    processorChain.process(type, json, null);
+                } catch (Exception e) {
+                    log.error("Failed to retrieve instance notifier event 
message", e);
+                }
+            }
+        } catch (Exception e) {
+            if (log.isErrorEnabled()) {
+                log.error("Instance notifier event message delegator failed", 
e);
+            }
+        }
+    }
+
+    /**
+     * Terminate instance notifier event message delegator thread.
+     */
+    public void terminate() {
+        terminated = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/3fd19d0c/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java
new file mode 100644
index 0000000..4e134ac
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.receiver.instance.notifier;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.TextMessage;
+
+/**
+ * Implements functionality for receiving text-based event messages from the 
instance notifier
+ * message broker topic and adding them to the event queue.
+ */
+public class InstanceNotifierEventMessageListener implements MessageListener {
+
+    private static final Log log = 
LogFactory.getLog(InstanceNotifierEventMessageListener.class);
+
+    @Override
+    public void onMessage(Message message) {
+        if (message instanceof TextMessage) {
+            TextMessage receivedMessage = (TextMessage) message;
+            try {
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Instance notifier message 
received: %s", ((TextMessage) message).getText()));
+                }
+                // Add received message to the queue
+                
InstanceNotifierEventMessageQueue.getInstance().add(receivedMessage);
+
+            } catch (JMSException e) {
+                log.error(e.getMessage(), e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/3fd19d0c/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageQueue.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageQueue.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageQueue.java
new file mode 100644
index 0000000..f345ed2
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageQueue.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.receiver.instance.notifier;
+
+import javax.jms.TextMessage;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Implements a blocking queue for managing instance notifier event messages.
+ */
+public class InstanceNotifierEventMessageQueue extends 
LinkedBlockingQueue<TextMessage>{
+    private static volatile InstanceNotifierEventMessageQueue instance;
+
+    private InstanceNotifierEventMessageQueue(){
+    }
+
+    public static synchronized InstanceNotifierEventMessageQueue getInstance() 
{
+        if (instance == null) {
+            synchronized (InstanceNotifierEventMessageQueue.class){
+                if (instance == null) {
+                    instance = new InstanceNotifierEventMessageQueue();
+                }
+            }
+        }
+        return instance;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/3fd19d0c/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageReceiver.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageReceiver.java
new file mode 100644
index 0000000..98040d8
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageReceiver.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.receiver.instance.notifier;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
+import org.apache.stratos.messaging.util.Constants;
+
+/**
+ * A thread for receiving instance notifier information from message broker.
+ * <p>
+ * {@link #run()} starts a topic subscriber thread and a message delegator
+ * thread, then stays alive until {@link #terminate()} is called from another
+ * thread.
+ */
+public class InstanceNotifierEventMessageReceiver implements Runnable {
+    private static final Log log = LogFactory.getLog(InstanceNotifierEventMessageReceiver.class);
+
+    /** Pause between checks of the terminated flag while idling in run(). */
+    private static final long TERMINATION_POLL_INTERVAL_MS = 1000L;
+
+    private final InstanceNotifierEventMessageDelegator messageDelegator;
+    // Written by run() and read by terminate(), typically on different threads.
+    private volatile TopicSubscriber topicSubscriber;
+    // volatile so the loop in run() is guaranteed to observe terminate().
+    private volatile boolean terminated;
+
+    /**
+     * Creates a receiver that uses a new default message delegator.
+     */
+    public InstanceNotifierEventMessageReceiver() {
+        this.messageDelegator = new InstanceNotifierEventMessageDelegator();
+    }
+
+    /**
+     * Creates a receiver that uses the given message delegator.
+     *
+     * @param messageDelegator delegator to run on its own thread
+     */
+    public InstanceNotifierEventMessageReceiver(InstanceNotifierEventMessageDelegator messageDelegator) {
+        this.messageDelegator = messageDelegator;
+    }
+
+    /**
+     * Starts the subscriber and delegator threads and then idles until
+     * {@link #terminate()} is called or this thread is interrupted.
+     */
+    @Override
+    public void run() {
+        try {
+            // Start topic subscriber thread
+            topicSubscriber = new TopicSubscriber(Constants.INSTANCE_NOTIFIER_TOPIC);
+            topicSubscriber.setMessageListener(new InstanceNotifierEventMessageListener());
+            Thread subscriberThread = new Thread(topicSubscriber);
+            subscriberThread.start();
+            if (log.isDebugEnabled()) {
+                log.debug("InstanceNotifier event message receiver thread started");
+            }
+
+            // Start instance notifier event message delegator thread
+            Thread receiverThread = new Thread(messageDelegator);
+            receiverThread.start();
+            if (log.isDebugEnabled()) {
+                log.debug("InstanceNotifier event message delegator thread started");
+            }
+
+            // Keep the thread live until terminated. Sleep between checks
+            // instead of spinning so an idle receiver does not burn a CPU core.
+            while (!terminated) {
+                try {
+                    Thread.sleep(TERMINATION_POLL_INTERVAL_MS);
+                } catch (InterruptedException ignored) {
+                    // Preserve the interrupt status for the caller and stop waiting.
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+            }
+        } catch (Exception e) {
+            if (log.isErrorEnabled()) {
+                log.error("InstanceNotifier receiver failed", e);
+            }
+        }
+    }
+
+    /**
+     * Terminates the subscriber and the delegator and lets {@link #run()}
+     * return. Safe to call before {@link #run()} has created the subscriber.
+     */
+    public void terminate() {
+        TopicSubscriber subscriber = topicSubscriber;
+        // The subscriber only exists once run() has started.
+        if (subscriber != null) {
+            subscriber.terminate();
+        }
+        messageDelegator.terminate();
+        terminated = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/3fd19d0c/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageListener.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageListener.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageListener.java
index 79af20e..59e9ad1 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageListener.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageListener.java
@@ -21,7 +21,6 @@ package org.apache.stratos.messaging.message.receiver.tenant;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import 
org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessageQueue;
 
 import javax.jms.JMSException;
 import javax.jms.Message;

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/3fd19d0c/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
index 0fbaabe..346101e 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
@@ -23,7 +23,7 @@ public class Constants {
        public static final String TOPOLOGY_TOPIC = "topology";
        public static final String HEALTH_STAT_TOPIC = 
"summarized-health-stats";
     public static final String INSTANCE_STATUS_TOPIC = "instance-status";
-    public static final String ARTIFACT_SYNCHRONIZATION_TOPIC = 
"artifact-synchronization";
+    public static final String INSTANCE_NOTIFIER_TOPIC = "instance-notifier";
     public static final String TENANT_TOPIC = "tenant";
 
     public static final String TENANT_RANGE_DELIMITER = "-";

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/3fd19d0c/components/org.apache.stratos.messaging/src/test/java/org/apache/stratos/messaging/test/MessageFilterTest.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/test/java/org/apache/stratos/messaging/test/MessageFilterTest.java
 
b/components/org.apache.stratos.messaging/src/test/java/org/apache/stratos/messaging/test/MessageFilterTest.java
new file mode 100755
index 0000000..1dc3345
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/test/java/org/apache/stratos/messaging/test/MessageFilterTest.java
@@ -0,0 +1,81 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+
+ *  http://www.apache.org/licenses/LICENSE-2.0
+
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.stratos.messaging.test;
+
+import org.apache.stratos.messaging.message.filter.MessageFilter;
+import org.apache.stratos.messaging.util.Constants;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.lang.RuntimeException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Message filter tests.
+ * <p>
+ * Each test publishes a filter expression through a system property, builds a
+ * {@link MessageFilter} from that property name, and removes the property
+ * again in a {@code finally} block so a failing assertion cannot leak the
+ * setting into other tests.
+ */
+@RunWith(JUnit4.class)
+public class MessageFilterTest {
+
+    /** Filter expression shared by all tests: two properties with two values each. */
+    private static final String FILTER_EXPRESSION = "property1=value1,value2 | property2=value3,value4";
+
+    /**
+     * Verifies that every value listed in the filter expression is reported as included.
+     */
+    @Test
+    public final void testFilterIncluded() {
+        String filterName = "filter1";
+        String validationError = "MessageFilter.included() method failed";
+        System.setProperty(filterName, FILTER_EXPRESSION);
+        try {
+            MessageFilter messageFilter = new MessageFilter(filterName);
+            Assert.assertTrue(validationError, messageFilter.included("property1", "value1"));
+            Assert.assertTrue(validationError, messageFilter.included("property1", "value2"));
+            Assert.assertTrue(validationError, messageFilter.included("property2", "value3"));
+            Assert.assertTrue(validationError, messageFilter.included("property2", "value4"));
+        } finally {
+            System.clearProperty(filterName);
+        }
+    }
+
+    /**
+     * Verifies that no value listed in the filter expression is reported as excluded.
+     */
+    @Test
+    public final void testFilterExcluded() {
+        String filterName = "filter2";
+        String validationError = "MessageFilter.excluded() method failed";
+        System.setProperty(filterName, FILTER_EXPRESSION);
+        try {
+            MessageFilter messageFilter = new MessageFilter(filterName);
+            Assert.assertFalse(validationError, messageFilter.excluded("property1", "value1"));
+            Assert.assertFalse(validationError, messageFilter.excluded("property1", "value2"));
+            Assert.assertFalse(validationError, messageFilter.excluded("property2", "value3"));
+            Assert.assertFalse(validationError, messageFilter.excluded("property2", "value4"));
+        } finally {
+            System.clearProperty(filterName);
+        }
+    }
+
+    /**
+     * Verifies that the included values of each property can be read back as a collection.
+     */
+    @Test
+    public final void testFilterGetAllPropertyValues() {
+        // Own property name so this test never shares state with testFilterExcluded.
+        String filterName = "filter3";
+        String validationError = "MessageFilter.getIncludedPropertyValues() method failed";
+        System.setProperty(filterName, FILTER_EXPRESSION);
+        try {
+            MessageFilter messageFilter = new MessageFilter(filterName);
+
+            Collection<String> property1Values = messageFilter.getIncludedPropertyValues("property1");
+            Assert.assertTrue(validationError, property1Values.contains("value1"));
+            Assert.assertTrue(validationError, property1Values.contains("value2"));
+
+            Collection<String> property2Values = messageFilter.getIncludedPropertyValues("property2");
+            Assert.assertTrue(validationError, property2Values.contains("value3"));
+            Assert.assertTrue(validationError, property2Values.contains("value4"));
+        } finally {
+            System.clearProperty(filterName);
+        }
+    }
+}

Reply via email to