add unit test to message bus in master branch
Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/d44e25ef Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/d44e25ef Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/d44e25ef Branch: refs/heads/internallb Commit: d44e25efba72b03b114b5662c60d94ddcc3264f8 Parents: 85e73d1 Author: Kelven Yang <[email protected]> Authored: Mon Apr 29 14:35:41 2013 -0700 Committer: Kelven Yang <[email protected]> Committed: Mon Apr 29 14:36:03 2013 -0700 ---------------------------------------------------------------------- .../framework/messagebus/MessageBus.java | 2 + .../framework/messagebus/MessageBusBase.java | 130 +++++++++++++-- .../cloudstack/messagebus/TestMessageBus.java | 116 +++++++++++++ .../ipc/test/resources/MessageBusTestContext.xml | 51 ++++++ 4 files changed, 284 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cloudstack/blob/d44e25ef/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBus.java ---------------------------------------------------------------------- diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBus.java b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBus.java index 4aa007d..a15dd44 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBus.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBus.java @@ -27,6 +27,8 @@ public interface MessageBus { void subscribe(String subject, MessageSubscriber subscriber); void unsubscribe(String subject, MessageSubscriber subscriber); + void clearAll(); + void prune(); void publish(String senderAddress, String subject, PublishScope scope, Object args); } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/d44e25ef/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java ---------------------------------------------------------------------- diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java index 5b7af4d..9cf5e77 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java @@ -40,7 +40,7 @@ public class MessageBusBase implements MessageBus { _gate = new Gate(); _pendingActions = new ArrayList<ActionRecord>(); - _subscriberRoot = new SubscriptionNode("/", null); + _subscriberRoot = new SubscriptionNode(null, "/", null); } @Override @@ -72,10 +72,13 @@ public class MessageBusBase implements MessageBus { @Override public void unsubscribe(String subject, MessageSubscriber subscriber) { if(_gate.enter()) { - SubscriptionNode current = locate(subject, null, false); - if(current != null) - current.removeSubscriber(subscriber); - + if(subject != null) { + SubscriptionNode current = locate(subject, null, false); + if(current != null) + current.removeSubscriber(subscriber, false); + } else { + this._subscriberRoot.removeSubscriber(subscriber, true); + } _gate.leave(); } else { synchronized(_pendingActions) { @@ -83,7 +86,48 @@ public class MessageBusBase implements MessageBus { } } } - + + @Override + public void clearAll() { + if(_gate.enter()) { + _subscriberRoot.clearAll(); + doPrune(); + _gate.leave(); + } else { + synchronized(_pendingActions) { + _pendingActions.add(new ActionRecord(ActionType.ClearAll, null, null)); + } + } + } + + @Override + public void prune() { + if(_gate.enter()) { + doPrune(); + _gate.leave(); + } else { + synchronized(_pendingActions) { + _pendingActions.add(new ActionRecord(ActionType.Prune, null, null)); + } + } + } + + private void doPrune() { + List<SubscriptionNode> trimNodes = new ArrayList<SubscriptionNode>(); + _subscriberRoot.prune(trimNodes); + + while(trimNodes.size() > 0) { + SubscriptionNode node = trimNodes.remove(0); + SubscriptionNode parent = node.getParent(); + if(parent != null) { + parent.removeChild(node.getNodeKey()); + if(parent.isTrimmable()) { + trimNodes.add(parent); + } + } + } + } + @Override public void publish(String senderAddress, String subject, PublishScope scope, Object args) { @@ -119,12 +163,22 @@ public class MessageBusBase implements MessageBus { break; case Unsubscribe : - { + if(record.getSubject() != null) { SubscriptionNode current = locate(record.getSubject(), null, false); if(current != null) - current.removeSubscriber(record.getSubscriber()); + current.removeSubscriber(record.getSubscriber(), false); + } else { + this._subscriberRoot.removeSubscriber(record.getSubscriber(), true); } break; + + case ClearAll : + _subscriberRoot.clearAll(); + break; + + case Prune : + doPrune(); + break; default : assert(false); @@ -136,11 +190,13 @@ public class MessageBusBase implements MessageBus { } } - private SubscriptionNode locate(String subject, List<SubscriptionNode> chainFromTop, boolean createPath) { assert(subject != null); + // "/" is special name for root node + if(subject.equals("/")) + return _subscriberRoot; String[] subjectPathTokens = subject.split("\\."); return locate(subjectPathTokens, _subscriberRoot, chainFromTop, createPath); @@ -159,7 +215,7 @@ public class MessageBusBase implements MessageBus { SubscriptionNode next = current.getChild(subjectPathTokens[0]); if(next == null) { if(createPath) { - next = new SubscriptionNode(subjectPathTokens[0], null); + next = new SubscriptionNode(current, subjectPathTokens[0], null); current.addChild(subjectPathTokens[0], next); } else { return null; @@ -180,7 +236,9 @@ public class MessageBusBase implements MessageBus { // private static enum ActionType { Subscribe, - Unsubscribe + Unsubscribe, + ClearAll, + Prune } private static class ActionRecord { @@ -262,13 +320,14 @@ public class MessageBusBase implements MessageBus { } private static class SubscriptionNode { - @SuppressWarnings("unused") private String _nodeKey; private List<MessageSubscriber> _subscribers; private Map<String, SubscriptionNode> _children; + private SubscriptionNode _parent; - public SubscriptionNode(String nodeKey, MessageSubscriber subscriber) { + public SubscriptionNode(SubscriptionNode parent, String nodeKey, MessageSubscriber subscriber) { assert(nodeKey != null); + _parent = parent; _nodeKey = nodeKey; _subscribers = new ArrayList<MessageSubscriber>(); @@ -278,16 +337,30 @@ public class MessageBusBase implements MessageBus { _children = new HashMap<String, SubscriptionNode>(); } + public SubscriptionNode getParent() { + return _parent; + } + + public String getNodeKey() { + return _nodeKey; + } + @SuppressWarnings("unused") public List<MessageSubscriber> getSubscriber() { return _subscribers; } public void addSubscriber(MessageSubscriber subscriber) { - _subscribers.add(subscriber); + if(!_subscribers.contains(subscriber)) + _subscribers.add(subscriber); } - public void removeSubscriber(MessageSubscriber subscriber) { + public void removeSubscriber(MessageSubscriber subscriber, boolean recursively) { + if(recursively) { + for(Map.Entry<String, SubscriptionNode> entry : _children.entrySet()) { + entry.getValue().removeSubscriber(subscriber, true); + } + } _subscribers.remove(subscriber); } @@ -299,10 +372,37 @@ public class MessageBusBase implements MessageBus { _children.put(key, childNode); } + public void removeChild(String key) { + _children.remove(key); + } + + public void clearAll() { + // depth-first + for(Map.Entry<String, SubscriptionNode> entry : _children.entrySet()) { + entry.getValue().clearAll(); + } + _subscribers.clear(); + } + + public void prune(List<SubscriptionNode> trimNodes) { + assert(trimNodes != null); + + for(Map.Entry<String, SubscriptionNode> entry : _children.entrySet()) { + entry.getValue().prune(trimNodes); + } + + if(isTrimmable()) + trimNodes.add(this); + } + public void notifySubscribers(String senderAddress, String subject, Object args) { for(MessageSubscriber subscriber : _subscribers) { subscriber.onPublishMessage(senderAddress, subject, args); } } + + public boolean isTrimmable() { + return _children.size() == 0 && _subscribers.size() == 0; + } } } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/d44e25ef/framework/ipc/test/org/apache/cloudstack/messagebus/TestMessageBus.java ---------------------------------------------------------------------- diff --git a/framework/ipc/test/org/apache/cloudstack/messagebus/TestMessageBus.java b/framework/ipc/test/org/apache/cloudstack/messagebus/TestMessageBus.java new file mode 100644 index 0000000..dabfdd3 --- /dev/null +++ b/framework/ipc/test/org/apache/cloudstack/messagebus/TestMessageBus.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cloudstack.messagebus; + +import javax.inject.Inject; + +import junit.framework.TestCase; + +import org.apache.cloudstack.framework.messagebus.MessageBus; +import org.apache.cloudstack.framework.messagebus.MessageSubscriber; +import org.apache.cloudstack.framework.messagebus.PublishScope; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(locations="classpath:/MessageBusTestContext.xml") +public class TestMessageBus extends TestCase { + + @Inject MessageBus _messageBus; + + @Test + public void testExactSubjectMatch() { + _messageBus.subscribe("Host", new MessageSubscriber() { + + @Override + public void onPublishMessage(String senderAddress, String subject, Object args) { + Assert.assertEquals(subject, "Host"); + } + }); + + _messageBus.publish(null, "Host", PublishScope.LOCAL, null); + _messageBus.publish(null, "VM", PublishScope.LOCAL, null); + _messageBus.clearAll(); + } + + @Test + public void testRootSubjectMatch() { + _messageBus.subscribe("/", new MessageSubscriber() { + + @Override + public void onPublishMessage(String senderAddress, String subject, Object args) { + Assert.assertTrue(subject.equals("Host") || subject.equals("VM")); + } + }); + + _messageBus.publish(null, "Host", PublishScope.LOCAL, null); + _messageBus.publish(null, "VM", PublishScope.LOCAL, null); + _messageBus.clearAll(); + } + + @Test + public void testMiscMatch() { + MessageSubscriber subscriberAtParentLevel = new MessageSubscriber() { + @Override + public void onPublishMessage(String senderAddress, String subject, Object args) { + Assert.assertTrue(subject.startsWith(("Host")) || subject.startsWith("VM")); + } + }; + + MessageSubscriber subscriberAtChildLevel = new MessageSubscriber() { + @Override + public void onPublishMessage(String senderAddress, String subject, Object args) { + Assert.assertTrue(subject.equals("Host.123")); + } + }; + + subscriberAtParentLevel = Mockito.spy(subscriberAtParentLevel); + subscriberAtChildLevel = Mockito.spy(subscriberAtChildLevel); + + _messageBus.subscribe("Host", subscriberAtParentLevel); + _messageBus.subscribe("VM", subscriberAtParentLevel); + _messageBus.subscribe("Host.123", subscriberAtChildLevel); + + _messageBus.publish(null, "Host.123", PublishScope.LOCAL, null); + _messageBus.publish(null, "Host.321", PublishScope.LOCAL, null); + _messageBus.publish(null, "VM.123", PublishScope.LOCAL, null); + + Mockito.verify(subscriberAtParentLevel).onPublishMessage(null, "Host.123", null); + Mockito.verify(subscriberAtParentLevel).onPublishMessage(null, "Host.321", null); + Mockito.verify(subscriberAtParentLevel).onPublishMessage(null, "VM.123", null); + Mockito.verify(subscriberAtChildLevel).onPublishMessage(null, "Host.123", null); + + Mockito.reset(subscriberAtParentLevel); + Mockito.reset(subscriberAtChildLevel); + + _messageBus.unsubscribe(null, subscriberAtParentLevel); + _messageBus.publish(null, "Host.123", PublishScope.LOCAL, null); + _messageBus.publish(null, "VM.123", PublishScope.LOCAL, null); + + Mockito.verify(subscriberAtChildLevel).onPublishMessage(null, "Host.123", null); + Mockito.verify(subscriberAtParentLevel, Mockito.times(0)).onPublishMessage(null, "Host.123", null); + Mockito.verify(subscriberAtParentLevel, Mockito.times(0)).onPublishMessage(null, "VM.123", null); + + _messageBus.clearAll(); + } +} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/d44e25ef/framework/ipc/test/resources/MessageBusTestContext.xml ---------------------------------------------------------------------- diff --git a/framework/ipc/test/resources/MessageBusTestContext.xml b/framework/ipc/test/resources/MessageBusTestContext.xml new file mode 100644 index 0000000..fcfcb08 --- /dev/null +++ b/framework/ipc/test/resources/MessageBusTestContext.xml @@ -0,0 +1,51 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns:context="http://www.springframework.org/schema/context" + xmlns:tx="http://www.springframework.org/schema/tx" + xmlns:aop="http://www.springframework.org/schema/aop" + xsi:schemaLocation="http://www.springframework.org/schema/beans + http://www.springframework.org/schema/beans/spring-beans-3.0.xsd + http://www.springframework.org/schema/tx + http://www.springframework.org/schema/tx/spring-tx-3.0.xsd + http://www.springframework.org/schema/aop + http://www.springframework.org/schema/aop/spring-aop-3.0.xsd + http://www.springframework.org/schema/context + http://www.springframework.org/schema/context/spring-context-3.0.xsd"> + <context:annotation-config /> + + <bean id="onwireRegistry" class="org.apache.cloudstack.framework.serializer.OnwireClassRegistry" + init-method="scan" > + <property name="packages"> + <list> + <value>org.apache.cloudstack.framework</value> + </list> + </property> + </bean> + + <bean id="messageSerializer" class="org.apache.cloudstack.framework.serializer.JsonMessageSerializer"> + <property name="onwireClassRegistry" ref="onwireRegistry" /> + </bean> + + <bean id="messageBus" class = "org.apache.cloudstack.framework.messagebus.MessageBusBase"> + <property name="messageSerializer" ref="messageSerializer" /> + </bean> + +</beans>
