add unit test to message bus in master branch

Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/d44e25ef
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/d44e25ef
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/d44e25ef

Branch: refs/heads/internallb
Commit: d44e25efba72b03b114b5662c60d94ddcc3264f8
Parents: 85e73d1
Author: Kelven Yang <[email protected]>
Authored: Mon Apr 29 14:35:41 2013 -0700
Committer: Kelven Yang <[email protected]>
Committed: Mon Apr 29 14:36:03 2013 -0700

----------------------------------------------------------------------
 .../framework/messagebus/MessageBus.java           |    2 +
 .../framework/messagebus/MessageBusBase.java       |  130 +++++++++++++--
 .../cloudstack/messagebus/TestMessageBus.java      |  116 +++++++++++++
 .../ipc/test/resources/MessageBusTestContext.xml   |   51 ++++++
 4 files changed, 284 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/d44e25ef/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBus.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBus.java b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBus.java
index 4aa007d..a15dd44 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBus.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBus.java
@@ -27,6 +27,8 @@ public interface MessageBus {
        
        void subscribe(String subject, MessageSubscriber subscriber);
        void unsubscribe(String subject, MessageSubscriber subscriber);
+       void clearAll();
+       void prune();
        
        void publish(String senderAddress, String subject, PublishScope scope, Object args);
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/d44e25ef/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java
index 5b7af4d..9cf5e77 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java
@@ -40,7 +40,7 @@ public class MessageBusBase implements MessageBus {
                _gate = new Gate();
                _pendingActions = new ArrayList<ActionRecord>();
                
-               _subscriberRoot = new SubscriptionNode("/", null);
+               _subscriberRoot = new SubscriptionNode(null, "/", null);
        }
        
        @Override
@@ -72,10 +72,13 @@ public class MessageBusBase implements MessageBus {
        @Override
        public void unsubscribe(String subject, MessageSubscriber subscriber) {
                if(_gate.enter()) {
-                       SubscriptionNode current = locate(subject, null, false);
-                       if(current != null)
-                               current.removeSubscriber(subscriber);
-                       
+                       if(subject != null) {
+                               SubscriptionNode current = locate(subject, 
null, false);
+                               if(current != null)
+                                       current.removeSubscriber(subscriber, 
false);
+                       } else {
+                               
this._subscriberRoot.removeSubscriber(subscriber, true);
+                       }
                        _gate.leave();
                } else {
                        synchronized(_pendingActions) {
@@ -83,7 +86,48 @@ public class MessageBusBase implements MessageBus {
                        }
                }
        }
-
+       
+       @Override
+       public void clearAll() {
+               if(_gate.enter()) {
+                       _subscriberRoot.clearAll();
+                       doPrune();
+                       _gate.leave();
+               } else {
+                       synchronized(_pendingActions) {
+                               _pendingActions.add(new 
ActionRecord(ActionType.ClearAll, null, null));
+                       }
+               }
+       }
+               
+       @Override
+       public void prune() {
+               if(_gate.enter()) {
+                       doPrune();
+                       _gate.leave();
+               } else {
+                       synchronized(_pendingActions) {
+                               _pendingActions.add(new 
ActionRecord(ActionType.Prune, null, null));
+                       }
+               }
+       }
+       
+       private void doPrune() {
+               List<SubscriptionNode> trimNodes = new 
ArrayList<SubscriptionNode>();
+               _subscriberRoot.prune(trimNodes);
+               
+               while(trimNodes.size() > 0) {
+                       SubscriptionNode node = trimNodes.remove(0);
+                       SubscriptionNode parent = node.getParent();
+                       if(parent != null) {
+                               parent.removeChild(node.getNodeKey());
+                               if(parent.isTrimmable()) {
+                                       trimNodes.add(parent);
+                               }
+                       }
+               }
+       }
+       
        @Override
        public void publish(String senderAddress, String subject, PublishScope 
scope, 
                Object args) {
@@ -119,12 +163,22 @@ public class MessageBusBase implements MessageBus {
                                                break;
                                                
                                        case Unsubscribe :
-                                               {
+                                               if(record.getSubject() != null) 
{
                                                        SubscriptionNode 
current = locate(record.getSubject(), null, false);
                                                        if(current != null)
-                                                               
current.removeSubscriber(record.getSubscriber());
+                                                               
current.removeSubscriber(record.getSubscriber(), false);
+                                               } else {
+                                                       
this._subscriberRoot.removeSubscriber(record.getSubscriber(), true);
                                                }
                                                break;
+                                       
+                                       case ClearAll :
+                                               _subscriberRoot.clearAll();
+                                               break;
+                                               
+                                       case Prune :
+                                               doPrune();
+                                               break;
                                                
                                        default :
                                                assert(false);
@@ -136,11 +190,13 @@ public class MessageBusBase implements MessageBus {
                }
        }
        
-       
        private SubscriptionNode locate(String subject, List<SubscriptionNode> 
chainFromTop,
                boolean createPath) {
                
                assert(subject != null);
+               // "/" is special name for root node
+               if(subject.equals("/"))
+                       return _subscriberRoot;
                
                String[] subjectPathTokens = subject.split("\\.");
                return locate(subjectPathTokens, _subscriberRoot, chainFromTop, 
createPath);
@@ -159,7 +215,7 @@ public class MessageBusBase implements MessageBus {
                SubscriptionNode next = current.getChild(subjectPathTokens[0]);
                if(next == null) {
                        if(createPath) {
-                               next = new 
SubscriptionNode(subjectPathTokens[0], null);
+                               next = new SubscriptionNode(current, 
subjectPathTokens[0], null);
                                current.addChild(subjectPathTokens[0], next);
                        } else {
                                return null;
@@ -180,7 +236,9 @@ public class MessageBusBase implements MessageBus {
        //
        private static enum ActionType {
                Subscribe,
-               Unsubscribe
+               Unsubscribe,
+               ClearAll,
+               Prune
        }
        
        private static class ActionRecord {
@@ -262,13 +320,14 @@ public class MessageBusBase implements MessageBus {
        }
        
        private static class SubscriptionNode {
-               @SuppressWarnings("unused")
                private String _nodeKey;
                private List<MessageSubscriber> _subscribers;
                private Map<String, SubscriptionNode> _children;
+               private SubscriptionNode _parent;
                
-               public SubscriptionNode(String nodeKey, MessageSubscriber 
subscriber) {
+               public SubscriptionNode(SubscriptionNode parent, String 
nodeKey, MessageSubscriber subscriber) {
                        assert(nodeKey != null);
+                       _parent = parent;
                        _nodeKey = nodeKey;
                        _subscribers = new ArrayList<MessageSubscriber>();
                        
@@ -278,16 +337,30 @@ public class MessageBusBase implements MessageBus {
                        _children = new HashMap<String, SubscriptionNode>();
                }
                
+               public SubscriptionNode getParent() {
+                       return _parent;                 
+               }
+               
+               public String getNodeKey() {
+                       return _nodeKey;
+               }
+               
                @SuppressWarnings("unused")
                public List<MessageSubscriber> getSubscriber() {
                        return _subscribers;
                }
                
                public void addSubscriber(MessageSubscriber subscriber) {
-                       _subscribers.add(subscriber);
+                       if(!_subscribers.contains(subscriber))
+                               _subscribers.add(subscriber);
                }
                
-               public void removeSubscriber(MessageSubscriber subscriber) {
+               public void removeSubscriber(MessageSubscriber subscriber, 
boolean recursively) {
+                       if(recursively) {
+                               for(Map.Entry<String, SubscriptionNode> entry : 
_children.entrySet()) {
+                                       
entry.getValue().removeSubscriber(subscriber, true);
+                               }
+                       }
                        _subscribers.remove(subscriber);
                }
                
@@ -299,10 +372,37 @@ public class MessageBusBase implements MessageBus {
                        _children.put(key, childNode);
                }
                
+               public void removeChild(String key) {
+                       _children.remove(key);
+               }
+               
+               public void clearAll() {
+                       // depth-first
+                       for(Map.Entry<String, SubscriptionNode> entry : 
_children.entrySet()) {
+                               entry.getValue().clearAll();
+                       }
+                       _subscribers.clear();
+               }
+               
+               public void prune(List<SubscriptionNode> trimNodes) {
+                       assert(trimNodes != null);
+                       
+                       for(Map.Entry<String, SubscriptionNode> entry : 
_children.entrySet()) {
+                               entry.getValue().prune(trimNodes);
+                       }
+                       
+                       if(isTrimmable())
+                               trimNodes.add(this);
+               }
+               
                public void notifySubscribers(String senderAddress, String 
subject,  Object args) {
                        for(MessageSubscriber subscriber : _subscribers) {
                                subscriber.onPublishMessage(senderAddress, 
subject, args);
                        }
                }
+               
+               public boolean isTrimmable() {
+                       return _children.size() == 0 && _subscribers.size() == 
0;
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/d44e25ef/framework/ipc/test/org/apache/cloudstack/messagebus/TestMessageBus.java
----------------------------------------------------------------------
diff --git a/framework/ipc/test/org/apache/cloudstack/messagebus/TestMessageBus.java b/framework/ipc/test/org/apache/cloudstack/messagebus/TestMessageBus.java
new file mode 100644
index 0000000..dabfdd3
--- /dev/null
+++ b/framework/ipc/test/org/apache/cloudstack/messagebus/TestMessageBus.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.messagebus;
+
+import javax.inject.Inject;
+
+import junit.framework.TestCase;
+
+import org.apache.cloudstack.framework.messagebus.MessageBus;
+import org.apache.cloudstack.framework.messagebus.MessageSubscriber;
+import org.apache.cloudstack.framework.messagebus.PublishScope;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(locations="classpath:/MessageBusTestContext.xml")
+public class TestMessageBus extends TestCase {
+       
+       @Inject MessageBus _messageBus;
+
+       @Test
+       public void testExactSubjectMatch() {
+               _messageBus.subscribe("Host", new MessageSubscriber() {
+
+                       @Override
+                       public void onPublishMessage(String senderAddress, 
String subject, Object args) {
+                               Assert.assertEquals(subject, "Host");
+                       }
+               });
+               
+               _messageBus.publish(null, "Host", PublishScope.LOCAL, null);
+               _messageBus.publish(null, "VM", PublishScope.LOCAL, null);
+               _messageBus.clearAll();
+       }
+
+       @Test
+       public void testRootSubjectMatch() {
+               _messageBus.subscribe("/", new MessageSubscriber() {
+
+                       @Override
+                       public void onPublishMessage(String senderAddress, 
String subject, Object args) {
+                               Assert.assertTrue(subject.equals("Host") || 
subject.equals("VM"));
+                       }
+               });
+               
+               _messageBus.publish(null, "Host", PublishScope.LOCAL, null);
+               _messageBus.publish(null, "VM", PublishScope.LOCAL, null);
+               _messageBus.clearAll();
+       }
+       
+       @Test
+       public void testMiscMatch() {
+               MessageSubscriber subscriberAtParentLevel = new 
MessageSubscriber() {
+                       @Override
+                       public void onPublishMessage(String senderAddress, 
String subject, Object args) {
+                               Assert.assertTrue(subject.startsWith(("Host")) 
|| subject.startsWith("VM"));
+                       }
+               };
+               
+               MessageSubscriber subscriberAtChildLevel = new 
MessageSubscriber() {
+                       @Override
+                       public void onPublishMessage(String senderAddress, 
String subject, Object args) {
+                               Assert.assertTrue(subject.equals("Host.123"));
+                       }
+               };
+               
+               subscriberAtParentLevel = Mockito.spy(subscriberAtParentLevel);
+               subscriberAtChildLevel = Mockito.spy(subscriberAtChildLevel);
+               
+               _messageBus.subscribe("Host", subscriberAtParentLevel);
+               _messageBus.subscribe("VM", subscriberAtParentLevel);
+               _messageBus.subscribe("Host.123", subscriberAtChildLevel);
+               
+               _messageBus.publish(null, "Host.123", PublishScope.LOCAL, null);
+               _messageBus.publish(null, "Host.321", PublishScope.LOCAL, null);
+               _messageBus.publish(null, "VM.123", PublishScope.LOCAL, null);
+               
+               Mockito.verify(subscriberAtParentLevel).onPublishMessage(null, 
"Host.123", null);
+               Mockito.verify(subscriberAtParentLevel).onPublishMessage(null, 
"Host.321", null);
+               Mockito.verify(subscriberAtParentLevel).onPublishMessage(null, 
"VM.123", null);
+               Mockito.verify(subscriberAtChildLevel).onPublishMessage(null, 
"Host.123", null);
+
+               Mockito.reset(subscriberAtParentLevel);
+               Mockito.reset(subscriberAtChildLevel);
+               
+               _messageBus.unsubscribe(null, subscriberAtParentLevel);
+               _messageBus.publish(null, "Host.123", PublishScope.LOCAL, null);
+               _messageBus.publish(null, "VM.123", PublishScope.LOCAL, null);
+       
+               Mockito.verify(subscriberAtChildLevel).onPublishMessage(null, 
"Host.123", null);
+               Mockito.verify(subscriberAtParentLevel, 
Mockito.times(0)).onPublishMessage(null, "Host.123", null);
+               Mockito.verify(subscriberAtParentLevel, 
Mockito.times(0)).onPublishMessage(null, "VM.123", null);
+               
+               _messageBus.clearAll();
+       }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/d44e25ef/framework/ipc/test/resources/MessageBusTestContext.xml
----------------------------------------------------------------------
diff --git a/framework/ipc/test/resources/MessageBusTestContext.xml b/framework/ipc/test/resources/MessageBusTestContext.xml
new file mode 100644
index 0000000..fcfcb08
--- /dev/null
+++ b/framework/ipc/test/resources/MessageBusTestContext.xml
@@ -0,0 +1,51 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xmlns:context="http://www.springframework.org/schema/context"
+  xmlns:tx="http://www.springframework.org/schema/tx"
+  xmlns:aop="http://www.springframework.org/schema/aop"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans
+                      http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
+                      http://www.springframework.org/schema/tx
+                      http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
+                      http://www.springframework.org/schema/aop
+                      http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
+                      http://www.springframework.org/schema/context
+                      http://www.springframework.org/schema/context/spring-context-3.0.xsd">
+  <context:annotation-config />
+
+  <bean id="onwireRegistry" class="org.apache.cloudstack.framework.serializer.OnwireClassRegistry"
+    init-method="scan" >
+    <property name="packages">
+      <list>
+        <value>org.apache.cloudstack.framework</value>
+      </list>
+    </property>
+  </bean>
+  
+  <bean id="messageSerializer" class="org.apache.cloudstack.framework.serializer.JsonMessageSerializer">
+    <property name="onwireClassRegistry" ref="onwireRegistry" />
+  </bean>
+
+  <bean id="messageBus" class = "org.apache.cloudstack.framework.messagebus.MessageBusBase">
+    <property name="messageSerializer" ref="messageSerializer" />
+  </bean>
+  
+</beans>

Reply via email to